
Creating Dask delayed objects with a lock. Error: '_thread._local' object has no attribute 'execution_state'

  • R zu · 6 years ago

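    I want to create a Dask array made of several chunks. Each chunk comes from a function that reads a file. To avoid reading several files from the hard disk at the same time, I followed the answer here and used a lock.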

    But creating the delayed objects raises the following error:

    AttributeError: '_thread._local' object has no attribute 'execution_state'
    

    Test code:

    import numpy as np
    import dask
    import distributed
    
    def make_test_data():
        n = 2
        m = 3
        x = np.arange(n * m, dtype=int).reshape(n, m)  # np.int was removed in NumPy 1.24
        np.save('0.npy', x)
        np.save('1.npy', x)
        shape = (n, m)
        return shape
    
    @dask.delayed
    def load_numpy(lock, fn):
        lock.acquire()
        out = np.load(fn)
        lock.release()
        return out
    
    def make_delayed():
        # np.load is a function that reads a file
        # and returns a numpy array.
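        # This line raises "ValueError: No workers found": no Client exists yet,
        # and this code is not running inside a Dask worker.
        read_lock = distributed.Lock('numpy-read')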
        return [load_numpy(read_lock, '%d.npy' % i) for i in range(2)]
    
    def main():
        shape = make_test_data()
        ds = make_delayed()
    
    main()
    

    Full error message:

    Traceback (most recent call last):
      File "<...>/site-packages/distributed/worker.py", line 2536, in get_worker
        return thread_state.execution_state['worker']
    AttributeError: '_thread._local' object has no attribute 'execution_state'
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "test_lock.py", line 32, in <module>
        main()
      File "test_lock.py", line 30, in main
        ds = make_delayed()
      File "test_lock.py", line 25, in make_delayed
        read_lock = distributed.Lock('numpy-read')
      File "<...>/site-packages/distributed/lock.py", line 92, in __init__
        self.client = client or _get_global_client() or get_worker().client
      File "<...>/site-packages/distributed/worker.py", line 2542, in get_worker
        raise ValueError("No workers found")
    ValueError: No workers found
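The chained traceback can be read as follows: `distributed.Lock.__init__` uses an explicit `client`, then `_get_global_client()`, and only then `get_worker().client`. `get_worker()` reads `execution_state` from a thread-local object, which, roughly speaking, is only populated in threads that are executing a Dask task. The standard-library sketch below mimics that lookup with hypothetical names (`thread_state`, `get_worker_like`, `run_as_task`) to show why it fails in the main thread; it is an illustration, not distributed's actual code.

```python
import threading

# Module-level thread-local storage, analogous to `thread_state` in the
# traceback. Attributes set on it are visible only in the thread that set them.
thread_state = threading.local()


def get_worker_like():
    """Hypothetical stand-in for get_worker(): only works inside a 'task'."""
    try:
        return thread_state.execution_state["worker"]
    except AttributeError:
        # Raising here chains the exceptions just like the traceback does:
        # AttributeError, then "During handling ... another exception".
        raise ValueError("No workers found")


def run_as_task(out):
    """Hypothetical stand-in for a worker thread that is executing a task."""
    thread_state.execution_state = {"worker": "worker-1"}
    out.append(get_worker_like())


task_results = []
t = threading.Thread(target=run_as_task, args=(task_results,))
t.start()
t.join()
print(task_results)  # the lookup succeeded inside the "task" thread

main_error = None
try:
    get_worker_like()  # the main thread never ran a task
except ValueError as exc:
    main_error = str(exc)
print("main thread:", main_error)
```

The attribute set in the "task" thread never becomes visible to the main thread, which is why constructing the lock at graph-building time, outside any task and without a Client, cannot find a worker.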
    
    2 answers

  • mdurant · 6 years ago

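    Try this: create the lock inside the delayed function, so that it is constructed when the task runs on a worker rather than in the main process. Locks with the same name are shared, so all tasks still use one lock.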

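    import numpy as np
    import dask
    import distributed
    
    @dask.delayed
    def load_numpy(fn):
        # Create the lock inside the task, so it is constructed on a worker
        # at compute time. Locks with the same name are shared, so every
        # task still uses the same 'numpy-read' lock.
        lock = distributed.Lock('numpy-read')
        with lock:  # released even if np.load raises
            out = np.load(fn)
        return out
    
    def make_delayed():
        # np.load is a function that reads a file
        # and returns a numpy array.
        # No lock is created here: without a running Client, creating it in
        # the main process is what raised "No workers found".
        return [load_numpy('%d.npy' % i) for i in range(2)]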
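Creating the lock inside each task still gives mutual exclusion because `distributed.Lock` objects created with the same name refer to one lock coordinated by the scheduler. As a single-process analogy only, the sketch below uses a hypothetical `named_lock()` helper that hands out one `threading.Lock` per name; each task looks the lock up by name when it runs, and a counter checks that no two "reads" overlap. It does not reproduce how distributed coordinates locks across processes.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

_registry = {}
_registry_guard = threading.Lock()


def named_lock(name):
    """Hypothetical helper: return the same threading.Lock for the same name."""
    with _registry_guard:
        return _registry.setdefault(name, threading.Lock())


active = 0       # tasks currently inside the critical section
max_active = 0   # highest value of `active` ever observed
counter_guard = threading.Lock()


def load(fn):
    global active, max_active
    # As in the answer above, the lock is obtained inside the task, by name.
    with named_lock("numpy-read"):
        with counter_guard:
            active += 1
            max_active = max(max_active, active)
        time.sleep(0.01)  # stand-in for np.load(fn) reading from disk
        with counter_guard:
            active -= 1
    return fn


with ThreadPoolExecutor(max_workers=4) as pool:
    loaded = list(pool.map(load, ["%d.npy" % i for i in range(8)]))

print(max_active)  # 1: the reads never overlapped
```

Although every call runs `named_lock("numpy-read")` afresh, it always gets the same underlying lock, so four worker threads still read one file at a time.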
    
  • R zu · 6 years ago

    Following mdurant's answer, here is a benchmark:

    import numpy as np
    import dask
    from dask.distributed import Client, Lock
    import time
    
    @dask.delayed
    def locked_load(fn):
        lock = Lock('numpy-read')
        with lock:  # released even if np.load raises
            out = np.load(fn)
        return out
    
    
    @dask.delayed
    def unlocked_load(fn):
        return np.load(fn)
    
    
    def work(arr_size, n_parts, use_lock=True):
        if use_lock:
            f = locked_load
        else:
            f = unlocked_load
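        # np.int (an alias of the builtin int) was removed in NumPy 1.24.
        x = np.arange(arr_size, dtype=int)
        # Note: the files are written here, inside the timed call to work(),
        # so each measurement includes np.save as well as the loads.
        for i in range(n_parts):
            np.save('%d.npy' % i, x)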
        d = [f('%d.npy' % i) for i in range(n_parts)]
        return dask.compute(*d)
    
    
    def main():
        client = Client()
        with open("lock_time.txt", "a") as fh:
            n_parts_list = [20, 100]
            arr_size_list = [1_000_000, 5_000_000, 10_000_000]
            for n_part in n_parts_list:
                for arr_size in arr_size_list:
                    for use_lock in [True, False]:
                        st = time.time()
                        work(arr_size, n_part, use_lock)
                        en = time.time()
                        fh.write("%d %d %s %s\n" % (
                            n_part, arr_size, use_lock, str(en - st))
                        )
                        fh.flush()
        client.close()
    
    
    if __name__ == '__main__':
        main()
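One caveat about this benchmark: `work()` writes the `.npy` files inside the timed region, so each measurement includes the `np.save` cost as well as the reads. Below is a sketch of a variant that writes the files first and times only the reads. To keep it self-contained it uses only the standard library: a `ThreadPoolExecutor` stands in for the Dask workers, a `threading.Lock` stands in for `distributed.Lock`, and raw byte files stand in for the `.npy` files. The helper names (`write_files`, `read_file`, `timed_reads`) and sizes are hypothetical, so its numbers are not comparable to the table.

```python
import os
import tempfile
import threading
import time
from concurrent.futures import ThreadPoolExecutor

read_lock = threading.Lock()  # stand-in for distributed.Lock('numpy-read')


def write_files(directory, n_parts, size):
    """Setup step, deliberately kept outside the timed region."""
    paths = []
    for i in range(n_parts):
        path = os.path.join(directory, "%d.bin" % i)
        with open(path, "wb") as fh:
            fh.write(os.urandom(size))
        paths.append(path)
    return paths


def read_file(path, use_lock):
    """Read one file, optionally holding the shared lock; return its size."""
    if use_lock:
        with read_lock:
            with open(path, "rb") as fh:
                return len(fh.read())
    with open(path, "rb") as fh:
        return len(fh.read())


def timed_reads(paths, use_lock, workers=4):
    """Time only the reads; return (seconds, total bytes read)."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        sizes = list(pool.map(lambda p: read_file(p, use_lock), paths))
    return time.perf_counter() - start, sum(sizes)


with tempfile.TemporaryDirectory() as tmp:
    paths = write_files(tmp, n_parts=20, size=1_000_000)
    results = {use_lock: timed_reads(paths, use_lock) for use_lock in (True, False)}

for use_lock, (seconds, nbytes) in results.items():
    print(use_lock, nbytes, "%.4f s" % seconds)
```

Files that were just written are likely still in the operating system's page cache, so in both this sketch and the original benchmark the reads may not actually hit the disk; run order can also matter for the same reason.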
    

    Results (on a machine with 16 GB of RAM):

    +--------+----------+----------+----------+
    | n_part | arr_size | use_lock | time (s) |
    +--------+----------+----------+----------+
    |     20 |  1000000 | True     |   0.97   |
    |     20 |  1000000 | False    |   0.89   |
    |     20 |  5000000 | True     |   7.52   |
    |     20 |  5000000 | False    |   6.80   |
    |     20 | 10000000 | True     |  16.70   |
    |     20 | 10000000 | False    |  15.78   |
    |    100 |  1000000 | True     |   3.76   |
    |    100 |  1000000 | False    |   6.88   |
    |    100 |  5000000 | True     |  43.22   |
    |    100 |  5000000 | False    |  38.96   |
    |    100 | 10000000 | True     | 291.34   | 
    |    100 | 10000000 | False    | 389.34   |
    +--------+----------+----------+----------+
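To put the slowest rows in context, it helps to estimate how much data each configuration holds, since `dask.compute(*d)` returns every loaded array to the client process. The sketch assumes `np.int` meant 8-byte integers (it was an alias of Python `int`, which NumPy maps to int64 on Linux and macOS but, before NumPy 2.0, to int32 on Windows, which would halve these figures).

```python
BYTES_PER_ELEMENT = 8  # assumption: np.int -> int64 on the benchmark machine

n_parts_list = [20, 100]
arr_size_list = [1_000_000, 5_000_000, 10_000_000]

# Total size of all arrays returned by dask.compute, in gigabytes (1e9 bytes).
totals_gb = {}
for n_part in n_parts_list:
    for arr_size in arr_size_list:
        totals_gb[(n_part, arr_size)] = n_part * arr_size * BYTES_PER_ELEMENT / 1e9
        print(n_part, arr_size, "%.2f GB" % totals_gb[(n_part, arr_size)])
```

Under that assumption the 100 × 10,000,000 case gathers about 8 GB into the client, half of the machine's 16 GB, before counting copies held by the workers. Memory pressure is a plausible but unverified explanation for the jump to hundreds of seconds in the last two rows.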