
Removing empty partitions in Dask

  •  9
  • morganics  · 7 years ago

    When loading data from CSVs, some of the CSVs fail to load, which results in empty partitions. I would like to remove all empty partitions, since some methods don't seem to handle them well. I have tried repartitioning, where (for example) repartition(npartitions=10) works fine, but values greater than that can still result in empty partitions.

    What is the best way to achieve this? Thanks.

    4 Answers  |  as of 7 years ago
        1
  •  11
  •   tpegbert    6 years ago

    I've found that filtering a Dask DataFrame (e.g. by date) commonly results in empty partitions. If you're having trouble using a DataFrame that has empty partitions, here is a function, based on MRocklin's guidance, to cull them:

    import dask.dataframe as dd

    def cull_empty_partitions(df):
        # Compute the length of every partition.
        ll = list(df.map_partitions(len).compute())
        df_delayed = df.to_delayed()
        df_delayed_new = list()
        pempty = None
        for ix, n in enumerate(ll):
            if 0 == n:
                # Keep one empty partition around to use as metadata.
                pempty = df.get_partition(ix)
            else:
                df_delayed_new.append(df_delayed[ix])
        if pempty is not None:
            df = dd.from_delayed(df_delayed_new, meta=pempty)
        return df
    
        2
  •  2
  •   Stuart Berg    5 years ago

    For anyone working with Bags (not DataFrames), this function will do the trick:

    import dask.bag

    def cull_empty_partitions(bag):
        """
        When bags are created by filtering or grouping from a different bag,
        it retains the original bag's partition count, even if a lot of the
        partitions become empty.
        Those extra partitions add overhead, so it's nice to discard them.
        This function drops the empty partitions.
        """
        bag = bag.persist()
        def get_len(partition):
            # If the bag is the result of bag.filter(),
            # then each partition is actually a 'filter' object,
            # which has no __len__.
            # In that case, we must convert it to a list first.
            if hasattr(partition, '__len__'):
                return len(partition)
            return len(list(partition))
        partition_lengths = bag.map_partitions(get_len).compute()

        # Convert bag partitions into a list of 'delayed' objects
        lengths_and_partitions = zip(partition_lengths, bag.to_delayed())

        # Drop the ones with empty partitions
        partitions = (p for l, p in lengths_and_partitions if l > 0)

        # Convert from list of delayed objects back into a Bag.
        return dask.bag.from_delayed(partitions)
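Here is a minimal sketch (not from the original answer) of the same recipe inlined for a small bag, assuming dask is installed. It shows why the partitions must be materialised before measuring: partitions of a filtered bag are lazy objects with no `__len__`:

```python
import dask.bag as db

# A bag with 4 partitions; filtering keeps the partition count,
# so the partitions holding values >= 3 end up empty.
bag = db.from_sequence(range(8), npartitions=4)
filtered = bag.filter(lambda x: x < 3)

# Measure each partition (converting it to a list first, since
# filtered partitions have no __len__), then rebuild the bag from
# only the non-empty delayed partitions.
lengths = filtered.map_partitions(lambda p: len(list(p))).compute()
culled = db.from_delayed(
    [d for d, n in zip(filtered.to_delayed(), lengths) if n > 0]
)
```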
    
        3
  •  1
  •   MRocklin    7 years ago

    There is no simple API to do this. You can call df.map_partitions(len) to determine which partitions are empty, and then remove them explicitly using df.to_delayed() together with dask.dataframe.from_delayed(...).

    If in the future you find a function that doesn't work well with empty partitions, we would appreciate it if you raised an issue at https://github.com/dask/dask/issues/new

        4
  •  1
  •   SultanOrazbayev    3 years ago

    Here is my attempt at removing empty partitions:

    import numpy as np
    import dask.dataframe as dd

    def remove_empty_partitions(ddf):
        """Remove empty partitions."""
        partition_lens = ddf.map_partitions(len).compute()
        # np.where returns a tuple of arrays; take the first element
        # to get the actual indices of the empty partitions.
        ids_of_empty_partitions = np.where(np.asarray(partition_lens) == 0)[0]
        if len(ids_of_empty_partitions) == len(partition_lens):
            # All partitions are empty, so keep just one of them.
            ddf_nonzero = ddf.partitions[0]
        elif len(ids_of_empty_partitions) > 0:
            ddf_nonzero = dd.concat([
                ddf.get_partition(n) for n in range(ddf.npartitions)
                if n not in ids_of_empty_partitions
            ])
        else:
            # No empty partitions: return the dataframe unchanged.
            ddf_nonzero = ddf
        return ddf_nonzero
    

    FWIW, @tpegbert's answer appears to be more efficient in terms of the number of tasks needed to obtain the filtered dataframe.