代码之家  ›  专栏  ›  技术社区  ›  systempuntoout

谷歌应用引擎:如何使用TaskQueue或AsyncUrlFetch并行下载?

  •  2
  • systempuntoout  · 技术社区  · 14 年前

    我的GAE应用程序从第三方站点检索JSON数据;给定一个表示要下载的项目的ID,这个站点上的项目数据被组织成多个页面,因此我的代码必须逐页下载数据块,直到检索到最后一个可用页面的数据为止。
    我的简化代码如下:

    class FetchData(webapp.RequestHandler):
      def get(self):
        ...
        data_list = []
        page = 1
        while True:
          fetched_data= urlfetch.fetch('http://www.foo.com/getdata?id=xxx&result=JSON&page=%s' % page)
          data_chunk = fetched_data["data"] 
          data_list = data_list + data_chunk
          if len(data_list) == int(fetched_data["total_pages"]):
             break
          else:
             page = page +1 
        ...  
        doRender('dataview.htm',{'data_list':data_list} )
    

    这个 data_list 结果是一个有序的列表,其中第一个项目具有第1页的数据,最后一个项目具有最新页面的数据;此 数据列表 一旦检索到,将在视图中呈现。

    这种方法可以工作99%,但有时,由于 30秒 谷歌应用引擎的限制,对有很多页面的项目我感到恐惧 DeadlineExceededError . 我想知道是否使用 TaskQueue γ Deferred γ AsyncUrlfetch 我可以在某种程度上改进这种并行化算法。

    2 回复  |  直到 14 年前
        1
  •  1
  •   Matt Williamson    14 年前

    使用此: http://code.google.com/appengine/docs/python/urlfetch/asynchronousrequests.html

    这很简单,就像这样:

    def handle_result(rpc):
        result = rpc.get_result()
        # ... Do something with result...
    
    # Use a helper function to define the scope of the callback.
    def create_callback(rpc):
        return lambda: handle_result(rpc)
    
    rpcs = []
    for url in urls:
        rpc = urlfetch.create_rpc()
        rpc.callback = create_callback(rpc)
        urlfetch.make_fetch_call(rpc, url)
        rpcs.append(rpc)
    
    # ...
    
    # Finish all RPCs, and let callbacks process the results.
    for rpc in rpcs:
        rpc.wait()
    
        2
  •  0
  •   systempuntoout    14 年前

    我已经解决了这个问题:

    chunks_dict = {}
    
    def handle_result(rpc, page):
        result = rpc.get_result()
        chunks_dict[page] = result["data"]
    
    def create_callback(rpc, page):
        return lambda: handle_result(rpc, page)
    
    rpcs = []
    while True:
        rpc = urlfetch.create_rpc(deadline = 10)
        rpc.callback = create_callback(rpc, page)
        urlfetch.make_fetch_call(rpc, 'http://www.foo.com/getdata?id=xxx&result=JSON&page=%s' % page)
        rpcs.append(rpc)
        if page > total_pages:
           break
        else:
           page = page +1   
    for rpc in rpcs:
        rpc.wait()
    
    page_keys = chunks_dict.keys()
    page_keys.sort()
    for key in page_keys:
        data_list= data_list + chunks_dict[key]