代码之家  ›  专栏  ›  技术社区  ›  Ram Rachum

python:类似“map”的东西,在线程上工作[关闭]

  •  25
  • Ram Rachum  · 技术社区  · 14 年前

    我确信在标准库中有这样的东西,但似乎我错了。

    我有一堆我想要的网址 urlopen 并行地。我想要一个像内置的 map 函数,但工作是由一组线程并行完成的。

    有没有好的模块可以做到这一点?

    5 回复  |  直到 8 年前
        1
  •  39
  •   Scott Robinson    14 年前

    有一个 map 方法在 multiprocessing.Pool . 这需要多个过程。

    如果多个过程不是你的菜,你可以使用 multiprocessing.dummy 它使用线程。

    import urllib
    import multiprocessing.dummy
    
    p = multiprocessing.dummy.Pool(5)
    def f(post):
        return urllib.urlopen('http://stackoverflow.com/questions/%u' % post)
    
    print p.map(f, range(3329361, 3329361 + 5))
    
        2
  •  11
  •   Ram Rachum    14 年前

    有人建议我用 futures 用于此的包。我试过了,它好像起作用了。

    http://pypi.python.org/pypi/futures

    下面是一个例子:

    "Download many URLs in parallel."
    
    import functools
    import urllib.request
    import futures
    
    URLS = ['http://www.foxnews.com/',
            'http://www.cnn.com/',
            'http://europe.wsj.com/',
            'http://www.bbc.co.uk/',
            'http://some-made-up-domain.com/']
    
    def load_url(url, timeout):
        return urllib.request.urlopen(url, timeout=timeout).read()
    
    with futures.ThreadPoolExecutor(50) as executor:
       future_list = executor.run_to_futures(
               [functools.partial(load_url, url, 30) for url in URLS])
    
        3
  •  3
  •   cypheon    14 年前

    python模块 Queue 可能对你有帮助。使用一个线程 Queue.put() 只需简单地将所有URL推送到队列和工作线程中 get() 一个接一个的URL。

    Python Docs: queue — A synchronized queue class

        4
  •  3
  •   Rugnar    8 年前

    下面是线程映射的实现:

    from threading import Thread
    from queue import Queue
    
    def thread_map(f, iterable, pool=None):
        """
        Just like [f(x) for x in iterable] but each f(x) in a separate thread.
        :param f: f
        :param iterable: iterable
        :param pool: thread pool, infinite by default
        :return: list if results
        """
        res = {}
        if pool is None:
            def target(arg, num):
                try:
                    res[num] = f(arg)
                except:
                    res[num] = sys.exc_info()
    
            threads = [Thread(target=target, args=[arg, i]) for i, arg in enumerate(iterable)]
        else:
            class WorkerThread(Thread):
                def run(self):
                    while True:
                        try:
                            num, arg = queue.get(block=False)
                            try:
                                res[num] = f(arg)
                            except:
                                res[num] = sys.exc_info()
                        except Empty:
                            break
    
            queue = Queue()
            for i, arg in enumerate(iterable):
                queue.put((i, arg))
    
            threads = [WorkerThread() for _ in range(pool)]
    
        [t.start() for t in threads]
        [t.join() for t in threads]
        return [res[i] for i in range(len(res))]
    
        5
  •  0
  •   pillmuncher    14 年前

    我会用一个函数(未测试)来结束它:

    import itertools
    import threading
    import urllib2
    import Queue
    
    def openurl(url, queue):
        def starter():
            try:
                result = urllib2.urlopen(url)
            except Ecxeption, exc:
                def raiser():
                    raise exc
                queue.put((url, raiser))
            else:
                queue.put((url, lambda:result))
        threadind.Thread(target=starter).start()
    
    myurls = ... # the list of urls
    myqueue = Queue.Queue()
    
    map(openurl, myurls, itertools.repeat(myqueue))
    
    for each in myurls:
        url, getresult = queue.get()
        try:
            result = getresult()
        except Exception, exc:
            print 'exception raised:' + str(exc)
        else:
            # do stuff with result