代码之家  ›  专栏  ›  技术社区  ›  Glauberiano

气流-尝试运行TaskInstance时从sqlalchemy引发的错误

  •  0
  • Glauberiano  · 技术社区  · 3 年前

    我正在测试气流结构内部爬行器的执行情况。如果我运行下面的脚本,一切正常,有效载荷也会打印出来。

    from airflow import DAG
    from airflow.models import BaseOperator, TaskInstance
    from hooks.crawler_hook import CrawlerHook
    
    from datetime import datetime
    import time
    
    class CrawlerOperator(BaseOperator):
        def __init__(self, conn_id=None, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.conn_id = conn_id
    
        def execute(self):
            hook = CrawlerHook(conn_id=self.conn_id)
            print(hook.run())
    
    if __name__ == "__main__":
        CrawlerOperator(task_id='test_run').execute()
    

    但当我试图在DAG中运行TaskInstance时,我遇到了一个错误,无法理解为什么:

    if __name__ == "__main__":
        with DAG(dag_id="DAG1", start_date=datetime.now(), catchup=False) as dag:
            to = CrawlerOperator(task_id="test_run")
            ti = TaskInstance(task=to)
            ti.run()
    

    错误是:

    Traceback (most recent call last):
      File "/home/../.env/lib/python3.8/site-packages/airflow/utils/session.py", line 67, in wrapper
        return func(*args, **kwargs)
      File "/home/../.env/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1123, in get_dagrun
        dr = session.query(DagRun).filter(DagRun.dag_id == self.dag_id, DagRun.run_id == self.run_id).one()
      File "/home/../.env/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3500, in one
        raise orm_exc.NoResultFound("No row was found for one()")
    sqlalchemy.orm.exc.NoResultFound: No row was found for one()
    
    

    有什么建议吗?

    0 回复  |  直到 3 年前
        1
  •  1
  •   Elad Kalif    3 年前

    我假设您正在某种单元测试中使用它。您缺少的(正如错误所示)是一个DagRun:

    from airflow.models import DagRun
    DagRun(dag_id=self.dag.dag_id, execution_date=timezone.utcnow(), run_id="test")
    ti.dag_run = dag_run
    

    这是必需的,因为任务与DAG关联,而不是与DAG关联。 达格可以跑很多次。

    您可以在“气流”中的一个单元测试中看到示例 codebase .