代码之家  ›  专栏  ›  技术社区  ›  Jay

对Pyarrows的HdfsClient使用多处理

  •  2
  • Jay  · 技术社区  · 7 年前

    我有一个toplevel函数,它获取一个包含拼花文件路径和列名的元组。

    函数只从文件中加载列,将其转换为pandas,然后将其打包/序列化为标准表单。类似于:

    import pyarrow as pa
    import pyarrow.parquet as pq
    from multiprocessing import Pool
    
    def binarizer(file_data_tuple):
       ''' Read a Parquet column a file, binarize and return'''
    
       path, col_name, col_meta, native = file_data_tuple
       if not native: 
           # Either this or using a top level hdfs_con
           hdfs_con = pa.hdfs.connect(params)     
       read_pq = pq.read_table if native else hdfs_con.read_parquet
    
       arrow_col = read_pq(filepath, columns = (col_name,))
       bin_col = imported_binarizng_function(arrow_col)
       return bin_col
    
    def read_binarize_parallel(filepaths):
        ''' Setup parallel reading and binarizing of a parquet file'''
    
        # list of tuples containing the filepath, column name, meta, and mode   
        pool_params = [(),..] 
        pool = Pool()
        for file in filepaths:
            bin_cols = pool.map(binarizer, pool_params)
            chunk =  b''.join(bin_cols)
            send_over_socket(chunk)
    

    当我使用本机模式时,也就是从本地文件系统读取文件时,这就可以工作了。

    然而,如果我尝试阅读hdfs,我会(对我来说)出现奇怪的箭头错误,无论是在每个进程内部打开连接时,还是在尝试使用相同的连接时。以下是错误的压缩版本:

    [libprotobuf ERROR google/protobuf/message\u lite.cc:123]无法分析 “Hdfs.Internal.RpcResponseHeaderProto”类型的消息,因为它是 缺少必填字段:callId,status[libprotobuf ERROR google/protobuf/message\u lite。抄送:123]无法分析类型为的消息 “Hdfs.Internal.RpcResponseHeaderProto”,因为缺少必需的 字段:callId,status[libprotobuf ERROR google/protobuf/message\u lite。抄送:123]无法分析类型为的消息 “Hdfs.Internal.RpcResponseHeaderProto”,因为缺少必需的 字段:callId,status[libprotobuf ERROR google/protobuf/message\u lite。抄送:123]无法分析类型为的消息 “Hdfs.Internal.RpcResponseHeaderProto”,因为缺少必需的 字段:callId,状态2018-01-09 21:41:47.939006,p10007, th139965275871040,在上调用RPC调用“getFileInfo”失败 服务器“192.168.0.101:9000”:RPC通道。cpp:703:HdfsRpcException: 到“192.168.0.101:9000”的RPC通道获得协议不匹配:RPC通道 找不到挂起的调用:id=3。 @未知

    @   Unknown
    @   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
    @   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*,
    

    _对象*,箭头::io::HdfsPathInfo*) @\uu pyx\u pw\u 7pyarrow\u 3lib\u 16HadoopFileSystem\u 15isfile(\u object*,\u object*) @未知 @未知

    @   Unknown
    

    2018-01-09 21:41:47.939103,p10007,th139965275871040,信息重试 服务器“192.168.0.101:9000”上的幂等RPC调用“getFileInfo” 2018-01-09 21:41:47.939357,p10010,th139965275871040,错误失败 在服务器“192.168.0.101:9000”上调用RPC调用“getFileInfo”: RPC通道。cpp:780:HdfsRpcException:RPC通道到 “192.168.0.101:9000”获取的协议不匹配:RPC通道无法分析 响应标题。 @未知

    @未知
    @箭头::io::HadoopFileSystem::GetPathInfo(标准::字符串常量,箭头::io::HdfsPathInfo*)
    @\uu pyx\u f\u 7pyarrow\u 3lib\u 16HadoopFileSystem\uu path\u info(\uu pyx\u obj\u 7pyarrow\u 3lib\u HadoopFileSystem*,
    

    _对象*,箭头::io::HdfsPathInfo*) @\uu pyx\u pw\u 7pyarrow\u 3lib\u 16HadoopFileSystem\u 13isdir(\u object*,\u object*) @未知 @未知

    @   Unknown
    @2018-01-09 21:41:47.939406, p10008, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server
    

    “192.168.0.101:9000”:RPC通道。cpp:780:HdfsRpcException:RPC “192.168.0.101:9000”的通道获得协议不匹配:RPC通道 无法分析响应标头。 @未知

    @未知
    @箭头::io::HadoopFileSystem::GetPathInfo(标准::字符串常量,箭头::io::HdfsPathInfo*)
    @\uu pyx\u f\u 7pyarrow\u 3lib\u 16HadoopFileSystem\uu path\u info(\uu pyx\u obj\u 7pyarrow\u 3lib\u HadoopFileSystem*,
    

    _对象*,箭头::io::HdfsPathInfo*) @\uu pyx\u pw\u 7pyarrow\u 3lib\u 16HadoopFileSystem\u 13isdir(\u object*,\u object*) @未知

    @   Unknown 2018-01-09 21:41:47.939422, p10013, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server
    

    “192.168.0.101:9000”:RPC通道。cpp:780:HdfsRpcException:RPC “192.168.0.101:9000”的通道获得协议不匹配:RPC通道 无法分析响应标头。 @未知

    @未知
    @箭头::io::HadoopFileSystem::GetPathInfo(标准::字符串常量,箭头::io::HdfsPathInfo*)
    @\uu pyx\u f\u 7pyarrow\u 3lib\u 16HadoopFileSystem\uu path\u info(\uu pyx\u obj\u 7pyarrow\u 3lib\u HadoopFileSystem*,
    

    _对象*,箭头::io::HdfsPathInfo*) @\uu pyx\u pw\u 7pyarrow\u 3lib\u 16HadoopFileSystem\u 13isdir(\u object*,\u object*) @未知

    @   Unknown
    @2018-01-09 21:41:47.939431, p10009, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server
    

    “192.168.0.101:9000”:RPC通道。cpp:780:HdfsRpcException:RPC “192.168.0.101:9000”的通道获得协议不匹配:RPC通道 无法分析响应标头。 @未知

    @未知
    @箭头::io::HadoopFileSystem::GetPathInfo(标准::字符串常量,箭头::io::HdfsPathInfo*)
    @\uu pyx\u f\u 7pyarrow\u 3lib\u 16HadoopFileSystem\uu path\u info(\uu pyx\u obj\u 7pyarrow\u 3lib\u HadoopFileSystem*,
    

    _对象*,箭头::io::HdfsPathInfo*) @\uu pyx\u pw\u 7pyarrow\u 3lib\u 16HadoopFileSystem\u 13isdir(\u object*,\u object*) @未知

    @   Unknown
    @   @   Unknown
    Unknown 2018-01-09 21:41:47.939457, p10012, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server
    

    “192.168.0.101:9000”:RPC通道。cpp:780:HdfsRpcException:RPC “192.168.0.101:9000”的通道获得协议不匹配:RPC通道 无法分析响应标头。 @未知

    @未知
    @箭头::io::HadoopFileSystem::GetPathInfo(标准::字符串常量,箭头::io::HdfsPathInfo*)
    @\uu pyx\u f\u 7pyarrow\u 3lib\u 16HadoopFileSystem\uu path\u info(\uu pyx\u obj\u 7pyarrow\u 3lib\u HadoopFileSystem*,
    

    _对象*,箭头::io::HdfsPathInfo*) @\uu pyx\u pw\u 7pyarrow\u 3lib\u 16HadoopFileSystem\u 13isdir(\u object*,\u object*) @未知 @未知

    @   Unknown
    @   Unknown
    Unknown
    @   Unknown binarizing process filepath: /parquet_430mb/5e6.parquet
    @   Unknown
    Unknown
    @   Unknown
    
    @   Unknown
    
    
    @   Unknown
    

    2018-01-09 21:41:47.939854,p10010,th139965275871040,信息重试 服务器“192.168.0.101:9000”上的幂等RPC调用“getFileInfo”

    2018-01-09 21:41:47.939864,p10013,th139965275871040,信息重试 服务器“192.168.0.101:9000”上的幂等RPC调用“getFileInfo” 2018-01-09 21:41:47.939866,p10008,th139965275871040,信息重试 服务器“192.168.0.101:9000”上的幂等RPC调用“getFileInfo” 2018-01-09 21:41:47.939868,p10012,th139965275871040,信息重试 服务器“192.168.0.101:9000”上的幂等RPC调用“getFileInfo” 2018-01-09 21:41:47.939868,p10009,th139965275871040,信息重试 服务器“192.168.0.101:9000”上的幂等RPC调用“getFileInfo” 2018-01-09 21:41:47.940813,p10014,th139965275871040,错误失败 在服务器“192.168.0.101:9000”上调用RPC调用“getFileInfo”: RPC通道。cpp:780:HdfsRpcException:RPC通道到 “192.168.0.101:9000”获取的协议不匹配:RPC通道无法分析 响应标题。 @未知

    @未知
    @箭头::io::HadoopFileSystem::GetPathInfo(标准::字符串常量,箭头::io::HdfsPathInfo*)
    @\uu pyx\u f\u 7pyarrow\u 3lib\u 16HadoopFileSystem\uu path\u info(\uu pyx\u obj\u 7pyarrow\u 3lib\u HadoopFileSystem*,
    

    _对象*,箭头::io::HdfsPathInfo*) @\uu pyx\u pw\u 7pyarrow\u 3lib\u 16HadoopFileSystem\u 13isdir(\u object*,\u object*) @未知

    @未知
    

    2018-01-09 21:41:47.940937,p10014,th139965275871040,信息重试 服务器“192.168.0.101:9000”上的幂等RPC调用“getFileInfo” 2018-01-09 21:41:47.944352,p10011,th139965275871040,错误失败 在服务器“192.168.0.101:9000”上调用RPC调用“getFileInfo”: RPC通道。cpp:393:HdfsRpcException:调用RPC调用失败 服务器“192.168.0.101:9000”上的“getFileInfo” @未知 @未知

    @未知
    @箭头::io::HadoopFileSystem::GetPathInfo(标准::字符串常量,箭头::io::HdfsPathInfo*)
    @\uu pyx\u f\u 7pyarrow\u 3lib\u 16HadoopFileSystem\uu path\u info(\uu pyx\u obj\u 7pyarrow\u 3lib\u HadoopFileSystem*,
    

    _对象*,箭头::io::HdfsPathInfo*) @\uu pyx\u pw\u 7pyarrow\u 3lib\u 16HadoopFileSystem\u 13isdir(\u object*,\u object*) @未知

    @   Unknown Caused by TcpSocket.cpp: 127: HdfsNetworkException: Write 124 bytes failed to "192.168.0.101:9000": (errno: 32) Broken
    

    管 @未知 @未知

    @未知
    @箭头::io::HadoopFileSystem::GetPathInfo(标准::字符串常量,箭头::io::HdfsPathInfo*)
    @\uu pyx\u f\u 7pyarrow\u 3lib\u 16HadoopFileSystem\uu path\u info(\uu pyx\u obj\u 7pyarrow\u 3lib\u HadoopFileSystem*,
    

    _对象*,箭头::io::HdfsPathInfo*) @\uu pyx\u pw\u 7pyarrow\u 3lib\u 16HadoopFileSystem\u 13isdir(\u object*,\u object*) @未知 @未知

    @未知
    

    2018-01-09 21:41:47.944519,p10011,th139965275871040,信息重试 服务器“192.168.0.101:9000”上的幂等RPC调用“getFileInfo” ---------------------------------------------------------------------------箭头IO错误回溯(最近的调用 (最后)

    /住宅/拼花地板发送器。insert\U files\U parallel(自我)中的pyc 374#打印('372 sqparquet filepath:',filepath) 375 params\u with\u path\u and\u mode=[col\u params+(filepath,native)for col\u params in pool\u params] --&燃气轮机;376 bin\u col=自身。水塘映射(read\u binarize,params\u with\u path\u and\u mode) 377已获取('映射已完成') 378 num\u rows=bin\u col[0][2]

    /usr/lib/python2.7/multiprocessing/pool。映射中的pyc(self、func、, iterable,chunksize) 249 ''' 250断言自我_状态==运行 --&燃气轮机;251返回自我。map\u async(func、iterable、chunksize)。获取() 252 253 def imap(self、func、iterable、chunksize=1):

    /usr/lib/python2.7/multiprocessing/pool。获取中的pyc(自我,超时) 556返回自我_价值 557其他: --&燃气轮机;558提升自我_价值 559 560 def \u设置(自身、i、obj):

    ArrowIOError:HDFS:GetPathInfo失败

    我很高兴能得到关于这个错误原因的任何反馈,以及我应该如何使用平行拼花地板加载。

    1 回复  |  直到 7 年前
        1
  •  14
  •   Wes McKinney    7 年前

    这是一个与多处理序列化详细信息相关的错误。我在这里打开了一个bug报告 https://issues.apache.org/jira/browse/ARROW-1986