代码之家  ›  专栏  ›  技术社区  ›  Mulgard

如何将大熊猫数据帧保存到hdfs?

  •  13
  • Mulgard  · 技术社区  · 7 年前

    我正在与熊猫和spark数据帧合作。数据帧总是非常大(>20 GB),标准的spark函数不足以满足这些大小。目前,我正在将我的pandas数据帧转换为spark数据帧,如下所示:

    dataframe = spark.createDataFrame(pandas_dataframe)  
    

    我这样做是因为spark很容易将数据帧写入hdfs:

    dataframe.write.parquet(output_uri, mode="overwrite", compression="snappy")
    

    但对于大于2 GB的数据帧,转换失败。 如果我将spark数据帧转换为pandas,我可以使用pyarrow:

    // temporary write spark dataframe to hdfs
    dataframe.write.parquet(path, mode="overwrite", compression="snappy")
    
    // open hdfs connection using pyarrow (pa)
    hdfs = pa.hdfs.connect("default", 0)
    // read parquet (pyarrow.parquet (pq))
    parquet = pq.ParquetDataset(path_hdfs, filesystem=hdfs)
    table = parquet.read(nthreads=4)
    // transform table to pandas
    pandas = table.to_pandas(nthreads=4)
    
    // delete temp files
    hdfs.delete(path, recursive=True)
    

    这是一个从spark到pandas的快速转换,它也适用于大于2 GB的数据帧。我还没有找到另一种方法。这意味着有一个熊猫数据帧,我在pyarrow的帮助下将其转换为spark。问题是我真的找不到如何将pandas数据帧写入hdfs。

    我的熊猫版本:0.19.0

    4 回复  |  直到 7 年前
        1
  •  21
  •   zero323 little_kid_pea    7 年前

    这意味着有一个熊猫数据帧,我在pyarrow的帮助下将其转换为spark。

    pyarrow.Table.fromPandas 您想要的功能是:

    Table.from_pandas(type cls, df, bool timestamps_to_ms=False, Schema schema=None, bool preserve_index=True)
    
    Convert pandas.DataFrame to an Arrow Table
    
    import pyarrow as pa
    
    pdf = ...  # type: pandas.core.frame.DataFrame
    adf = pa.Table.from_pandas(pdf)  # type: pyarrow.lib.Table
    

    结果可以直接写入拼花地板/HDF,而无需通过Spark传递数据:

    import pyarrow.parquet as pq
    
    fs  = pa.hdfs.connect()
    
    with fs.open(path, "wb") as fw
        pq.write_table(adf, fw)
    

    另请参见

    Spark注释 :

    此外,由于Spark 2.3(当前主)箭头在 createDataFrame ( SPARK-20791 - Use Apache Arrow to Improve Spark createDataFrame from Pandas.DataFrame ). 它 uses SparkContext.defaultParallelism to compute number of chunks 因此,您可以轻松控制各个批次的大小。

    最后 defaultParallelism 可以用于控制使用标准 _convert_from_pandas ,有效地将切片的大小减少到更易于管理的程度。

    不幸的是,这些不太可能解决您的问题 current memory problems . 两者都取决于 parallelize ,因此将所有数据存储在驱动程序节点的内存中。切换到箭头或调整配置只能加快进程或地址块大小限制。

    实际上,只要你使用当地的熊猫,我看没有任何理由在这里改用Spark DataFrame 作为输入。这种情况下最严重的瓶颈是驱动程序的网络I/O,分发数据无法解决这一问题。

        2
  •  1
  •   mikep    7 年前

    从…起 https://issues.apache.org/jira/browse/SPARK-6235

    支持并行化R数据。大于2GB的帧

    已解决。

    从…起 https://pandas.pydata.org/pandas-docs/stable/r_interface.html

    将数据帧转换为R对象

    您可以将pandas数据帧转换为R数据。框架

    因此,也许熊猫的转变->R->火花->hdfs?

        3
  •  1
  •   lego king    5 年前

    另一种方法是将pandas数据帧转换为spark数据帧(使用pyspark),并使用save命令将其保存到hdfs。 实例

        df = pd.read_csv("data/as/foo.csv")
        df[['Col1', 'Col2']] = df[['Col2', 'Col2']].astype(str)
        sc = SparkContext(conf=conf)
        sqlCtx = SQLContext(sc)
        sdf = sqlCtx.createDataFrame(df)
    
    

    在这里 astype 更改列的类型 object string . 这避免了spark无法识别熊猫类型而引发的异常 对象 . 但要确保这些列确实是string类型。

    现在要在hdfs中保存df:

        sdf.write.csv('mycsv.csv')
    
        4
  •  -1
  •   Edge7    7 年前

    一种黑客可能是从大数据帧中创建N个pandas数据帧(每个小于2 GB)(水平分区),并创建N个不同的spark数据帧,然后将它们合并(联合)以创建最后一个数据帧写入HDF。我假设你的主机器功能强大,但你也有一个可用的集群,在其中运行Spark。