代码之家  ›  专栏  ›  技术社区  ›  Cherry Wu

如何将pandas数据帧传递给气流任务

  •  0
  • Cherry Wu  · 技术社区  · 3 年前

    我正在学习如何使用气流来构建机器学习管道。

    但没有找到将一个任务生成的panda数据帧传递到另一个任务的方法。。。似乎需要将数据转换为JSON格式,还是在每个任务中将数据保存在数据库中?

    最后,我不得不把所有的东西放在一个任务中。。。 是否存在在气流任务之间传递数据帧的方法?

    这是我的代码:

    from datetime import datetime
    import pandas as pd
    import numpy as np
    import os
    
    import lightgbm as lgb
    from sklearn.model_selection import train_test_split
    from sklearn.model_selection import StratifiedKFold
    from sklearn.metrics import balanced_accuracy_score
    
    from airflow.decorators import dag, task
    from airflow.operators.python_operator import PythonOperator
    
    
    @dag(dag_id='super_mini_pipeline', schedule_interval=None, 
     start_date=datetime(2021, 11, 5), catchup=False, tags=['ml_pipeline'])
    def baseline_pipeline():
    
        def all_in_one(label):
            path_to_csv = os.path.join('~/airflow/data','leaf.csv') 
            df = pd.read_csv(path_to_csv)
    
            y = df[label]
            X = df.drop(label, axis=1)
    
            folds = StratifiedKFold(n_splits=5, shuffle=True, random_state=10)
            lgbm = lgb.LGBMClassifier(objective='multiclass', random_state=10)
            metrics_lst = []
    
            for train_idx, val_idx in folds.split(X, y):
                X_train, y_train = X.iloc[train_idx], y.iloc[train_idx]
                X_val, y_val = X.iloc[val_idx], y.iloc[val_idx]
            
                lgbm.fit(X_train, y_train)
                y_pred = lgbm.predict(X_val)
            
                cv_balanced_accuracy = balanced_accuracy_score(y_val, y_pred)
                metrics_lst.append(cv_balanced_accuracy)
        
            avg_performance = np.mean(metrics_lst)
    
            print(f"Avg Performance: {avg_performance}")
    
    
        all_in_one_task = PythonOperator(task_id='all_in_one_task', python_callable=all_in_one, op_kwargs={'label':'species'})
        all_in_one_task 
    
    
    # dag invocation
    pipeline_dag = baseline_pipeline()
    
    0 回复  |  直到 3 年前
        1
  •  11
  •   Talgat    3 年前

    尽管它被用于许多ETL任务,但Airflow并不是这类操作的正确选择,它适用于工作流而非数据流。但是有很多方法可以做到这一点,而无需在任务之间传递整个数据帧。

    您可以使用xcom.push和xcom.pull传递有关数据的信息:

    a.将第一个任务的结果保存在某个位置(json、csv等)

    b.传递到xcom.push有关已保存文件的信息。例如,文件名、路径。

    c.使用xcom.pull从其他任务中读取此文件名,并执行所需的操作。

    上面的所有内容都使用了一些数据库表:

    a.在task_1中,您可以从某个数据帧中的table_1下载数据,对其进行处理并保存在另一个table_2中(df.to_sql())。

    b.使用xcom.push传递表的名称。

    c.从另一个任务中,使用xcom.pull获取table_2,并使用df.read_sql()读取它。

    关于如何使用xcom的信息,您可以从气流示例中获得。 实例 https://github.com/apache/airflow/blob/main/airflow/example_dags/tutorial_etl_dag.py

    IMHO还有很多其他更好的方法,我刚刚写下了我尝试过的。

        2
  •  4
  •   Hamza Tahir    3 年前

    完全同意@Talgat的观点,即Airflow并不是为这个而建造的。它关注的是任务相关性,而不是数据相关性。

    也许你可以看看一个以数据为中心的管道解决方案,比如 ZenML 来解决这个问题?它有一个 guide 举例说明跨管道步骤传递Pandas数据帧。您还可以利用跨步骤的数据缓存和其他功能,使其更适合您正在做的事情。

    最重要的是,ZenML管道也是 deploy-able as an Airflow DAG 。因此,与其专注于自己编写工件逻辑的持久化,不如让ZenML来处理它。

    免责声明:我是ZenML的核心贡献者之一,所以这是公认的有偏见的。仍然认为这可能对OP有帮助!