代码之家  ›  专栏  ›  技术社区  ›  Kyle Bridenstine

气流S3键传感器-如何使其继续运行

  •  8
  • Kyle Bridenstine  · 技术社区  · 6 年前

    在…的帮助下 this Stackoverflow post 我刚做了一个程序(如文章所示),当一个文件放在S3桶中时,一个正在运行的DAG中的任务被触发,然后我使用bashoperator执行一些工作。一旦完成,尽管DAG不再处于运行状态,而是进入成功状态,如果我想让它获取另一个文件,我需要清除所有“过去”、“未来”、“上游”、“下游”活动。我想做这个程序,这样它总是在运行,每当一个新的文件放在S3存储桶中,程序就开始执行任务。

    我可以继续使用s3keysenor来完成这个任务吗,或者我需要找出一种设置 External Trigger 来运行我的DAG?到目前为止,如果只运行一次,我的S3keysensor就没有意义了。

    from airflow import DAG
    from airflow.operators import SimpleHttpOperator, HttpSensor, EmailOperator, S3KeySensor
    from datetime import datetime, timedelta
    from airflow.operators.bash_operator import BashOperator
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2018, 5, 29),
        'email': ['something@here.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 5,
        'retry_delay': timedelta(minutes=5)
    }
    
    dag = DAG('s3_triggered_emr_cluster_dag', default_args=default_args, schedule_interval= '@once')
    
    # This Activity runs a Python script that creates an AWS EMR cluster and then does EMR activity on the EMR cluster.
    t2 = BashOperator(
        task_id='create_emr_cluster_1',
        bash_command='python /home/ec2-user/aws-python-sample/Create_EMR_Then_Do_EMR_Activities.py',
        retries=1,
        dag=dag)
    
    t1 = BashOperator(
        task_id='success_log',
        bash_command='echo "Dag ran successfully" >> /home/ec2-user/s3_triggered_dag.txt',
        dag=dag)
    
    sensor = S3KeySensor(
        task_id='new_s3_file_in_foobar-bucket',
        bucket_key='*',
        wildcard_match=True,
        bucket_name='foobar-bucket',
        s3_conn_id='s3://foobar-bucket',
        timeout=18*60*60,
        poke_interval=120,
        dag=dag)
    
    t1.set_upstream(sensor)
    t2.set_upstream(t1)
    

    我想知道这是否不可能,因为它不是有向非循环图,而是有一个重复的循环 传感器->T1->T2->传感器->T1->T2->传感器->…不断重复

    更新:

    我的用例非常简单,每当一个新文件被放置在一个指定的AWS S3桶中时,我希望触发DAG并开始我的各种任务的处理。这些任务将执行如下操作:实例化一个新的AWS EMR集群,从AWS S3存储桶中提取文件,执行一些AWS EMR活动,然后关闭AWS EMR集群。从那里,DAG将返回到等待状态,在该状态下,它将等待新文件到达AWS S3存储桶,然后无限期地重复该过程。

    1 回复  |  直到 6 年前
        1
  •  8
  •   Taylor D. Edmiston    6 年前

    在气流中,没有一个概念可以映射到一个始终运行的DAG。如果适合您的用例,您可以每隔1到5分钟运行一次DAG。

    这里的主要内容是,s3keysensor检查直到它检测到第一个文件存在于密钥的通配符路径(或超时)中,然后运行它。但是,当第二个、第三个或第四个文件着陆时,S3传感器将已经完成该DAG运行的运行。在下一次DAG运行之前,它不会被安排再次运行。(您描述的循环思想大致相当于调度程序在创建DAG时所做的操作,但不是永久运行)。

    对于您的用例来说,外部触发器听起来绝对是最好的方法,无论该触发器是通过airflow cli的 trigger_dag 命令( $ airflow trigger_dag ... ):

    https://github.com/apache/incubator-airflow/blob/972086aeba4616843005b25210ba3b2596963d57/airflow/bin/cli.py#L206-L222

    或者通过其他API:

    https://github.com/apache/incubator-airflow/blob/5de22d7fa0d8bc6b9267ea13579b5ac5f62c8bb5/airflow/www/api/experimental/endpoints.py#L41-L89

    都掉头打电话给 trigger_dag 通用(实验)API中的函数:

    https://github.com/apache/incubator-airflow/blob/089c996fbd9ecb0014dbefedff232e8699ce6283/airflow/api/common/experimental/trigger_dag.py#L28-L67

    例如,您可以设置一个AWS lambda函数,当文件落在S3上时调用该函数,运行触发器DAG调用。