代码之家  ›  专栏  ›  技术社区  ›  edesz

Dask数据帧过滤和重分区提供了一些空分区

  •  0
  • edesz  · 技术社区  · 4 年前

    我正在尝试过滤一个Dask DataFrame map_partitions 对每个分区应用一个函数。函数需要一个 数据帧 至少有一行。

    pandas 数据帧 (然后转换为Dask) )为了一个MCVE

    def create_data(n):
        df = pd.DataFrame(np.random.rand(6 * n), columns=["A"])
        random_integers = np.random.default_rng().choice(14, size=n, replace=False)
        df.insert(0, 'store_id', [d for s in random_integers for d in [s] * 6])
        return df
    
    df = create_data(n=10)
    print(df.head(15))
    >>>
        store_id         A
    0         10  0.850730
    1         10  0.581119
    2         10  0.825802
    3         10  0.657797
    4         10  0.291961
    5         10  0.864984
    6          9  0.161334
    7          9  0.397162
    8          9  0.089300
    9          9  0.435914
    10         9  0.750741
    11         9  0.920625
    12         3  0.635727
    13         3  0.425270
    14         3  0.904043
    

    数据结构:每个 store_id ,正好有6行。

    现在我创建一个 我想用它来过滤上面的数据

    filtered_store_ids = df["store_id"].value_counts().index[:6].tolist()
    print(filtered_store_ids)
    >>> [13, 12, 11, 10, 9, 7]
    

    然后将上述数据(a)转换为 数据帧 dask.dataframe

    ddf = dd.from_pandas(df, npartitions=10)
    

    现在我打印 ddf

    for p in range(ddf.npartitions):
        print(f"Partition Index={p}, Number of Rows={len(ddf.get_partition(p))}")
    >>>
    Partition Index=0, Number of Rows=6
    Partition Index=1, Number of Rows=6
    Partition Index=2, Number of Rows=6
    Partition Index=3, Number of Rows=6
    Partition Index=4, Number of Rows=6
    Partition Index=5, Number of Rows=6
    Partition Index=6, Number of Rows=6
    Partition Index=7, Number of Rows=6
    Partition Index=8, Number of Rows=6
    Partition Index=9, Number of Rows=6
    

    存储\u id . 因此,每个分区都包含单个分区的数据 存储\u id .

    ddf = ddf[ddf["store_id"].isin(filtered_store_ids)]
    

    我再次打印过滤后的分区

    for p in range(ddf.npartitions):
        print(f"Partition Index={p}, Number of Rows={len(ddf.get_partition(p))}")
    >>>
    Partition Index=0, Number of Rows=0
    Partition Index=1, Number of Rows=0
    Partition Index=2, Number of Rows=6
    Partition Index=3, Number of Rows=6
    Partition Index=4, Number of Rows=0
    Partition Index=5, Number of Rows=6
    Partition Index=6, Number of Rows=6
    Partition Index=7, Number of Rows=6
    Partition Index=8, Number of Rows=0
    Partition Index=9, Number of Rows=6
    

    这是预期的,因为每个分区都有一个 而且,通过过滤,一些分区将被完全过滤掉,因此它们将包含零行。

    Dataframe Dask DataFrame best practices

    ddf = ddf.repartition(npartitions=len(filtered_store_ids))
    print(ddf)
    >>>
    Dask DataFrame Structure:
                  store_id        A
    npartitions=6                  
    0                int64  float64
    6                  ...      ...
    ...                ...      ...
    48                 ...      ...
    59                 ...      ...
    Dask Name: repartition, 47 tasks
    

    但是 ,现在当我重新打印分区时,我得到了与前一个类似的输出(分区大小不均匀和一些空分区),就好像重新分区没有发生一样

    for p in range(ddf.npartitions):
        print(f"Partition Index={p}, Number of Rows={len(ddf.get_partition(p))}")
    >>>
    Partition Index=0, Number of Rows=0
    Partition Index=1, Number of Rows=6
    Partition Index=2, Number of Rows=6
    Partition Index=3, Number of Rows=6
    Partition Index=4, Number of Rows=12
    Partition Index=5, Number of Rows=6
    

    s) 函数无法处理,因为它们缺少行。

    def myadd(df):
        assert df.shape[0] > 0
        ...
        return ...
    
    ddf.map_partitions(myadd)
    >>> AssertionError                            Traceback (most recent call last)
    .
    .
    .
    AssertionError: 
    

    重新分区的Dask文档是 well-explained (与我上面链接的最佳实践相同)看起来很简单,但是在重新分区之后,我仍然得到一些没有行和行的分区 映射分区 会在这里失败。我肯定我错过了什么。

    有几个关于重新划分的帖子( 1 , 2 )但它们不处理空分区。

    问题

    有没有办法确保在重新分区之后,所有分区都将有6行,并且没有空分区?i、 e.是否有可能重新划分Dask 数据帧

    编辑

    1 , 2 . 这些可能与我在这里遇到的问题有关。

    0 回复  |  直到 4 年前
        1
  •  1
  •   edesz    4 年前

    我用它们来解决这个问题。

    从问题中的原始代码开始(无需更改)

    .
    <identical code from question here>
    .
    ddf = ddf.repartition(npartitions=len(filtered_store_ids))
    

    ddf

    ddf = cull_empty_partitions(ddf)  # remove empties
    ddf = _rebalance_ddf(ddf)         # re-size
    

    当我现在重新打印分区大小时,所有分区大小都是相等的,没有一个是空的

    for p in range(ddf.npartitions):
        print(f"Partition Index={p}, Number of Rows={len(ddf.get_partition(p))}")
    >>>
    Partition Index=0, Number of Rows=6
    Partition Index=1, Number of Rows=6
    Partition Index=2, Number of Rows=6
    Partition Index=3, Number of Rows=6
    Partition Index=4, Number of Rows=6
    Partition Index=5, Number of Rows=6