代码之家  ›  专栏  ›  技术社区  ›  StatsNoob

Dask似乎在为全局变量共享内存,我认为这是不可能的

  •  0
  • StatsNoob  · 技术社区  · 3 年前

    以下是一些可复制的代码:

    1. 下面是带有全局变量的Dask:
    # method_file.py
    
    import dask
    import random
    
    class TestClass():
    
        def from_dataframe(self, pdf):
            global data
            data = pdf
    
        @staticmethod
        def work(elem):
            data.iloc[0, 0] = 9999
            return len(data) + elem
    
        def do(self):
            tasks = [dask.delayed(TestClass.work)(random.randint(1,500)) for x in range(10)]
            re = dask.compute(*tasks)
            return re
    
    # main_file.py
    
    from method_file import TestClass
    import numpy as np
    import pandas as pd 
    
    if __name__ == '__main__':
        ar = np.arange(500000000).reshape(5000000, 100)
        pdf = pd.DataFrame(ar)
    
        tc = TestClass()
        tc.from_dataframe(pdf)
        print(tc.do())
        print(pdf.head(3))
    
    python3 main_file.py
    

    这将产生:

    (5000117, 5000054, 5000304, 5000111, 5000010, 5000264, 5000201, 5000346, 5000486, 5000376)
         0    1    2    3    4    5    6   ...   93   94   95   96   97   98   99
    0  9999    1    2    3    4    5    6  ...   93   94   95   96   97   98   99
    1   100  101  102  103  104  105  106  ...  193  194  195  196  197  198  199
    2   200  201  202  203  204  205  206  ...  293  294  295  296  297  298  299
    
    [3 rows x 100 columns]
    
    

    这意味着 work 方法能够阅读 data 全局变量。不仅如此,它甚至使基因突变 pdf 变量我知道多处理 fork 我们也可以通过这种方式读取数据,如下所示。

    1. 这里是使用 启动方法。
    # method_file2.py
    
    from multiprocessing import Pool
    import multiprocessing
    import random
    
    class TestClass():
    
        def from_dataframe(self, pdf):
            global data
            data = pdf
    
        @staticmethod
        def work(elem):
            data.iloc[0, 0] = 9999
            return len(data) + elem
    
        def do(self):
    
            multiprocessing.set_start_method('fork')
    
            pool = Pool(6)
            procs = [pool.apply_async(TestClass.work, args=(random.randint(1,500), )) for i in range(1, 10)]
            re = [proc.get() for proc in procs]
            return re
    
    # main_file2.py
    
    from method_file2 import TestClass
    import numpy as np
    import pandas as pd 
    
    if __name__ == '__main__':
        ar = np.arange(500000000).reshape(5000000, 100)
        pdf = pd.DataFrame(ar)
    
        tc = TestClass()
        tc.from_dataframe(pdf)
        print(tc.do())
        print(pdf.head(3))
    
    python3 main_file2.py
    

    这将产生:

    [5000456, 5000346, 5000122, 5000120, 5000358, 5000067, 5000375, 5000444, 5000288]
        0    1    2    3    4    5    6   ...   93   94   95   96   97   98   99
    0    0    1    2    3    4    5    6  ...   93   94   95   96   97   98   99
    1  100  101  102  103  104  105  106  ...  193  194  195  196  197  198  199
    2  200  201  202  203  204  205  206  ...  293  294  295  296  297  298  299
    
    [3 rows x 100 columns]
    

    正如您所见,它可以读取,因为主进程的状态是复制的,但它不能改变对象( pdf ).

    为了简洁起见,我不会在这里包含更多的代码,但我也尝试过对 pdf 对象,我知道事实上Dask并没有对那个数据帧进行酸洗。Dask怎么可能以这种方式跨进程共享内存?

    0 回复  |  直到 3 年前
        1
  •  0
  •   mdurant    3 年前

    我没有看到你设置任何流程。Dask的默认调度程序使用线程池,因此所有任务都可以看到相同的变量。看见 https://docs.dask.org/en/latest/scheduler-overview.html