
Create a zip file from a generator in Python?

  •  25
  • Chris B.  ·  16 years ago

    I have a large amount of data (a couple of gigabytes) I need to write to a zip file in Python. I can't load it all into memory at once to pass to the .writestr method of ZipFile, and I really don't want to spool it all out to disk using temporary files and then read it back.

    Is there a way to feed a generator or a file-like object to the ZipFile library? Or is there some reason this capability doesn't seem to be supported?

    By zip file, I mean a zip file, as supported by the Python zipfile package.

    13 Answers  |  as of 16 years ago
        1
  •  13
  •   lambacck    11 years ago

    The only solution is to rewrite the method it uses for zipping files so that it reads from a buffer. It would be trivial to add this to the standard libraries; I'm kind of amazed it hasn't been done yet. I gather there's a lot of agreement that the entire interface needs to be overhauled, and that seems to be blocking any incremental improvements.

    import zipfile, zlib, binascii, struct
    class BufferedZipFile(zipfile.ZipFile):
        def writebuffered(self, zipinfo, buffer):
            zinfo = zipinfo
    
            zinfo.file_size = file_size = 0
            zinfo.flag_bits = 0x00
            zinfo.header_offset = self.fp.tell()
    
            self._writecheck(zinfo)
            self._didModify = True
    
            zinfo.CRC = CRC = 0
            zinfo.compress_size = compress_size = 0
            self.fp.write(zinfo.FileHeader())
            if zinfo.compress_type == zipfile.ZIP_DEFLATED:
                cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
            else:
                cmpr = None
    
            while True:
                buf = buffer.read(1024 * 8)
                if not buf:
                    break
    
                file_size = file_size + len(buf)
                CRC = binascii.crc32(buf, CRC) & 0xffffffff
                if cmpr:
                    buf = cmpr.compress(buf)
                    compress_size = compress_size + len(buf)
    
                self.fp.write(buf)
    
            if cmpr:
                buf = cmpr.flush()
                compress_size = compress_size + len(buf)
                self.fp.write(buf)
                zinfo.compress_size = compress_size
            else:
                zinfo.compress_size = file_size
    
            zinfo.CRC = CRC
            zinfo.file_size = file_size
    
            # Seek backwards and patch the local file header with the final
            # CRC and sizes (offset +14 is the CRC field in the local header).
            position = self.fp.tell()
            self.fp.seek(zinfo.header_offset + 14, 0)
            self.fp.write(struct.pack("<LLL", zinfo.CRC, zinfo.compress_size, zinfo.file_size))
            self.fp.seek(position, 0)
            self.filelist.append(zinfo)
            self.NameToInfo[zinfo.filename] = zinfo
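
    A minimal usage sketch (the file names here are illustrative, not part of the original answer). Note that the output must still be a real, seekable file, since writebuffered seeks back to patch the header:

    import zipfile

    # Hypothetical usage: stream a large file into an archive without loading
    # it into memory ('large.bin' and 'out.zip' are illustrative names).
    with open('large.bin', 'rb') as source:
        zf = BufferedZipFile('out.zip', mode='w')
        info = zipfile.ZipInfo('large.bin')
        info.compress_type = zipfile.ZIP_DEFLATED  # default would be ZIP_STORED
        zf.writebuffered(info, source)
        zf.close()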
    
        2
  •  9
  •   Community CDub    8 years ago

    Changed in Python 3.5 (from the official docs): added support for writing to unseekable streams.

    This means that with zipfile.ZipFile we can now use streams that don't store the entire file in memory. Such streams do not support seeking over the whole data volume.

    So this is a simple generator:

    from zipfile import ZipFile, ZipInfo
    
    def zipfile_generator(path, stream):
        with ZipFile(stream, mode='w') as zf:
            z_info = ZipInfo.from_file(path)
            with open(path, 'rb') as entry, zf.open(z_info, mode='w') as dest:
                for chunk in iter(lambda: entry.read(16384), b''):
                    dest.write(chunk)
                    # Yield chunk of the zip file stream in bytes.
                    yield stream.get()
        # ZipFile was closed.
        yield stream.get()
    

    path is a string path to the large file or directory, or a path-like object.

    stream is an unseekable stream instance of a class like the following (designed according to the official docs):

    from io import RawIOBase
    
    class UnseekableStream(RawIOBase):
        def __init__(self):
            self._buffer = b''
    
        def writable(self):
            return True
    
        def write(self, b):
            if self.closed:
                raise ValueError('Stream was closed!')
            self._buffer += b
            return len(b)
    
        def get(self):
            chunk = self._buffer
            self._buffer = b''
            return chunk
    

    You can try this code online: https://repl.it/@IvanErgunov/zipfilegenerator


    There is also another way to create a generator without ZipInfo and without manually reading and dividing your large file: you can pass a queue.Queue() object to your UnseekableStream() object and write to this queue in another thread. Then, in the current thread, you can simply read chunks from this queue in an iterable way. See the docs.
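
    A minimal sketch of that queue-based approach (QueueStream and zip_chunks are illustrative names of my own, not from the docs; assumes Python 3.6+ for ZipInfo.from_file and ZipFile.open in write mode):

    import queue
    import threading
    from io import RawIOBase
    from zipfile import ZipFile, ZipInfo

    class QueueStream(RawIOBase):
        '''Unseekable write-only stream that pushes each written chunk onto a queue.'''
        def __init__(self, q):
            self.q = q

        def writable(self):
            return True

        def write(self, b):
            if self.closed:
                raise ValueError('Stream was closed!')
            self.q.put(bytes(b))
            return len(b)

    def zip_chunks(path):
        q = queue.Queue()
        stream = QueueStream(q)

        def worker():
            # Write the whole archive into the stream from a background thread.
            with ZipFile(stream, mode='w') as zf:
                z_info = ZipInfo.from_file(path)
                with open(path, 'rb') as entry, zf.open(z_info, mode='w') as dest:
                    for chunk in iter(lambda: entry.read(16384), b''):
                        dest.write(chunk)
            q.put(None)  # sentinel: the archive is complete

        threading.Thread(target=worker, daemon=True).start()
        # In the current thread, simply read chunks from the queue as they arrive.
        yield from iter(q.get, None)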

    Note: Python Zipstream by allanlei is an outdated and unreliable approach. It was an attempt to add support for unseekable streams before it was done officially.

        3
  •  7
  •   don_vanchos    6 years ago

    I took Chris B.'s answer and created a complete solution. Here it is in case anyone else is interested:

    import os
    import threading
    from zipfile import *
    import zlib, binascii, struct
    
    class ZipEntryWriter(threading.Thread):
        def __init__(self, zf, zinfo, fileobj):
            self.zf = zf
            self.zinfo = zinfo
            self.fileobj = fileobj
    
            zinfo.file_size = 0
            zinfo.flag_bits = 0x00
            zinfo.header_offset = zf.fp.tell()
    
            zf._writecheck(zinfo)
            zf._didModify = True
    
            zinfo.CRC = 0
            zinfo.compress_size = compress_size = 0
            zf.fp.write(zinfo.FileHeader())
    
            super(ZipEntryWriter, self).__init__()
    
        def run(self):
            zinfo = self.zinfo
            zf = self.zf
            file_size = 0
            CRC = 0
    
            if zinfo.compress_type == ZIP_DEFLATED:
                cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
            else:
                cmpr = None
            while True:
                buf = self.fileobj.read(1024 * 8)
                if not buf:
                    self.fileobj.close()
                    break
    
                file_size = file_size + len(buf)
                CRC = binascii.crc32(buf, CRC)
                if cmpr:
                    buf = cmpr.compress(buf)
                    compress_size = compress_size + len(buf)
    
                zf.fp.write(buf)
    
            if cmpr:
                buf = cmpr.flush()
                compress_size = compress_size + len(buf)
                zf.fp.write(buf)
                zinfo.compress_size = compress_size
            else:
                zinfo.compress_size = file_size
    
            zinfo.CRC = CRC
            zinfo.file_size = file_size
    
            position = zf.fp.tell()
            zf.fp.seek(zinfo.header_offset + 14, 0)
            zf.fp.write(struct.pack("<lLL", zinfo.CRC, zinfo.compress_size, zinfo.file_size))
            zf.fp.seek(position, 0)
            zf.filelist.append(zinfo)
            zf.NameToInfo[zinfo.filename] = zinfo
    
    class EnhZipFile(ZipFile, object):
    
        def _current_writer(self):
            return hasattr(self, 'cur_writer') and self.cur_writer or None
    
        def assert_no_current_writer(self):
            cur_writer = self._current_writer()
            if cur_writer and cur_writer.isAlive():
                raise ValueError('An entry is already started for name: %s' % cur_writer.zinfo.filename)
    
        def write(self, filename, arcname=None, compress_type=None):
            self.assert_no_current_writer()
            super(EnhZipFile, self).write(filename, arcname, compress_type)
    
        def writestr(self, zinfo_or_arcname, bytes):
            self.assert_no_current_writer()
            super(EnhZipFile, self).writestr(zinfo_or_arcname, bytes)
    
        def close(self):
            self.finish_entry()
            super(EnhZipFile, self).close()
    
        def start_entry(self, zipinfo):
            """
            Start writing a new entry with the specified ZipInfo and return a
            file like object. Any data written to the file like object is
            read by a background thread and written directly to the zip file.
            Make sure to close the returned file object, before closing the
            zipfile, or the close() would end up hanging indefinitely.
    
            Only one entry can be open at any time. If multiple entries need to
            be written, make sure to call finish_entry() before calling any of
            these methods:
            - start_entry
            - write
            - writestr
            It is not necessary to explicitly call finish_entry() before closing
            zipfile.
    
            Example:
                zf = EnhZipFile('tmp.zip', 'w')
                w = zf.start_entry(ZipInfo('t.txt'))
                w.write("some text")
                w.close()
                zf.close()
            """
            self.assert_no_current_writer()
            r, w = os.pipe()
            self.cur_writer = ZipEntryWriter(self, zipinfo, os.fdopen(r, 'r'))
            self.cur_writer.start()
            return os.fdopen(w, 'w')
    
        def finish_entry(self, timeout=None):
            """
            Ensure that the ZipEntry that is currently being written is finished.
            Joins on any background thread to exit. It is safe to call this method
            multiple times.
            """
            cur_writer = self._current_writer()
            if not cur_writer or not cur_writer.isAlive():
                return
            cur_writer.join(timeout)
    
    if __name__ == "__main__":
        zf = EnhZipFile('c:/tmp/t.zip', 'w')
        import time
        w = zf.start_entry(ZipInfo('t.txt', time.localtime()[:6]))
        w.write("Line1\n")
        w.write("Line2\n")
        w.close()
        zf.finish_entry()
        w = zf.start_entry(ZipInfo('p.txt', time.localtime()[:6]))
        w.write("Some text\n")
        w.close()
        zf.close()
    
        4
  •  3
  •   altunyurt    16 years ago

    gzip.GzipFile writes the data in gzipped chunks, and you can set the size of the chunks according to the number of lines read from the file.

    An example:

    file = gzip.GzipFile('blah.gz', 'wb')
    sourcefile = open('source', 'rb')
    chunks = []
    for line in sourcefile:
        chunks.append(line)
        if len(chunks) >= X:  # X = number of buffered lines per compressed chunk
            file.write("".join(chunks))
            file.flush()
            chunks = []
    if chunks:  # write out the final partial chunk
        file.write("".join(chunks))
    file.close()
    sourcefile.close()
    
        5
  •  3
  •   S.Lott    16 years ago

    The essential compression is done by zlib.compressobj. ZipFile (under Python 2.5 on MacOSX) appears to be compiled. The Python 2.3 version is as follows.

    You can see that it builds the compressed file in 8k chunks. Taking out the source file information is complex because a lot of source file attributes (like the uncompressed size) are recorded in the zip file header.

    def write(self, filename, arcname=None, compress_type=None):
        """Put the bytes from filename into the archive under the name
        arcname."""
    
        st = os.stat(filename)
        mtime = time.localtime(st.st_mtime)
        date_time = mtime[0:6]
        # Create ZipInfo instance to store file information
        if arcname is None:
            zinfo = ZipInfo(filename, date_time)
        else:
            zinfo = ZipInfo(arcname, date_time)
        zinfo.external_attr = st[0] << 16L      # Unix attributes
        if compress_type is None:
            zinfo.compress_type = self.compression
        else:
            zinfo.compress_type = compress_type
        self._writecheck(zinfo)
        fp = open(filename, "rb")
    
        zinfo.flag_bits = 0x00
        zinfo.header_offset = self.fp.tell()    # Start of header bytes
        # Must overwrite CRC and sizes with correct data later
        zinfo.CRC = CRC = 0
        zinfo.compress_size = compress_size = 0
        zinfo.file_size = file_size = 0
        self.fp.write(zinfo.FileHeader())
        zinfo.file_offset = self.fp.tell()      # Start of file bytes
        if zinfo.compress_type == ZIP_DEFLATED:
            cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
                 zlib.DEFLATED, -15)
        else:
            cmpr = None
        while 1:
            buf = fp.read(1024 * 8)
            if not buf:
                break
            file_size = file_size + len(buf)
            CRC = binascii.crc32(buf, CRC)
            if cmpr:
                buf = cmpr.compress(buf)
                compress_size = compress_size + len(buf)
            self.fp.write(buf)
        fp.close()
        if cmpr:
            buf = cmpr.flush()
            compress_size = compress_size + len(buf)
            self.fp.write(buf)
            zinfo.compress_size = compress_size
        else:
            zinfo.compress_size = file_size
        zinfo.CRC = CRC
        zinfo.file_size = file_size
        # Seek backwards and write CRC and file sizes
        position = self.fp.tell()       # Preserve current position in file
        self.fp.seek(zinfo.header_offset + 14, 0)
        self.fp.write(struct.pack("<lLL", zinfo.CRC, zinfo.compress_size,
              zinfo.file_size))
        self.fp.seek(position, 0)
        self.filelist.append(zinfo)
        self.NameToInfo[zinfo.filename] = zinfo
    
        6
  •  1
  •   Oddthinking    16 years ago

    Some (many? most?) compression algorithms are based on looking at the redundancies across the entire file.

    Some compression libraries will choose between several compression algorithms based on which works best on the file.

    I believe the ZipFile module does this, so it wants to see the entire file, not just pieces at a time.

    Hence, it won't work with generators or files too big to load in memory. That would explain the limitation of the Zipfile library.

        7
  •  1
  •   jkitchen    8 years ago

    In case anyone stumbles upon this question, which is still relevant in 2017 for Python 2.7, here's a working solution for a truly streaming zip file, with no requirement for the output to be seekable as in the other cases. The secret is to set bit 3 of the general purpose bit flag (see https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT section 4.3.9.1).

    Note that this implementation will always create a ZIP64-style file, allowing the streaming to work for arbitrarily large files. It includes an ugly hack to force the zip64 end of central directory record, so be aware it will cause all zipfiles written by your process to become ZIP64-style.

    import io
    import time
    import zipfile
    import zlib
    import binascii
    import struct
    
    class ByteStreamer(io.BytesIO):
        '''
        Variant on BytesIO which lets you write and consume data while
        keeping track of the total filesize written. When data is consumed
        it is removed from memory, keeping the memory requirements low.
        '''
        def __init__(self):
            super(ByteStreamer, self).__init__()
            self._tellall = 0
    
        def tell(self):
            return self._tellall
    
        def write(self, b):
            orig_size = super(ByteStreamer, self).tell()
            super(ByteStreamer, self).write(b)
            new_size = super(ByteStreamer, self).tell()
            self._tellall += (new_size - orig_size)
    
        def consume(self):
            bytes = self.getvalue()
            self.seek(0)
            self.truncate(0)
            return bytes
    
    class BufferedZipFileWriter(zipfile.ZipFile):
        '''
        ZipFile writer with true streaming (input and output).
        Created zip files are always ZIP64-style because it is the only safe way to stream
        potentially large zip files without knowing the full size ahead of time.
    
        Example usage:
        >>> def stream():
        >>>     bzfw = BufferedZipFileWriter()
        >>>     for arc_path, buffer in inputs:  # buffer is a file-like object which supports read(size)
        >>>         for chunk in bzfw.streambuffer(arc_path, buffer):
        >>>             yield chunk
        >>>     yield bzfw.close()
        '''
        def __init__(self, compression=zipfile.ZIP_DEFLATED):
            self._buffer = ByteStreamer()
            super(BufferedZipFileWriter, self).__init__(self._buffer, mode='w', compression=compression, allowZip64=True)
    
        def streambuffer(self, zinfo_or_arcname, buffer, chunksize=2**16):
            if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
                zinfo = zipfile.ZipInfo(filename=zinfo_or_arcname,
                                        date_time=time.localtime(time.time())[:6])
                zinfo.compress_type = self.compression
                zinfo.external_attr = 0o600 << 16     # ?rw-------
            else:
                zinfo = zinfo_or_arcname
    
            zinfo.file_size = file_size = 0
            zinfo.flag_bits = 0x08  # Streaming mode: crc and size come after the data
            zinfo.header_offset = self.fp.tell()
    
            self._writecheck(zinfo)
            self._didModify = True
    
            zinfo.CRC = CRC = 0
            zinfo.compress_size = compress_size = 0
            self.fp.write(zinfo.FileHeader())
            if zinfo.compress_type == zipfile.ZIP_DEFLATED:
                cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
            else:
                cmpr = None
    
            while True:
                buf = buffer.read(chunksize)
                if not buf:
                    break
    
                file_size += len(buf)
                CRC = binascii.crc32(buf, CRC) & 0xffffffff
                if cmpr:
                    buf = cmpr.compress(buf)
                    compress_size += len(buf)
    
                self.fp.write(buf)
                compressed_bytes = self._buffer.consume()
                if compressed_bytes:
                    yield compressed_bytes
    
            if cmpr:
                buf = cmpr.flush()
                compress_size += len(buf)
                self.fp.write(buf)
                zinfo.compress_size = compress_size
                compressed_bytes = self._buffer.consume()
                if compressed_bytes:
                    yield compressed_bytes
            else:
                zinfo.compress_size = file_size
    
            zinfo.CRC = CRC
            zinfo.file_size = file_size
    
            # Write CRC and file sizes after the file data
            # Always write as zip64 -- only safe way to stream what might become a large zipfile
            fmt = '<LQQ'
            self.fp.write(struct.pack(fmt, zinfo.CRC, zinfo.compress_size, zinfo.file_size))
    
            self.fp.flush()
            self.filelist.append(zinfo)
            self.NameToInfo[zinfo.filename] = zinfo
            yield self._buffer.consume()
    
        # The close method needs to be patched to force writing a ZIP64 file
        # We'll hack ZIP_FILECOUNT_LIMIT to do the forcing
        def close(self):
            tmp = zipfile.ZIP_FILECOUNT_LIMIT
            zipfile.ZIP_FILECOUNT_LIMIT = 0
            super(BufferedZipFileWriter, self).close()
            zipfile.ZIP_FILECOUNT_LIMIT = tmp
            return self._buffer.consume()
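
    A minimal sketch of consuming the generator (the inputs list and output path are illustrative, not part of the original answer):

    import io

    # Hypothetical consumer: stream two in-memory "files" into out.zip,
    # writing each compressed chunk as soon as it is produced.
    inputs = [('a.txt', io.BytesIO(b'first entry')),
              ('b.txt', io.BytesIO(b'second entry'))]

    bzfw = BufferedZipFileWriter()
    with open('out.zip', 'wb') as out:
        for arc_path, buffer in inputs:
            for chunk in bzfw.streambuffer(arc_path, buffer):
                out.write(chunk)
        out.write(bzfw.close())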
    
        8
  •  0
  •   Oddthinking    16 years ago

    The gzip library will take a file-like object for compression.

    class GzipFile([filename [,mode [,compresslevel [,fileobj]]]])
    

    You still need to provide a nominal filename for inclusion in the zip file, but you can pass your data source to the fileobj.
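
    A minimal sketch of that fileobj parameter (file names are illustrative): the nominal filename goes into the gzip header, while the compressed bytes go to whatever file-like object you pass as fileobj:

    import gzip

    # Read the source incrementally; the compressed output lands in 'sink'.
    with open('source.dat', 'rb') as src, open('archive.gz', 'wb') as sink:
        gz = gzip.GzipFile(filename='source.dat', mode='wb', fileobj=sink)
        for chunk in iter(lambda: src.read(8192), b''):
            gz.write(chunk)
        gz.close()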

    (This answer differs from that of Damnsweet, in that the focus should be on the data source being incrementally read, not the compressed file being incrementally written.)

    Now I see the original questioner won't accept Gzip :-(

        9
  •  0
  •   Belug    12 years ago

    Now with Python 2.7 you can add data to the zipfile instead of to a file:

    http://docs.python.org/2/library/zipfile#zipfile.ZipFile.writestr
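
    For reference, a minimal writestr sketch (note the whole payload must already be in memory as a single string, which is exactly what the question was trying to avoid):

    import zipfile

    with zipfile.ZipFile('out.zip', 'w') as zf:
        zf.writestr('hello.txt', b'in-memory data goes here')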

        10
  •  0
  •   Edwinner    8 years ago

    It's 2017 now. If you're still looking to do this elegantly, use Python Zipstream by allanlei. So far it's probably the only well-written library to accomplish that.