代码之家  ›  专栏  ›  技术社区  ›  Daniel Severo

具有未来的Dask计算子图

  •  2
  • Daniel Severo  · 技术社区  · 7 年前

    1. 使用dask构建延迟dask图。袋子( def fakejob )
    2. 从1开始计算图形,并将其保存到parquet(去掉这一部分,只是一个动机)

    from dask.distributed import Client
    
    client = Client(processes=True)
    
    def fakejob(path):
        return (
            dask.bag
            .read_text(path)
            .to_dataframe()
        )
    
    futures = client.map(fakejob, [input_path1, input_path2])
    

    问题是我一直得到: AssertionError: daemonic processes are not allowed to have children

    我试过跟随 this link

    from dask.distributed import Client
    
    client = Client(processes=True)
    
    def fakejob(path):
        with dask.set_options(get=client.get):
            return (
                dask.bag
                .read_text(path)
                .to_dataframe()
            )
    
    futures = client.map(fakejob, [input_path1, input_path2])
    

    关于如何做到这一点,有什么线索吗?

    干杯

    1 回复  |  直到 7 年前
        1
  •  2
  •   mdurant    7 年前

    这个奇怪的、有点夸张的错误消息来自于试图在工作进程内构建dask图(这是一个包的含义),如果使用client.map调用,则会在这里结束。如果您可以将整个工作流程放在函数中,包括向parquet写入,并且没有尝试将包传递回调用方,那么您的第二次尝试将与本地客户一起工作。

    解决方案更简单。

    bags = [dask.bag.read_text(path)
            .to_dataframe() for path in [input_path1, input_path2])
    futures = client.compute(bags)   # run in background on the cluster
    client.gather(futures)   # wait and get results
    

    在这里 bags 是dask包的列表,即已定义但尚未运行的工作任务。您可以将最后两行替换为 dask.compute(*bags) 在不担心未来的情况下获得结果。