1
11
尽管它被用于许多ETL任务,但Airflow并不是这类操作的正确选择,它适用于工作流而非数据流。但是有很多方法可以做到这一点,而无需在任务之间传递整个数据帧。 您可以使用xcom.push和xcom.pull传递有关数据的信息: a.将第一个任务的结果保存在某个位置(json、csv等) b.传递到xcom.push有关已保存文件的信息。例如,文件名、路径。 c.使用xcom.pull从其他任务中读取此文件名,并执行所需的操作。 或 上面的所有内容都使用了一些数据库表: a.在task_1中,您可以从某个数据帧中的table_1下载数据,对其进行处理并保存在另一个table_2中(df.to_sql())。 b.使用xcom.push传递表的名称。 c.从另一个任务中,使用xcom.pull获取table_2,并使用df.read_sql()读取它。 关于如何使用xcom的信息,您可以从气流示例中获得。 实例 https://github.com/apache/airflow/blob/main/airflow/example_dags/tutorial_etl_dag.py IMHO还有很多其他更好的方法,我刚刚写下了我尝试过的。 |
2
4
完全同意@Talgat的观点,即Airflow并不是为这个而建造的。它关注的是任务相关性,而不是数据相关性。 也许你可以看看一个以数据为中心的管道解决方案,比如 ZenML 来解决这个问题?它有一个 guide 举例说明跨管道步骤传递Pandas数据帧。您还可以利用跨步骤的数据缓存和其他功能,使其更适合您正在做的事情。 最重要的是,ZenML管道也是 deploy-able as an Airflow DAG 。因此,与其专注于自己编写工件逻辑的持久化,不如让ZenML来处理它。 免责声明:我是ZenML的核心贡献者之一,所以这是公认的有偏见的。仍然认为这可能对OP有帮助! |
bz_jf · CNN训练损失太不稳定了 2 年前 |
Bad Coder · 如何在Pyte中使用SMOTE? 2 年前 |
Sherwin R · 随机森林预测错误的输出形状 2 年前 |
Palkin Jangra · 如何迭代一列以获得每行的平均值? 2 年前 |