在Python 2中,您希望
避免
一
TextIOWrapper
csv.reader()
unicode
物体
TextIOWrapper
提供。
IOBase
否则实现就足够简单:
class IOCompatibleKey(object):
def __init__(self, s3_key):
self.s3_key = s3_key
def readable(self):
return True
def writeable(self):
return False
@property
def closed(self):
return self.s3_key.closed
def close(self):
self.s3_key.close()
def read(self, num_bytes):
return self.s3_key.read(num_bytes)
def readinto(self, n):
chunk = self.s3_key.read(len(n))
read = len(chunk)
n[:read] = chunk
return read
BufferedReader
使用Python 2时:
buffered_reader = BufferedReader(IOCompatibleKey(s3_key))
csv_reader = csv.reader(buffered_reader)
for row in csv_reader:
print(row)
在Python 3上,只需添加一个
TextIOWrapper()
BufferedReader()
Python 2中的演示,使用模拟键:
>>> import random, csv
>>> from io import BufferedReader
>>> class Key(object):
... closed = False
... def read(self, bytes=1024):
... if random.random() < 0.2:
... bytes = random.randrange(bytes)
... return ''.join([random.choice('abcdefghijklmnopqrstuvwxyz \n,') for _ in range(bytes)])
...
>>> s3_key = Key()
>>> buffered_reader = BufferedReader(IOCompatibleKey(s3_key))
>>> next(buffered_reader) # produces a single \n terminated line
'nffdahuitmdaktibxjsdgyhlyfm gurfyo,nt\n'
>>> reader = csv.reader(buffered_reader) # which satisfies csv.reader
>>> next(reader)
['bi iydribq', 'u']
>>> next(reader)
['qzxtbhkk se', 'v', 'b', 'nunyjemtkxaphuqmvgfrfjdloxwohqamdtvfqgddfna cjuzpaotccenxhhhgnvrbey']