代码之家  ›  专栏  ›  技术社区  ›  dm03514

使用python io从缓冲流组成行读取器

  •  1
  • dm03514  · 技术社区  · 7 年前

    我正在使用python boto与s3进行交互。s3上的文件是CSV文件,我想使用缓冲区从s3中读取行,以限制内存使用。

    我想知道是否有人有任何方法来编写python的io类来实现这一点?目标是拥有某种能够封装boto键的抽象,并提供 readline 或键上的迭代器接口(仅提供 read(size=0) 呼叫复杂性在于,由于它存储为CSV,因此每一行的长度都是可变的。

    目标是有一个抽象,我能够用它包装python boto键,然后实现迭代器协议,这样我就可以将它传递给csv阅读器,最后我自己实现了它。

    看起来像python io BufferedReader TextIOWrapper Key 但是 缓冲区读取 IOBase 对象

    然后,我围绕密钥实现了IOBase协议,但出现了unicode错误,通常不确定我在做什么。

    有人知道python io是否可以做类似于上面描述的事情吗??


    s

    提供 read(num_bytes)

    def yield_lines(keys_iterator):
       # had to custom implement this
       # any way using io??
       # yield each CSV row across keys that only provide `read()` method
    

    我最初的尝试是尝试让boto 坚持 IOBase公司 readinto .

    class IOCompatibleKey(object):
    
       def __init__(self, s3_key):
          self.s3_key = s3_key
    
       def readable(self):
          return True
    
       def writeable(self):
          return False
    
       def read(num_bytes):
          return self.s3_key.read(num_bytes)
    
       def readinto(n):
          # .... ?????
    
    buffered_reader = BufferedReader(IOCompatibleKey(s3_key))
    text_reader = TextIOWrapper(buffered_reader)
    for line in text_reader: # <- IS THIS POSSIBLE????
        print(line)
    
    1 回复  |  直到 7 年前
        1
  •  4
  •   Martijn Pieters    7 年前

    在Python 2中,您希望 避免 TextIOWrapper csv.reader() unicode 物体 TextIOWrapper 提供。

    IOBase 否则实现就足够简单:

    class IOCompatibleKey(object):    
        def __init__(self, s3_key):
            self.s3_key = s3_key
    
        def readable(self):
            return True
    
        def writeable(self):
            return False
    
        @property
        def closed(self):
            return self.s3_key.closed
    
        def close(self):
            self.s3_key.close()
    
        def read(self, num_bytes):
            return self.s3_key.read(num_bytes)
    
        def readinto(self, n):
            chunk = self.s3_key.read(len(n))
            read = len(chunk)
            n[:read] = chunk
            return read
    

    BufferedReader 使用Python 2时:

    buffered_reader = BufferedReader(IOCompatibleKey(s3_key))
    csv_reader = csv.reader(buffered_reader)
    for row in csv_reader:
        print(row)
    

    在Python 3上,只需添加一个 TextIOWrapper() BufferedReader()

    Python 2中的演示,使用模拟键:

    >>> import random, csv
    >>> from io import BufferedReader
    >>> class Key(object):
    ...     closed = False
    ...     def read(self, bytes=1024):
    ...         if random.random() < 0.2:
    ...             bytes = random.randrange(bytes)
    ...         return ''.join([random.choice('abcdefghijklmnopqrstuvwxyz \n,') for _ in range(bytes)])
    ...
    >>> s3_key = Key()
    >>> buffered_reader = BufferedReader(IOCompatibleKey(s3_key))
    >>> next(buffered_reader)   # produces a single \n terminated line
    'nffdahuitmdaktibxjsdgyhlyfm gurfyo,nt\n'
    >>> reader = csv.reader(buffered_reader)  # which satisfies csv.reader
    >>> next(reader)
    ['bi iydribq', 'u']
    >>> next(reader)
    ['qzxtbhkk se', 'v', 'b', 'nunyjemtkxaphuqmvgfrfjdloxwohqamdtvfqgddfna cjuzpaotccenxhhhgnvrbey']