代码之家  ›  专栏  ›  技术社区  ›  ozw1z5rd

气流触发器如何更改执行日期

  •  9
  • ozw1z5rd  · 技术社区  · 7 年前

    我注意到,对于计划任务,执行日期是根据

    气流是为ETL需求开发的解决方案。在ETL领域, 您通常会总结数据。所以,如果我想总结 2016-02-19,我会在2016-02-20格林威治标准时间午夜做这件事 在2016-02-19的所有数据可用之后。

    但是,当一个dag触发另一个dag时,执行时间设置为now()。

    有没有办法使触发的dag与触发dag的执行时间相同?当然,我可以重写模板并使用Dayed\u ds,然而,这是一个棘手的解决方案。

    4 回复  |  直到 7 年前
        1
  •  6
  •   akoeltringer    6 年前

    下面的类扩展到 TriggerDagRunOperator 允许将执行日期作为字符串传递,然后将其转换回datetime。这有点粗糙,但这是我找到的完成工作的唯一方法。

    from datetime import datetime
    import logging
    
    from airflow import settings
    from airflow.utils.state import State
    from airflow.models import DagBag
    from airflow.operators.dagrun_operator import TriggerDagRunOperator, DagRunOrder
    
    class MMTTriggerDagRunOperator(TriggerDagRunOperator):
        """
        MMT-patched for passing explicit execution date
        (otherwise it's hard to hook the datetime.now() date).
        Use when you want to explicity set the execution date on the target DAG
        from the controller DAG.
    
        Adapted from Paul Elliot's solution on airflow-dev mailing list archives:
        http://mail-archives.apache.org/mod_mbox/airflow-dev/201711.mbox/%3cCAJuWvXgLfipPmMhkbf63puPGfi_ezj8vHYWoSHpBXysXhF_oZQ@mail.gmail.com%3e
    
        Parameters
        ------------------
        execution_date: str
            the custom execution date (jinja'd)
    
        Usage Example:
        -------------------
        my_dag_trigger_operator = MMTTriggerDagRunOperator(
            execution_date="{{execution_date}}"
            task_id='my_dag_trigger_operator',
            trigger_dag_id='my_target_dag_id',
            python_callable=lambda: random.getrandbits(1),
            params={},
            dag=my_controller_dag
        )
        """
        template_fields = ('execution_date',)
    
        def __init__(
            self, trigger_dag_id, python_callable, execution_date,
            *args, **kwargs
            ):
            self.execution_date = execution_date
            super(MMTTriggerDagRunOperator, self).__init__(
                trigger_dag_id=trigger_dag_id, python_callable=python_callable,
               *args, **kwargs
           )
    
        def execute(self, context):
            run_id_dt = datetime.strptime(self.execution_date, '%Y-%m-%d %H:%M:%S')
            dro = DagRunOrder(run_id='trig__' + run_id_dt.isoformat())
            dro = self.python_callable(context, dro)
            if dro:
                session = settings.Session()
                dbag = DagBag(settings.DAGS_FOLDER)
                trigger_dag = dbag.get_dag(self.trigger_dag_id)
                dr = trigger_dag.create_dagrun(
                    run_id=dro.run_id,
                    state=State.RUNNING,
                    execution_date=self.execution_date,
                    conf=dro.payload,
                    external_trigger=True)
                logging.info("Creating DagRun {}".format(dr))
                session.add(dr)
                session.commit()
                session.close()
            else:
                logging.info("Criteria not met, moving on")
    

    使用此设置而不使用此设置时可能会遇到问题 execution_date=now() :如果您尝试使用相同的 execution_date 两次这是因为 执行日期 dag_id 用于创建行索引,不能插入索引相同的行。

    我想不出你为什么要用同样的机器运行两个相同的DAG 执行日期 无论如何,在生产中,但这是我在测试时遇到的东西,您不应该对此感到震惊。只需清除旧作业或使用其他日期时间。

        2
  •  5
  •   Ena    6 年前

    这个 TriggerDagRunOperator 现在有一个 execution_date 参数设置触发运行的执行日期。 遗憾的是,参数不在模板字段中。 如果将其添加到模板字段中(或者如果覆盖运算符并更改template_fields值),则可以这样使用它:

    my_trigger_task= TriggerDagRunOperator(task_id='my_trigger_task',
                                                  trigger_dag_id="triggered_dag_id",
                                                  python_callable=conditionally_trigger,
                                                  execution_date= '{{execution_date}}',
                                                  dag=dag)
    

    它尚未发布,但您可以在此处查看来源: https://github.com/apache/incubator-airflow/blob/master/airflow/operators/dagrun_operator.py

    进行更改的提交是: https://github.com/apache/incubator-airflow/commit/089c996fbd9ecb0014dbefedff232e8699ce6283#diff-41f9029188bd5e500dec9804fed26fb4

        3
  •  3
  •   ozw1z5rd    6 年前

    我对MMTTriggerDagRunOperator进行了一些改进。该功能检查dag\U运行是否已经存在,如果发现,请使用气流清除功能重新启动dag。这允许我们在dag之间创建依赖关系,因为将执行日期移动到触发的dag的可能性打开了一个惊人的可能性的整个宇宙。我想知道为什么这不是气流中的默认行为。

       def execute(self, context):
            run_id_dt = datetime.strptime(self.execution_date, '%Y-%m-%d %H:%M:%S')
            dro = DagRunOrder(run_id='trig__' + run_id_dt.isoformat())
            dro = self.python_callable(context, dro)
            if dro:
                session = settings.Session()
                dbag = DagBag(settings.DAGS_FOLDER)
                trigger_dag = dbag.get_dag(self.trigger_dag_id)
    
                if not trigger_dag.get_dagrun( self.execution_date ):
                    dr = trigger_dag.create_dagrun(
                           run_id=dro.run_id,
                           state=State.RUNNING,
                           execution_date=self.execution_date,
                           conf=dro.payload,
                           external_trigger=True
                    )
                    logging.info("Creating DagRun {}".format(dr))
                    session.add(dr)
                    session.commit()
                else:
                    trigger_dag.clear( 
                        start_date = self.execution_date,
                        end_date = self.execution_date,
                        only_failed = False,
                        only_running = False,
                        confirm_prompt = False, 
                        reset_dag_runs = True, 
                        include_subdags= False,
                        dry_run = False 
                    )
                    logging.info("Cleared DagRun {}".format(trigger_dag))
    
                session.close()
            else:
                logging.info("Criteria not met, moving on")
    
        4
  •  0
  •   Sreenath Kamath    6 年前

    气流的实验API部分中提供了一个功能,允许您触发具有特定执行日期的dag。
    https://github.com/apache/incubator-airflow/blob/master/airflow/api/common/experimental/trigger_dag.py

    您可以将此函数作为 蟒蛇算子 并实现目标。

    所以看起来像
    from airflow.api.common.experimental.trigger_dag import trigger_dag

    trigger_operator=PythonOperator(task_id='YOUR_TASK_ID',
                                    python_callable=trigger_dag,
                                    op_args=['dag_id'],
                                    op_kwargs={'execution_date': datetime.now()})