代码之家  ›  专栏  ›  技术社区  ›  dark horse

气流-脚本在初始化期间执行

  •  0
  • dark horse  · 技术社区  · 6 年前

    我有一个Airflow脚本,可以将数据从表2插入表1。作为airflow初始化过程的一部分,我看到insert函数一直在后台运行,即使我没有触发它或计划它。我想知道脚本中有什么错误导致它自动触发。我需要在下面的脚本中修改什么,以确保它不会在初始化过程中执行该命令。

    ## Library Imports
    import psycopg2
    import airflow
    from airflow import DAG
    from airflow.operators import BashOperator
    from sqlalchemy import create_engine
    import io
    
    
    # Following are defaults which can be overridden later on
    default_args = {
    'owner': 'admin',
    'depends_on_past': False,
    'start_date': datetime(2018, 5, 25),
    'email': ['admin@mail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    }
    
    dag = DAG('sample', default_args=default_args)
    
    
    #######################
    
    def db_login():
        global db_con
    try:
        db_con = psycopg2.connect(" dbname = 'db' user = 'user' password = 'pass' host = 'hostname' port = '5439' sslmode = 'require' ")
    except:
        print("I am unable to connect to the database.")
    print('Connection Task Complete: Connected to DB')
    return(db_con)
    
    
    #######################
    
    def insert_data():
        cur = db_con.cursor()
        cur.execute("""insert into table_1 select id,name,status from table_2 limit 2 ;""")
        db_con.commit()
        print('ETL Task Complete: Inserting data into table_1')
    
    
    db_login()
    insert_data()
    db_con.close()
    
    ##########################################
    
    
    t1 = BashOperator(
    task_id='db_con',
    python_callable=db_login(),
    bash_command='python3 ~/airflow/dags/sample.py',
    email_on_failure=True,
    email=['admin@mail.com'],
    dag=dag)
    
    t2 = BashOperator(
    task_id='insert',
    python_callable=insert_data(),
    bash_command='python3 ~/airflow/dags/sample.py',
    email_on_failure=True,
    email=['admin@mail.com'],
    dag=dag)
    
    
    t1.set_downstream(t2)
    

    有人能帮忙吗。谢谢

    更新代码:

    ## Third party Library Imports
    
    import psycopg2
    import airflow
    from airflow import DAG
    from airflow.operators import BashOperator
    from datetime import datetime, timedelta
    from sqlalchemy import create_engine
    import io
    
    
    
    default_args = {
    'owner': 'admin',
    #'depends_on_past': False,
    'start_date': datetime(2018, 5, 25),
     'email': ['admin@mail.com'],
     'email_on_failure': True,
     'email_on_retry': True,
     'retries': 1,
     'retry_delay': timedelta(minutes=1), }
    
    dag = DAG('sample', default_args=default_args, catchup=False, schedule_interval="@once")
    
    
    def db_login():
        global db_con
        try:
            db_con = psycopg2.connect(
            " dbname = 'db' user = 'user' password = 'password' host = 'host' port = '5439' sslmode = 'require' ")
        except:
            print("I am unable to connect to the database.")
        print('Connection success')
        return (db_con)
    
    def insert_data():
        cur = db_con.cursor()
        cur.execute("""insert into table_1 select id,name,status from table_2 limit 2;""")
        db_con.commit()
        print('ETL Task Complete: Inserting data into table_1')
    
    def load_etl():
        db_login()
        insert_data()
        dwh_connection.close()
    
    #Function to execute the query
    load_etl()
    
    t1 = BashOperator(
        task_id='db_connection',
        python_callable=load_etl(),
        bash_command='python3 ~/airflow/dags/sample.py',
        email_on_failure=True,
        email=['admin@mail.com'],
        dag=dag)
    
    #t2 = BashOperator(
    #task_id='ops_load_del',
    #python_callable=insert_data(),
    #bash_command='python3 ~/airflow/dags/sample.py',
    #email_on_failure=True,
    #email=['admin@mail.com'],
    #dag=dag)
    
    t1
    #t1.set_downstream(t2)
    
    1 回复  |  直到 6 年前
        1
  •  0
  •   tobi6    6 年前

    如果你从Python风格的角度来看DAG,缩进会引发一些思考。

    首先,试着用 python name-of-dag.py .是的,不要使用 airflow 命令气流的某些部分也在做这项工作,以检查该做什么。

    现在,如果某个代码正在执行,那么这可能与intendation有关。

    功能分析

    这里的凹痕似乎断裂了:

    def db_login(): 全球db_con 尝试: db_con=psycopg2。连接(“数据库名='db'用户='user'密码='pass'主机='hostname'端口='5439'sslmode='require'”) 除了: 打印(“我无法连接到数据库。”) 打印('连接任务完成:连接到数据库') 返回(db_con)

    应该是:

    def db_login():
        global db_con
        try:
            db_con = psycopg2.connect(" dbname = 'db' user = 'user' password = 'pass' host = 'hostname' port = '5439' sslmode = 'require' ")
        except:
            print("I am unable to connect to the database.")
        print('Connection Task Complete: Connected to DB')
        return(db_con)
    

    否则,将始终执行最左边的代码。

    也: 全球的 在气流的其他方法中,变量不一定可用!要共享连接,请使用例如气流 XCOM : https://airflow.apache.org/concepts.html#xcoms

    直接在DAG中调用函数

    另外,由于我不知道的原因,你想要执行一些函数 完全不受气流控制 但每一次处决。

    db_login() 
    insert_data()
    db_con.close()
    

    此代码将被执行 每次叫DAG的时候 你的日程安排可能会完全不同。

    如果您希望此代码用于测试目的,您可能希望将其放在主调用中:

    if __name__ == '__main__':
        db_login() 
        insert_data()
        db_con.close()
    

    即使您这样做了,关闭操作也仅在此工作流中可用,而在DAG中不可用。没有关闭连接的任务。

    因为你正在使用 PythonOperator 为了实现这一点,最好构建一个小型def,并且只有一个任务称为该def:

    def load_etl():
        db_login() 
        insert_data()
        db_con.close()
    

    TL/DR: 消除所有缩进错误,这样如果纯粹用Python调用文件,就不会执行任何代码。

    编辑

    这也意味着没有函数调用在任务或def之外。这条线

    #Function to execute the query
    load_etl()
    

    将被执行,因为它不是任务或def的一部分。它必须被移除。既然函数调用是任务的一部分,那么它就应该可以工作了。

    因为这个函数是Python函数,所以应该使用 蟒蛇器 及其参数 python_callable=load_etl (注:该行末尾无括号)