代码之家  ›  专栏  ›  技术社区  ›  Andrew Cassidy

一个bigqueryoperator的模板化目标数据集表arg从另一个bigqueryoperator模板化

  •  2
  • Andrew Cassidy  · 技术社区  · 6 年前

    我正试图在ETL管道中链接一组bigquery-sql命令,其中一些输出和输入将被时间戳。

    from datetime import timedelta
    import airflow
    from airflow import DAG
    from airflow.contrib.operators.bigquery_operator import BigQueryOperator
    
    DAG_NAME = 'foo'
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': airflow.utils.dates.days_ago(7),
        'email': ['xxx@xxx.com'],
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=1),
    }
    
    dag = DAG(
        dag_id="blah",
        default_args=default_args,
        schedule_interval=None,
        template_searchpath=["/usr/local/airflow/dags/xxx/sql"])
    
    
    GOOGLE_PROJECT_ID = 'xxx'
    DATASET_ID = 'xxx'
    first_output = GOOGLE_PROJECT_ID + ":" + DATASET_ID + "." + "first_output_" + '{{ ds_nodash }}'
    second_output = GOOGLE_PROJECT_ID + ":" + DATASET_ID + "." + "second_output"
    GOOGLE_CLOUD_PLATFORM_CONNECTION_ID="google_cloud_default"
    
    
    first_op = BigQueryOperator(
        task_id='first_output',
        dag=dag,
        bigquery_conn_id=GOOGLE_CLOUD_PLATFORM_CONNECTION_ID,
        bql="XXX.sql",
        use_legacy_sql=True,
        allow_large_results=True,
        destination_dataset_table=first_output # {{ ds }} gets substituted because destination_dataset_table is a templated field
    )
    
    second_op = BigQueryOperator(
        task_id='second_op',
        dag=dag,
        bigquery_conn_id=GOOGLE_CLOUD_PLATFORM_CONNECTION_ID,
        bql="XXX_two.sql", # XXX_two.sql contains a {{ params.input_table }} reference
        params={'input_table': first_op.destination_dataset_table},
        use_legacy_sql=True,
        allow_large_results=True,
        destination_dataset_table=second_output
    
    )
    
    second_op.set_upstream(first_op)
    

    xxx_two.sql的内容:

    SELECT * FROM [{{ params.input_table }}
    

    测试方式:

    airflow test blah second_op  2015-06-01
    

    我目前的错误是(也在生产中)

    Exception: BigQuery job failed. Final error was: {'reason': 'invalid', 'location': BLAH, 'message': 'Invalid table name: xxx:xx.first_output_{{ ds_nodash }}'}. 
    

    如何访问操作员执行之外的模板字段?

    3 回复  |  直到 6 年前
        1
  •  2
  •   Mask    6 年前

    first_op.destination_dataset_table input_table render_templates first_op second_op

    SELECT * FROM xxx:xx.first_output_{{ ds_nodash }}
    

        BigQueryOperator(task_id='second_op',...,
                         bql='SELECT * FROM [{table}]'.format(table=first_op.destination_dataset_table)
    

    XXX_two.sql

    SELECT * FROM [{{ params.input_table }}_{{ ds_nodash }}]
    

    first_ouput = GOOGLE_PROJECT_ID + ":" + DATASET_ID + "." + "first_output"
    first_op = BigQueryOperator(...,destination_dataset_table= "{}_{{{{ ds_nodash }}}}".format(first_ouput))
    second_op = BigQueryOperator(..., params={'input_table': first_output},...
    
        2
  •  4
  •   tobi6    6 年前

    destination_dataset_table in the source code

    template_fields = ('bql', 'destination_dataset_table')
    

    first_output = "[{project}:{dataset}.first_output_{{{{ ds_nodash }}}}]".format(
        project=GOOGLE_PROJECT_ID,
        dataset=DATASET_ID)
    

    [my_project:my_dataset.first_output_{{ ds_nodash }}]
    

    ds_nodash

    [ ]

    first_op second_op params

    • first_output
    • 如果你正在从一个任务中提取字符串, 不会 获取呈现的字符串,但始终是原始模板字符串*如果不确保字段已被处理(如mask所述)
    • 帕拉姆 只是没有模板化,因此无法正确更新

    这些是我能想到的解决方案:

    • 衍生出你自己的 BigDataOperator 并添加 帕拉姆 到模板字段(如果有效,则为dict)
    • 或者延长 xxx_two.sql 这样它就不用了 params.input_table 而且 第一输出 . 既然你想 第一输出 要在模板中可用,必须首先将其添加到DAG参数 user_defined_macros .

    要了解有关这些解决方案的更多信息,请查看以下相关问题: Make custom Airflow macros expand other macros

        3
  •  2
  •   Simon D    6 年前

    您可以像您所做的那样从操作器外部引用宏,这是我的一些工作流程。

    是否尝试更改为:

    first_output = GOOGLE_PROJECT_ID + ":" + DATASET_ID + "." + "first_output_{{ ds_nodash }}"
    

    也许金贾不喜欢用不同的引号连接字符串?