代码之家  ›  专栏  ›  技术社区  ›  Eliethesaiyan

未调用apply_async回调函数

  •  3
  • Eliethesaiyan  · 技术社区  · 8 年前

    我是python的新手,我有一个函数,它为我的数据计算特性,然后返回一个应该在文件中处理和写入的列表。。。。我使用Pool进行计算,然后使用回调函数写入文件,但是回调函数没有被调用,我在其中放了一些打印语句,但它肯定没有被调用。 我的代码如下所示:

    def write_arrow_format(results):
    print("writer called")
    results[1].to_csv("../data/model_data/feature-"+results[2],sep='\t',encoding='utf-8')
    with open('../data/model_data/arow-'+results[2],'w') as f:
         for dic in results[0]:
             feature_list=[]
             print(dic)
             beginLine=True
             for key,value in dic.items():
                  if(beginLine):
                    feature_list.append(str(value))
                    beginLine=False
                  else:
                    feature_list.append(str(key)+":"+str(value))
             feature_line=" ".join(feature_list)
             f.write(feature_line+"\n")
    
    
    def generate_features(users,impressions,interactions,items,filename):
        #some processing 
        return [result1,result2,filename]
    
    
    
    
    
    if __name__=="__main__":
       pool=mp.Pool(mp.cpu_count()-1)
    
       for i in range(interval):
           if i==interval:
              pool.apply_async(generate_features,(users[begin:],impressions,interactions,items,str(i)),callback=write_arrow_format)
           else:
               pool.apply_async(generate_features,(users[begin:begin+interval],impressions,interactions,items,str(i)),callback=write_arrow_format)
               begin=begin+interval
       pool.close()
       pool.join()
    
    1 回复  |  直到 8 年前
        1
  •  6
  •   Mr. Frobenius    7 年前

    从您的帖子中看不出 generate_features 然而,如果有 result1 , result2 filename 是不可序列化的,那么由于某种原因,多处理库将不会调用回调函数,也不会以静默方式调用。我 认为 这是因为多处理库试图在子进程和父进程之间来回传递对象之前对对象进行pickle。如果返回的内容不是“可pickle”(即不可序列化),则不会调用回调。

    我自己也遇到过这个bug,它原来是一个给我带来麻烦的logger对象的实例。下面是一些复制我的问题的示例代码:

    import multiprocessing as mp
    import logging 
    
    def bad_test_func(ii):
        print('Calling bad function with arg %i'%ii)
        name = "file_%i.log"%ii
        logging.basicConfig(filename=name,level=logging.DEBUG)
        if ii < 4:
            log = logging.getLogger()
        else:
            log = "Test log %i"%ii
        return log
    
    def good_test_func(ii):
        print('Calling good function with arg %i'%ii)
        instance = ('hello', 'world', ii)
        return instance
    
    def pool_test(func):
        def callback(item):
            print('This is the callback')
            print('I have been given the following item: ')
            print(item)
        num_processes = 3
        pool = mp.Pool(processes = num_processes)
        results = []
        for i in range(5):
            res = pool.apply_async(func, (i,), callback=callback)
            results.append(res)
        pool.close()
        pool.join()
    
    def main():
    
        print('#'*30)
        print('Calling pool test with bad function')
        print('#'*30)
    
        pool_test(bad_test_func)
    
        print('#'*30)
        print('Calling pool test with good function')
        print('#'*30)
        pool_test(good_test_func)
    
    if __name__ == '__main__':
        main()
    

    希望这有帮助,并为您指明正确的方向。