我正在测试气流结构内部爬行器的执行情况。如果我运行下面的脚本,一切正常,有效载荷也会打印出来。
from airflow import DAG
from airflow.models import BaseOperator, TaskInstance
from hooks.crawler_hook import CrawlerHook
from datetime import datetime
import time
class CrawlerOperator(BaseOperator):
def __init__(self, conn_id=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.conn_id = conn_id
def execute(self):
hook = CrawlerHook(conn_id=self.conn_id)
print(hook.run())
if __name__ == "__main__":
CrawlerOperator(task_id='test_run').execute()
但当我试图在DAG中运行TaskInstance时,我遇到了一个错误,无法理解为什么:
if __name__ == "__main__":
with DAG(dag_id="DAG1", start_date=datetime.now(), catchup=False) as dag:
to = CrawlerOperator(task_id="test_run")
ti = TaskInstance(task=to)
ti.run()
错误是:
Traceback (most recent call last):
File "/home/../.env/lib/python3.8/site-packages/airflow/utils/session.py", line 67, in wrapper
return func(*args, **kwargs)
File "/home/../.env/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1123, in get_dagrun
dr = session.query(DagRun).filter(DagRun.dag_id == self.dag_id, DagRun.run_id == self.run_id).one()
File "/home/../.env/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3500, in one
raise orm_exc.NoResultFound("No row was found for one()")
sqlalchemy.orm.exc.NoResultFound: No row was found for one()
有什么建议吗?