
Airflow DAG blows up with RecursionError when triggered via WebUI

  • y2k-shubham · 5 years ago
    I'm still on LocalExecutor because all my tasks just execute command(s) on remote machines (no actual heavy computation happens on the Airflow machine itself)

    After a few days of rewriting, I was able to generate my DAG programmatically, in the manner described above


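    However, when I try to trigger the DAG via the UI (for testing I have set schedule_interval=None, so triggering is manual), the DAG blows up (the nuke ASCII-art page) with the following stacktrace (see the full stacktrace here), possibly originating from this place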

      recurse_nodes(t, visited) for t in task.upstream_list
      File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/www/views.py", line 1483, in <listcomp>
        if node_count[0] < node_limit or t not in visited]
      File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/www/views.py", line 1478, in recurse_nodes
        visited.add(task)
      File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/models/__init__.py", line 2304, in __hash__
        hash(val)
      File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/models/__init__.py", line 2304, in __hash__
        hash(val)
      File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/models/__init__.py", line 2304, in __hash__
        hash(val)
      [Previous line repeated 477 more times]
      File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/models/__init__.py", line 2302, in __hash__
        val = getattr(self, c, None)
      File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/models/__init__.py", line 2396, in dag_id
        if self.has_dag():
      File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/models/__init__.py", line 2392, in has_dag
        return getattr(self, '_dag', None) is not None
    RecursionError: maximum recursion depth exceeded while calling a Python object
    

    Furthermore, when I reload the WebUI (or open it in another browser tab), I can see that the DAG is stuck in the RUNNING state (with state / PID of all TaskInstances NULL)



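    Interestingly, once this happens, and until I mark the DAG as failed via the UI, I start getting the following stacktrace in /logs/scheduler/latest/<path/to/my/dag_script.py.log> (apparently originating from here)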


    [2019-12-24 06:27:32,851] {jobs.py:1446} INFO - Processing derived_tables_presto_cbot_events_1
    [2019-12-24 06:27:32,855] {jobs.py:921} INFO - Examining DAG run <DagRun derived_tables_presto_cbot_events_1 @ 2019-12-23 13:08:36.747641+00:00: manual__2019-12-23T13:08:36.747641+00:00, externally triggered: True>
    [2019-12-24 06:27:32,856] {jobs.py:410} ERROR - Got an exception! Propagating...
    Traceback (most recent call last):
      File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/jobs.py", line 402, in helper
        pickle_dags)
      File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/utils/db.py", line 73, in wrapper
        return func(*args, **kwargs)
      File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/jobs.py", line 1760, in process_file
        self._process_dags(dagbag, dags, ti_keys_to_schedule)
      File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/jobs.py", line 1451, in _process_dags
        self._process_task_instances(dag, tis_out)
      File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/utils/db.py", line 73, in wrapper
        return func(*args, **kwargs)
      File "/home/admin/.pyenv/versions/3.7.3/lib/python3.7/site-packages/airflow/jobs.py", line 930, in _process_task_instances
        if len(active_dag_runs) >= dag.max_active_runs:
    TypeError: '>=' not supported between instances of 'int' and 'NoneType'
    

    As soon as I mark the DAG as failed via the WebUI, the above stacktrace disappears from the scheduler logs (DELETE FROM.. SQL query)



    I've tried several things without success

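    • Verified that my Airflow LocalExecutor deployment is still intact (DAGs of older workflows still run fine)
    • Shut down and restarted the Airflow scheduler & webserver
    • airflow initdb
    • Checked the airflow.cfg file for any discrepancies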

    • Python 3.7.3 (installed via PyEnv)
    • Airflow 1.10.3 with LocalExecutor
    • Linux ip-XXX-XX-XX-XX 4.9.0-8-amd64 #1 SMP Debian 4.9.130-2 (2018-10-27) x86_64 GNU/Linux
  • y2k-shubham · 5 years ago

    As correctly pointed out by @kaxil, the error was in one of my custom operators


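    I had accidentally used a method defined in the operator class itself (bound via self) as on_failure_callback; assigning that kill_task() method is what made the DAG blow up with RecursionError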

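    from airflow.models import BaseOperator


    class MyCustomOperator(BaseOperator):

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # Problem: a bound method of this operator is used as the callback
            self.on_failure_callback = self.kill_task
        # ...
        def kill_task(self) -> None:
            # do some cleanup work
            ...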
    

    Moving the kill_task() method outside the class resolved the issue
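
    The fix can be sketched roughly as follows. To keep the snippet runnable without an Airflow installation, StandInOperator is a hypothetical stand-in for BaseOperator, and its simplified __hash__ is only modeled on the frames visible in the traceback (it hashes each attribute in turn, callback included). My understanding is that on Python 3.7 hashing a bound method also hashes the object it is bound to, which would explain the repeated __hash__ frames; treat that as an assumption rather than a confirmed diagnosis. The callback takes a context argument because Airflow passes the task context dict to on_failure_callback; the "cleaned_up" key is purely illustrative.

```python
from typing import Any, Callable, Dict, Optional


class StandInOperator:
    """Hypothetical stand-in for airflow.models.BaseOperator (not the real class)."""

    # Attributes that take part in hashing (simplified assumption)
    _comps = ("task_id", "on_failure_callback")

    def __init__(
        self,
        task_id: str,
        on_failure_callback: Optional[Callable[[Dict[str, Any]], None]] = None,
    ) -> None:
        self.task_id = task_id
        self.on_failure_callback = on_failure_callback

    def __hash__(self) -> int:
        # Hash every listed attribute, including the callback
        return hash(tuple(getattr(self, c, None) for c in self._comps))


def kill_task(context: Dict[str, Any]) -> None:
    """Module-level failure callback; receives the task context dict."""
    # do some cleanup work (placeholder so the effect is observable)
    context["cleaned_up"] = True


class MyCustomOperator(StandInOperator):
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Use a plain function instead of self.kill_task, so hashing the
        # callback never leads back to hashing the operator itself
        kwargs.setdefault("on_failure_callback", kill_task)
        super().__init__(*args, **kwargs)


op = MyCustomOperator(task_id="demo")
visited = set()
visited.add(op)  # the operation that failed in recurse_nodes() in the traceback
```

    With the real BaseOperator the same idea applies: define the callback at module level (or as a staticmethod) and hand it over via the on_failure_callback argument.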