
Airflow Dynamic Task Creation Stream Setup Not Working

  •  Kyle Bridenstine · asked 6 years ago

    I'm dynamically creating the tasks in my DAG in a loop over my sources, like this (simplified):

    sources = ['source_1', 'source_2', 'source_3', 'source_4', 'source_5', 'source_6']
    
    for source in sources:
    
        source_task_1 = PythonOperator(
            task_id=source + '_create_emr',
            dag=dag,
            provide_context=True,
            retries=10,
            python_callable=execute_arl_source_emr_creation,
            op_args=[source])
    
        source_task_2 = BashOperator(
            task_id=source + '_starting_step_1',
            retries=10,
            bash_command='echo "Starting step 1 for ' + source + '"',
            dag=dag)
    
        source_task_2.set_upstream(source_task_1)
    

    All of the tasks are created successfully in the DAG, because I can see them all in the Airflow UI, but strangely it only links up the tasks in the stream for the first occurrence in the loop (source_1).

    All of the other tasks have no upstream or downstream. I don't understand how that's possible: the first iteration of the loop linked up correctly, so shouldn't they all?

    Here is the actual code:

    def create_emr_step_3_subdag(main_dag, subdag_id, source):
        subdag = DAG('{0}.{1}'.format(main_dag.dag_id, subdag_id), default_args=args)
    
        source_run_emr_step_3 = PythonOperator(
            task_id=source.sourceFullName + '_run_emr_step_3',
            dag=subdag,
            provide_context=True,
            retries=0,
            python_callable=execute_emr_step_3,
            op_args=[source_create_emr_task_id, source.sourceFullName])
    
        source_run_emr_step_3_waiter = PythonOperator(
            task_id=source.sourceFullName + '_run_emr_step_3_waiter',
            dag=subdag,
            provide_context=True,
            retries=10,
            python_callable=execute_emr_step_3_waiter,
            op_args=[source_create_emr_task_id, source.sourceFullName])
    
        source_run_emr_step_3_waiter.set_upstream(source_run_emr_step_3)
    
        return subdag
    
    class DatalakeDigitalPlatformArlWorkflowSource:
        sourceShortName = None  # source_1, source_2, source_3, source_4, source_5, source_6
        sourceFullName = None  # SOURCE_1, SOURCE_2, SOURCE_3, SOURCE_4, SOURCE_5, SOURCE_6
    
        def getSourceShortName(self):
            return self.sourceShortName
    
        def setSourceShortName(self, sourceShortName):
            self.sourceShortName = sourceShortName
    
        def getSourceFullName(self):
            return self.sourceFullName
    
        def setSourceFullName(self, sourceFullName):
            self.sourceFullName = sourceFullName
    
    source_1 = DatalakeDigitalPlatformArlWorkflowSource()
    source_1.setSourceShortName("source_1")
    source_1.setSourceFullName("SOURCE_1")
    
    source_2 = DatalakeDigitalPlatformArlWorkflowSource()
    source_2.setSourceShortName("source_2")
    source_2.setSourceFullName("HZN")
    
    source_3 = DatalakeDigitalPlatformArlWorkflowSource()
    source_3.setSourceShortName("source_3")
    source_3.setSourceFullName("SOURCE_3")
    
    source_4 = DatalakeDigitalPlatformArlWorkflowSource()
    source_4.setSourceShortName("source_4")
    source_4.setSourceFullName("SOURCE_4")
    
    source_5 = DatalakeDigitalPlatformArlWorkflowSource()
    source_5.setSourceShortName("source_5")
    source_5.setSourceFullName("PP")
    
    source_6 = DatalakeDigitalPlatformArlWorkflowSource()
    source_6.setSourceShortName("source_6")
    source_6.setSourceFullName("SOURCE_6")
    
    sources = [source_1, source_2, source_3, source_4, source_5, source_6]
    
    for source in sources:
        source_create_emr_task_id = source.sourceFullName + '_create_emr'
    
        source_create_emr = PythonOperator(
            task_id=source_create_emr_task_id,
            dag=dag,
            provide_context=True,
            retries=10,
            python_callable=execute_blah_source_emr_creation,
            op_args=[source_create_emr_task_id, source.sourceFullName])
    
        # source_starting_step_1
        source_starting_step_1 = BashOperator(
            task_id=source.sourceFullName + '_starting_step_1',
            retries=10,
            bash_command='echo "Starting step 1 for ' + source.sourceShortName + '"',
            dag=dag)
    
        # Get source Batch ID
        source_get_batch_id = PythonOperator(
            task_id=source.sourceFullName + '_get_batch_id',
            retries=10,
            dag=dag,
            python_callable=get_batch_id,
            op_args=[airflow_home + '/resources/batch-id-inputs/batchid_input.json', source.sourceFullName])
    
        # source_licappts
        source_sensor_licappts = OmegaFileSensor(
            task_id=source.sourceFullName + '_sensor_licappts',
            retries=10,
            filepath=airflow_home + '/foo/data/bar/blah/test/data',
            filepattern=source.sourceShortName + '_licappts_(.*).txt',
            poke_interval=3,
            execution_timeout=timedelta(hours=23),
            dag=dag)
        source_process_licappts = PythonOperator(
            task_id=source.sourceFullName + '_process_licappts',
            retries=10,
            dag=dag,
            python_callable=execute_d_landing_import,
            op_args=[source.sourceShortName + '_licappts_(.*).txt', 'get' + source.sourceFullName + 'BatchId'])
    
        # source_agents
        source_sensor_agents = OmegaFileSensor(
            task_id=source.sourceFullName + '_sensor_agents',
            retries=10,
            filepath=airflow_home + '/foo/data/bar/blah/test/data',
            filepattern=source.sourceShortName + '_agents_(.*).txt',
            poke_interval=3,
            dag=dag)
        source_process_agents = PythonOperator(
            task_id=source.sourceFullName + '_process_agents',
            retries=10,
            dag=dag,
            python_callable=execute_d_landing_import,
            op_args=[source.sourceShortName + '_agents_*.txt', 'get' + source.sourceFullName + 'BatchId'])
    
        # source_agentpolicy
        source_sensor_agentpolicy = OmegaFileSensor(
            task_id=source.sourceFullName + '_sensor_agentpolicy',
            retries=10,
            filepath=airflow_home + '/foo/data/bar/blah/test/data',
            filepattern=source.sourceShortName + '_agentpolicy_(.*).txt',
            poke_interval=3,
            dag=dag)
        source_process_agentpolicy = PythonOperator(
            task_id=source.sourceFullName + '_process_agentpolicy',
            retries=10,
            dag=dag,
            python_callable=execute_d_landing_import,
            op_args=[source.sourceShortName + '_agentpolicy_*.txt', 'get' + source.sourceFullName + 'BatchId'])
    
        # source_finished_step_1
        source_finished_step_1 = BashOperator(
            task_id=source.sourceFullName + '_finished_step_1',
            retries=10,
            bash_command='echo "Finished step 1 for ' + source.sourceShortName + '"',
            dag=dag)
    
        # source_starting_step_2
        source_starting_step_2 = BashOperator(
            task_id=source.sourceFullName + '_source_starting_step_2',
            retries=10,
            bash_command='echo "Starting step 2 for ' + source.sourceShortName + '"',
            dag=dag)
    
        source_run_emr_step_2 = PythonOperator(
            task_id=source.sourceFullName + '_run_emr_step_2',
            dag=dag,
            provide_context=True,
            retries=0,
            python_callable=execute_emr_step_2,
            op_args=[source_create_emr_task_id, source.sourceFullName])
    
        source_run_emr_step_2_waiter = PythonOperator(
            task_id=source.sourceFullName + '_run_emr_step_2_waiter',
            dag=dag,
            provide_context=True,
            retries=10,
            python_callable=execute_emr_step_2_waiter,
            op_args=[source_create_emr_task_id, source.sourceFullName])
    
        # source_elastic_search_check
        source_elastic_search_check = PythonOperator(
            task_id=source.sourceFullName + '_elastic_search_check',
            retries=10,
            dag=dag,
            python_callable=execute_get_advisor_batch_stage_status,
            op_args=['get' + source.sourceFullName + 'BatchId', source.sourceFullName])
    
        # source_finished_step_2
        source_finished_step_2 = BashOperator(
            task_id=source.sourceFullName + '_finished_step_2',
            retries=10,
            bash_command='echo "Finished step 2 for ' + source.sourceShortName + '"',
            dag=dag)
    
        # source_starting_step_3
        source_starting_step_3 = BashOperator(
            task_id=source.sourceFullName + '_starting_step_3',
            retries=10,
            bash_command='echo "Starting step 3 for ' + source.sourceShortName + '"',
            dag=dag)
    
        source_emr_step_3_subdag_task_id = source.sourceFullName + '_emr_step_3_subdag'
    
        source_emr_step_3_subdag = SubDagOperator(
            task_id=source_emr_step_3_subdag_task_id,
            dag=dag,
            retries=10,
            pool='entitymatching_task_pool',
            subdag=create_emr_step_3_subdag(dag, source_emr_step_3_subdag_task_id, source)
        )
    
        # source_finished_step_3
        source_finished_step_3 = BashOperator(
            task_id=source.sourceFullName + '_finished_step_3',
            retries=10,
            bash_command='echo "Finished step 3 for ' + source.sourceShortName + '"',
            dag=dag)
    
        # source_starting_step_4
        source_starting_step_4 = BashOperator(
            task_id=source.sourceFullName + '_starting_step_4',
            retries=10,
            bash_command='echo "Starting step 4 for ' + source.sourceShortName + '"',
            dag=dag)
    
        source_run_emr_step_4 = PythonOperator(
            task_id=source.sourceFullName + '_run_emr_step_4',
            dag=dag,
            provide_context=True,
            retries=0,
            python_callable=execute_emr_step_4,
            op_args=[source_create_emr_task_id, source.sourceFullName])
    
        source_run_emr_step_4_waiter = PythonOperator(
            task_id=source.sourceFullName + '_run_emr_step_4_waiter',
            dag=dag,
            provide_context=True,
            retries=10,
            python_callable=execute_emr_step_4_waiter,
            op_args=[source_create_emr_task_id, source.sourceFullName])
    
        # source_finished_step_4
        source_finished_step_4 = BashOperator(
            task_id=source.sourceFullName + '_finished_step_4',
            retries=10,
            bash_command='echo "Finished step 4 for ' + source.sourceShortName + '"',
            dag=dag)
    
        source_emr_termination = PythonOperator(
            task_id=source.sourceFullName + '_emr_termination',
            dag=dag,
            provide_context=True,
            retries=10,
            retry_delay=timedelta(minutes=5),
            python_callable=execute_emr_termination,
            op_args=[source_create_emr_task_id, source.sourceFullName])
    
        # source_successful
        source_successful = BashOperator(
            task_id=source.sourceFullName + '_successful',
            retries=10,
            bash_command='sudo aws sns publish blah blah blah',
            dag=dag)
    
        # finished_foo_bar_blah_workflow
        finished_foo_bar_blah_workflow = BashOperator(
            task_id='finished_foo_bar_blah_workflow',
            bash_command='echo "Finished foo_bar_blah_workflow"',
            dag=dag)
    
        ### Stream ###
    
        # Create EMR Cluster
        source_create_emr.set_upstream(starting_foo_bar_blah_workflow)
    
        # Step 1
        source_starting_step_1.set_upstream(starting_foo_bar_blah_workflow)
        source_get_batch_id.set_upstream(source_starting_step_1)
    
        source_sensor_licappts.set_upstream(source_get_batch_id)
        source_process_licappts.set_upstream(source_sensor_licappts)
    
        source_sensor_agents.set_upstream(source_get_batch_id)
        source_process_agents.set_upstream(source_sensor_agents)
    
        source_sensor_agentpolicy.set_upstream(source_get_batch_id)
        source_process_agentpolicy.set_upstream(source_sensor_agentpolicy)
    
        source_finished_step_1.set_upstream(source_process_licappts)
        source_finished_step_1.set_upstream(source_process_agents)
        source_finished_step_1.set_upstream(source_process_agentpolicy)
    
        # Step 2
        source_starting_step_2.set_upstream(source_finished_step_1)
        source_starting_step_2.set_upstream(source_create_emr)  # Don't run EMR steps until the EMR is created
        source_run_emr_step_2.set_upstream(source_starting_step_2)
        source_run_emr_step_2_waiter.set_upstream(source_run_emr_step_2)
        source_elastic_search_check.set_upstream(source_run_emr_step_2_waiter)
        source_finished_step_2.set_upstream(source_elastic_search_check)
    
        # Step 3
        source_starting_step_3.set_upstream(source_finished_step_2)
        source_emr_step_3_subdag.set_upstream(source_starting_step_3)
        source_finished_step_3.set_upstream(source_emr_step_3_subdag)
    
        # Step 4
        source_starting_step_4.set_upstream(source_finished_step_3)
        source_run_emr_step_4.set_upstream(source_starting_step_4)
        source_run_emr_step_4_waiter.set_upstream(source_run_emr_step_4)
        source_finished_step_4.set_upstream(source_run_emr_step_4_waiter)
    
        # Terminate EMR Cluster
        source_emr_termination.set_upstream(source_finished_step_4)
        source_successful.set_upstream(source_emr_termination)
        finished_foo_bar_blah_workflow.set_upstream(source_successful)
    

    As you can see, the stream linking isn't working:

    [Screenshot: Airflow UI graph view showing the tasks without upstream/downstream links]

    Before my recent changes to the file it was working fine, as seen here:

    [Screenshot: Airflow UI graph view with the stream fully linked]

    I just did a lot of refactoring on this code, and when I reloaded it I saw this error. I'm not sure what I did; I ran a lot of find-and-replace-all renames, and I wonder whether I messed something up in the process and simply can't see the mistake in the code. What makes me think that isn't the problem, though, is that if it were, why would my first source be linked up correctly in its stream?

    Is it possible I'm hitting some kind of limit on the number of tasks in a single DAG?

    1 Answer  |  6 years ago

  •  Christopher Beck · answered 6 years ago

    I think I found your error:

    First, to rule out an Airflow bug, I created a small DAG that creates 25 tasks for 7 sources and sets their upstream dependencies, and everything worked fine.
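
    For reference, here is a minimal sketch of that kind of sanity-check DAG (the DAG id, task names, and step list are hypothetical, not taken from your code):

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator

    # Hypothetical stand-alone test: several sources, a short chain of tasks
    # per source. Every task_id is unique per loop iteration, so the upstream
    # links all resolve correctly.
    test_dag = DAG('loop_linking_test',
                   start_date=datetime(2018, 1, 1),
                   schedule_interval=None)

    for i in range(7):
        prev = None
        for step in ['extract', 'transform', 'load']:
            task = BashOperator(
                task_id='source_{0}_{1}'.format(i, step),  # unique every time
                bash_command='echo "{0} for source_{1}"'.format(step, i),
                dag=test_dag)
            if prev is not None:
                task.set_upstream(prev)
            prev = task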

    So I took your code and tried it, and I ran into exactly the same problem you're seeing. The last dependency set inside your loop is:

    finished_foo_bar_blah_workflow.set_upstream(source_successful)
    

    Then I looked at the task finished_foo_bar_blah_workflow, and as far as I can see this task only needs to be created once, not once per source. So I moved the code

    # finished_foo_bar_blah_workflow
    finished_foo_bar_blah_workflow = BashOperator(
        task_id='finished_foo_bar_blah_workflow',
        bash_command='echo "Finished foo_bar_blah_workflow"',
        dag=dag)
    

    above the "for source in sources:" line, and voilà, everything works.
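
    In other words, any task that is shared by all sources has to be created exactly once, outside the loop; inside the loop you only set the dependency on it. A minimal sketch of the corrected structure, assuming the rest of the loop body stays exactly as in your question:

    # Created ONCE, before the loop. Creating it again on every iteration
    # re-registers the same task_id ('finished_foo_bar_blah_workflow'),
    # which is what broke the dependency wiring across iterations.
    finished_foo_bar_blah_workflow = BashOperator(
        task_id='finished_foo_bar_blah_workflow',
        bash_command='echo "Finished foo_bar_blah_workflow"',
        dag=dag)

    for source in sources:
        # ... all of the per-source tasks and dependencies, as before ...

        source_successful = BashOperator(
            task_id=source.sourceFullName + '_successful',
            retries=10,
            bash_command='sudo aws sns publish blah blah blah',
            dag=dag)

        # The shared task is only wired up inside the loop, never re-created.
        finished_foo_bar_blah_workflow.set_upstream(source_successful)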

    EDIT:
    So I looked at the lists of upstream and downstream task ids to find the tasks that should sit directly upstream and downstream of finished_foo_bar_blah_workflow.
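
    For that kind of inspection, every Airflow operator exposes its wiring through its upstream_task_ids and downstream_task_ids attributes (and upstream_list / downstream_list for the task objects themselves), for example:

    # Print how the shared final task is actually wired into the DAG.
    print(finished_foo_bar_blah_workflow.upstream_task_ids)
    # With the fix applied, this lists one '<SOURCE>_successful' task id per
    # source; with the task re-created inside the loop, the links from the
    # other iterations were missing.

    # Or dump the wiring of every task in the DAG:
    for task in dag.tasks:
        print(task.task_id, '<-', task.upstream_task_ids)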