代码之家  ›  专栏  ›  技术社区  ›  MawrCoffeePls

任何DAG故障的全局警报

  •  0
  • MawrCoffeePls  · 技术社区  · 7 年前

    我目前有100多个DAG正在生产中。我知道如何添加警报 on_failure_callback 对于由上游故障触发的操作员,有没有一种方法可以将气流本身配置为在DAG故障时始终发送电子邮件,而不必通过并更新我的每个DAG来单独发出故障警报?

    1 回复  |  直到 7 年前
        1
  •  5
  •   Daniel Huang    7 年前

    据我所知不是这样,但我有这个助手来处理我的全局/默认dag/运算符设置:

    def on_failure_callback(context):
        ...
    
    def on_success_callback(context):
        ...
    
    def build_default_args(**kwargs):
        default_args = {
            'on_failure_callback': on_failure_callback,
            'on_success_callback': on_success_callback,
            'owner': 'me',
            'queue': 'default',
            'execution_timeout': timedelta(hours=1),
            'retries': 3,
            'retry_delay': timedelta(seconds=10),
        }
        default_args.update(kwargs)
        return default_args
    

    dag = DAG(
        dag_id='my_dag',
        default_args=build_default_args(
            start_date=datetime(2017, 9, 20),
            execution_timeout=timedelta(hours=8),  # overrides default
        ),
        schedule_interval='@hourly',
    )
    

    或者一些自定义基础 DAG