代码之家  ›  专栏  ›  技术社区  ›  Black Dynamite

Apache Airflow 2.0.0.b2-动态EmailOperator[文件]属性

  •  0
  • Black Dynamite  · 技术社区  · 4 年前

    TL;DR如何创建一个动态EmailOperator,从一个XCom属性的文件路径发送一个文件

    大家好,

    我使用的是ApacheAirflow 2.0.0.b2。我的问题是,我的DAG创建了一个文件,其名称在运行时发生了变化。我想通过电子邮件发送此文件,但在将动态文件名输入EmailOperator时遇到问题。

    我试过的都失败了!:

    1. 使用模板 files 财产。

      files=["{{ ti.xcom_pull(key='OUTPUT_CSV') }}"],
      

      不幸的是,只有当操作符中的字段被标记为模板时,模板才起作用。 文件夹 不是EmailOperator上的模板字段

    2. 使用函数动态创建我的任务

       def get_email_operator(?...):
          export_file_path = ti.xcom_pull(key='OUTPUT_CSV')
          email_subject = 'Some Subject'
          return EmailOperator(
              task_id="get_email_operator",
              to=['someemail@somedomain.net'],
              subject=email_subject,
              files=[export_file_path,],
              html_content='<br>',
              dag=current_dag)
      
      ..task3 >> get_email_operator() >> task4
      

      不幸的是,我似乎不知道如何通过电流 **kwargs ti 将信息输入函数调用以获取当前文件路径。

    编辑: 下面埃拉德的回答给了我正确的方向。我唯一要做的就是添加 kwargs 打电话的时候操作执行()

    解决方案:

    def get_email_operator(**kwargs):
        export_file_path = kwargs['ti'].xcom_pull(key='OUTPUT_CSV')
        email_subject = 'Termed Drivers - ' + date_string
        op = EmailOperator(
            task_id="get_email_operator",
            to=['someemail@somedomain.net'],
            subject=email_subject,
            files=[export_file_path,],
            html_content='<br>')
        op.execute(kwargs)
    
    0 回复  |  直到 4 年前
        1
  •  2
  •   Black Dynamite    4 年前

    文件将在Airflow 2中模板化为 PR 上周被合并了。

    但是,您不需要等待,您可以用自己的自定义操作符包装当前操作符,指定模板化字段的列表。

    比如:

    class MyEmailOperator(EmailOperator):
         template_fields = ('to', 'subject', 'html_content', 'files')
    

    那你可以用 MyEmailOperator 在你的代码里。 files 将被模板化。

    您还可以通过使用包装EmailOperator的PythonOperator来解决此问题:

    def get_email_operator(**context):
        xcom = context['ti'].xcom_pull(task_ids='OUTPUT_CSV')
        email_subject = 'Some Subject'
        op = EmailOperator(
            task_id="get_email_operator",
            to=['someemail@somedomain.net'],
            subject=email_subject,
            files=[xcom,],
            html_content='<br>')
        op.execute()
    
    python = PythonOperator(
        task_id='archive_s3_file',
        dag=dag,
        python_callable=get_email_operator,
        provide_context=True
    )
    
    ..task3 >> python >> task4