代码之家  ›  专栏  ›  技术社区  ›  Chris B.

从Python中的生成器创建zip文件?

  •  19
  • Chris B.  · 技术社区  · 16 年前

    我需要用Python将大量数据(几 GB)写入zip文件。我不能一次将其全部加载到内存中再传给ZipFile的 .writestr 方法,而且我真的不想先用临时文件把它全部写到磁盘,然后再读回来。

    我所说的zip文件是指zip文件。正如Python zipfile包中支持的那样。

    9 回复  |  直到 16 年前
        1
  •  13
  •   lambacck    10 年前

    唯一的解决方案是重写用于压缩文件以从缓冲区读取的方法。将其添加到标准库中是很简单的;我有点惊讶它还没有完成。我想大家都同意整个界面需要大修,这似乎阻碍了任何增量改进。

    import zipfile, zlib, binascii, struct
    class BufferedZipFile(zipfile.ZipFile):
        """ZipFile that can add a member by streaming it from a file-like object.

        The member is never held in memory in full, but the archive file
        itself must support seek() because the local header is patched after
        the data has been written.
        """

        def writebuffered(self, zipinfo, buffer, chunk_size=1024 * 8):
            """Write the contents of *buffer* into the archive as *zipinfo*.

            zipinfo    -- zipfile.ZipInfo describing the member; its CRC, size,
                          flag and offset fields are overwritten here.
            buffer     -- object with a read(size) method; it is read until
                          read() returns an empty value.
            chunk_size -- number of bytes requested per read() call.

            CRC and sizes are packed as unsigned 32-bit values, so
            struct.error is raised for members of 4 GiB or more.
            """
            zinfo = zipinfo

            zinfo.file_size = file_size = 0
            zinfo.flag_bits = 0x00
            zinfo.header_offset = self.fp.tell()

            self._writecheck(zinfo)
            self._didModify = True

            # Write a header with zeroed CRC/sizes; the real values are only
            # known after the data has been streamed and are patched in below.
            zinfo.CRC = CRC = 0
            zinfo.compress_size = compress_size = 0
            self.fp.write(zinfo.FileHeader())
            if zinfo.compress_type == zipfile.ZIP_DEFLATED:
                # Negative window bits: raw deflate stream without zlib framing.
                cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
            else:
                cmpr = None

            while True:
                buf = buffer.read(chunk_size)
                if not buf:
                    break

                file_size += len(buf)
                CRC = binascii.crc32(buf, CRC) & 0xffffffff
                if cmpr is not None:
                    buf = cmpr.compress(buf)
                    compress_size += len(buf)

                self.fp.write(buf)

            if cmpr is not None:
                buf = cmpr.flush()
                compress_size += len(buf)
                self.fp.write(buf)
                zinfo.compress_size = compress_size
            else:
                zinfo.compress_size = file_size

            zinfo.CRC = CRC
            zinfo.file_size = file_size

            # The CRC field sits 14 bytes into the local file header and is
            # followed directly by the compressed and uncompressed sizes.
            position = self.fp.tell()
            self.fp.seek(zinfo.header_offset + 14, 0)
            self.fp.write(struct.pack("<LLL", zinfo.CRC, zinfo.compress_size, zinfo.file_size))
            self.fp.seek(position, 0)

            # Current zipfile versions write the central directory at
            # self.start_dir when closing; without this update close() would
            # overwrite the member that was just written.
            self.start_dir = position

            self.filelist.append(zinfo)
            self.NameToInfo[zinfo.filename] = zinfo
    
        2
  •  9
  •   Community CDub    7 年前

    我采用了 Chris B.'s answer 并在此基础上创建了一个完整的解决方案。如果其他人对此感兴趣,代码如下:

    import os
    import threading
    from zipfile import *
    import zlib, binascii, struct
    
    class ZipEntryWriter(threading.Thread):
        """Thread that copies one archive member from a file object into a ZipFile.

        The constructor immediately writes a placeholder local header to the
        archive; run() then streams the data and patches the real CRC and
        sizes into that header, so the archive file must support seek().
        """

        def __init__(self, zf, zinfo, fileobj):
            """Prepare the member and write its placeholder header.

            zf      -- the open ZipFile (write mode) that receives the member.
            zinfo   -- ZipInfo for the member; its size, CRC, flag and offset
                       fields are overwritten.
            fileobj -- object with read(size) and close(); read until empty.
            """
            super(ZipEntryWriter, self).__init__()
            self.zf = zf
            self.zinfo = zinfo
            self.fileobj = fileobj

            zinfo.file_size = 0
            zinfo.flag_bits = 0x00
            zinfo.header_offset = zf.fp.tell()

            zf._writecheck(zinfo)
            zf._didModify = True

            # CRC and sizes are unknown until run() has consumed the input.
            zinfo.CRC = 0
            zinfo.compress_size = 0
            zf.fp.write(zinfo.FileHeader())

        def run(self):
            """Stream fileobj into the archive, then fix up the local header."""
            zinfo = self.zinfo
            zf = self.zf
            file_size = 0
            compress_size = 0
            CRC = 0

            if zinfo.compress_type == ZIP_DEFLATED:
                # Negative window bits: raw deflate stream without zlib framing.
                cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
            else:
                cmpr = None

            try:
                while True:
                    buf = self.fileobj.read(1024 * 8)
                    if not buf:
                        break

                    file_size += len(buf)
                    # Mask so the CRC is an unsigned 32-bit value on every
                    # Python version.
                    CRC = binascii.crc32(buf, CRC) & 0xffffffff
                    if cmpr is not None:
                        buf = cmpr.compress(buf)
                        compress_size += len(buf)

                    zf.fp.write(buf)
            finally:
                # Release the input even when reading or writing fails.
                self.fileobj.close()

            if cmpr is not None:
                buf = cmpr.flush()
                compress_size += len(buf)
                zf.fp.write(buf)
                zinfo.compress_size = compress_size
            else:
                zinfo.compress_size = file_size

            zinfo.CRC = CRC
            zinfo.file_size = file_size

            # The CRC field sits 14 bytes into the local file header and is
            # followed directly by the compressed and uncompressed sizes.
            position = zf.fp.tell()
            zf.fp.seek(zinfo.header_offset + 14, 0)
            zf.fp.write(struct.pack("<LLL", zinfo.CRC, zinfo.compress_size, zinfo.file_size))
            zf.fp.seek(position, 0)

            # Current zipfile versions write the central directory at
            # zf.start_dir when closing; keep it pointing past this member.
            zf.start_dir = position

            zf.filelist.append(zinfo)
            zf.NameToInfo[zinfo.filename] = zinfo
    
    class EnhZipFile(ZipFile, object):
        """ZipFile whose members can be written incrementally through a pipe.

        start_entry() hands back a writable file object; a background
        ZipEntryWriter thread reads the other end of the pipe and stores the
        data in the archive. Only one such entry may be active at a time.
        """

        def _current_writer(self):
            """Return the most recently started writer thread, or None."""
            return getattr(self, 'cur_writer', None) or None

        def assert_no_current_writer(self):
            """Raise ValueError if a previously started entry is still being written."""
            cur_writer = self._current_writer()
            if cur_writer and cur_writer.is_alive():
                raise ValueError('An entry is already started for name: %s' % cur_writer.zinfo.filename)

        def write(self, filename, arcname=None, compress_type=None):
            """Same as ZipFile.write(), refused while a streamed entry is active."""
            self.assert_no_current_writer()
            super(EnhZipFile, self).write(filename, arcname, compress_type)

        def writestr(self, zinfo_or_arcname, bytes):
            """Same as ZipFile.writestr(), refused while a streamed entry is active."""
            self.assert_no_current_writer()
            super(EnhZipFile, self).writestr(zinfo_or_arcname, bytes)

        def close(self):
            """Wait for any active entry to finish, then close the archive."""
            self.finish_entry()
            super(EnhZipFile, self).close()

        def start_entry(self, zipinfo):
            """
            Start writing a new entry with the specified ZipInfo and return a
            file like object. Any data written to the file like object is
            read by a background thread and written directly to the zip file.
            Make sure to close the returned file object, before closing the
            zipfile, or the close() would end up hanging indefinitely.

            The returned file object is opened in binary mode, so the data
            written to it must be bytes.

            Only one entry can be open at any time. If multiple entries need to
            be written, make sure to call finish_entry() before calling any of
            these methods:
            - start_entry
            - write
            - writestr
            It is not necessary to explicitly call finish_entry() before closing
            zipfile.

            Example:
                zf = EnhZipFile('tmp.zip', 'w')
                w = zf.start_entry(ZipInfo('t.txt'))
                w.write(b"some text")
                w.close()
                zf.close()
            """
            self.assert_no_current_writer()
            r, w = os.pipe()
            # Binary mode on both ends: the payload is CRC-ed and compressed
            # as raw bytes, and text mode would translate newlines on some
            # platforms.
            self.cur_writer = ZipEntryWriter(self, zipinfo, os.fdopen(r, 'rb'))
            self.cur_writer.start()
            return os.fdopen(w, 'wb')

        def finish_entry(self, timeout=None):
            """
            Ensure that the ZipEntry that is currently being written is finished.
            Joins on any background thread to exit. It is safe to call this method
            multiple times.
            """
            cur_writer = self._current_writer()
            if not cur_writer or not cur_writer.is_alive():
                return
            cur_writer.join(timeout)
    
    if __name__ == "__main__":
        # Demo: stream two small text members into an archive, one after the other.
        import time

        archive = EnhZipFile('c:/tmp/t.zip', 'w')

        entry = archive.start_entry(ZipInfo('t.txt', time.localtime()[:6]))
        for text in ("Line1\n", "Line2\n"):
            entry.write(text)
        entry.close()
        # The first entry must be finished before the next one is started.
        archive.finish_entry()

        entry = archive.start_entry(ZipInfo('p.txt', time.localtime()[:6]))
        entry.write("Some text\n")
        entry.close()
        # close() calls finish_entry() itself before closing the archive.
        archive.close()
    
        3
  •  7
  •   don_vanchos    6 年前

    Python 3.5 中的变更:新增了对写入不可查找(unseekable)流的支持(support for writing to unseekable streams)。

    这意味着现在对于 zipfile.ZipFile,我们可以使用不把整个文件保存在内存中的流。这类流不支持(do not support)在整个数据范围内移动,也就是不能 seek。

    from zipfile import ZipFile, ZipInfo
    
    def zipfile_generator(path, stream):
        """Yield the bytes of a zip archive holding the file at *path*.

        path   -- path of the file to add; it is read in 16384-byte pieces.
        stream -- writable object handed to ZipFile; it must also offer a
                  get() method returning the bytes written since the
                  previous get() call.

        One value is yielded after every piece written to the archive, plus
        a final one once the ZipFile has been closed.
        """
        with ZipFile(stream, mode='w') as archive:
            info = ZipInfo.from_file(path)
            with open(path, 'rb') as source, archive.open(info, mode='w') as member:
                while True:
                    block = source.read(16384)
                    if block == b'':
                        break
                    member.write(block)
                    # Hand over whatever the archive has produced so far.
                    yield stream.get()
        # Closing the ZipFile wrote the trailing records; emit those as well.
        yield stream.get()
    

    path 是大文件或目录的字符串路径,或 pathlike

    stream 是下面这个类的不可查找流实例(按照 official docs 的说明实现):

    from io import RawIOBase
    
    class UnseekableStream(RawIOBase):
        """Write-only stream that collects written bytes until get() drains them."""

        def __init__(self):
            # Bytes written since the last get() call.
            self._buffer = b''

        def writable(self):
            """Report that the stream accepts writes."""
            return True

        def write(self, b):
            """Append *b* to the pending bytes and return its length.

            Raises ValueError once the stream has been closed.
            """
            if self.closed:
                raise ValueError('Stream was closed!')
            self._buffer = self._buffer + b
            return len(b)

        def get(self):
            """Return the pending bytes and reset the buffer to empty."""
            pending, self._buffer = self._buffer, b''
            return pending
    

    您可以联机尝试此代码: https://repl.it/@IvanErgunov/zipfilegenerator


    还有另一种方法可以创建生成器,既不需要 ZipInfo,也不需要手动读取和分割大文件:你可以把一个 queue.Queue() 对象传给你的 UnseekableStream() 对象,并在另一个线程中向这个队列写入数据;然后在当前线程中,只需以可迭代(iterable)的方式从这个队列中读取数据块即可。参见 docs

    附注: Python Zipstream by allanlei 是一种过时且不可靠的方法。它是在官方正式支持之前,为增加对不可查找(unseekable)流的支持而做的一次尝试。

        4
  •  3
  •   altunyurt    16 年前

    gzip.GzipFile 会把数据分块压缩写入,您可以根据从文件中读取的行数来设置每块的大小。

    例如:

    import gzip

    # Copy 'source' into 'blah.gz', compressing X lines at a time so the input
    # is never held in memory in full.
    # X is the number of lines buffered per write; it is a placeholder that
    # must be defined before this snippet is run.
    with open('source', 'rb') as sourcefile, gzip.GzipFile('blah.gz', 'wb') as file:
        chunks = []
        for line in sourcefile:
            chunks.append(line)
            if len(chunks) >= X:
                # The source is opened in binary mode, so join with bytes.
                file.write(b"".join(chunks))
                file.flush()
                chunks = []
        # Write the final, partially filled batch; without this the last
        # lines of the source would be silently dropped.
        if chunks:
            file.write(b"".join(chunks))
    
        5
  •  3
  •   S.Lott    16 年前

    核心的压缩工作由 zlib.compressobj 完成。ZipFile(在 Mac OS X 的 Python 2.5 下似乎只有已编译的形式),因此下面给出的是 Python 2.3 版本的源码。

    您可以看到,它以8k块构建压缩文件。提取源文件信息非常复杂,因为zip文件头中记录了许多源文件属性(如未压缩的大小)。

    def write(self, filename, arcname=None, compress_type=None):
        """Put the bytes from filename into the archive under the name
        arcname.

        The file is read and written in 8 KiB pieces, so it is never held in
        memory in full.  A local header with zeroed CRC and sizes is written
        first; once all data has been written the method seeks back and
        patches the real values in, so self.fp has to support seek().

        filename      -- path of the file on disk to add.
        arcname       -- name to store in the archive; defaults to filename.
        compress_type -- compression method for this member; defaults to
                         self.compression.
        """

        st = os.stat(filename)
        mtime = time.localtime(st.st_mtime)
        date_time = mtime[0:6]
        # Create ZipInfo instance to store file information
        if arcname is None:
            zinfo = ZipInfo(filename, date_time)
        else:
            zinfo = ZipInfo(arcname, date_time)
        zinfo.external_attr = st[0] << 16L      # Unix attributes
        if compress_type is None:
            zinfo.compress_type = self.compression
        else:
            zinfo.compress_type = compress_type
        self._writecheck(zinfo)
        # NOTE(review): fp is closed only on the success path below; an error
        # while reading or writing leaves it open.
        fp = open(filename, "rb")

        zinfo.flag_bits = 0x00
        zinfo.header_offset = self.fp.tell()    # Start of header bytes
        # Must overwrite CRC and sizes with correct data later
        zinfo.CRC = CRC = 0
        zinfo.compress_size = compress_size = 0
        zinfo.file_size = file_size = 0
        self.fp.write(zinfo.FileHeader())
        zinfo.file_offset = self.fp.tell()      # Start of file bytes
        if zinfo.compress_type == ZIP_DEFLATED:
            # Negative window bits: raw deflate stream without zlib framing.
            cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
                 zlib.DEFLATED, -15)
        else:
            # Any other compress_type is copied through uncompressed.
            cmpr = None
        # Copy loop: track the uncompressed size and running CRC of the input,
        # and the number of bytes produced by the compressor.
        while 1:
            buf = fp.read(1024 * 8)
            if not buf:
                break
            file_size = file_size + len(buf)
            CRC = binascii.crc32(buf, CRC)
            if cmpr:
                buf = cmpr.compress(buf)
                compress_size = compress_size + len(buf)
            self.fp.write(buf)
        fp.close()
        if cmpr:
            # Drain whatever the compressor is still holding back.
            buf = cmpr.flush()
            compress_size = compress_size + len(buf)
            self.fp.write(buf)
            zinfo.compress_size = compress_size
        else:
            zinfo.compress_size = file_size
        zinfo.CRC = CRC
        zinfo.file_size = file_size
        # Seek backwards and write CRC and file sizes
        position = self.fp.tell()       # Preserve current position in file
        # The CRC field sits 14 bytes into the local file header and is
        # followed directly by the compressed and uncompressed sizes.
        self.fp.seek(zinfo.header_offset + 14, 0)
        self.fp.write(struct.pack("<lLL", zinfo.CRC, zinfo.compress_size,
              zinfo.file_size))
        self.fp.seek(position, 0)
        # Register the member so it ends up in the archive's file list and
        # can be looked up by name.
        self.filelist.append(zinfo)
        self.NameToInfo[zinfo.filename] = zinfo
    
        6
  •  1
  •   Oddthinking    16 年前

    一些(许多?大多数?)压缩算法的原理是在整个文件的范围内查找冗余。

    一些压缩库会根据对文件最有效的压缩算法在几种压缩算法之间进行选择。

    因此,它不会与生成器或文件一起工作,因为它们太大而无法加载到内存中。这就解释了Zipfile库的局限性。

        7
  •  1
  •   jkitchen    7 年前

    如果有人偶然发现这个问题(它在2017年对 Python 2.7 仍然适用),这里有一个真正流式生成zip文件的可用方案,它不像其他答案那样要求输出是可查找(seekable)的。秘诀在于设置通用位标志的第3位(参见 https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT )。

    import io
    import zipfile
    import zlib
    import binascii
    import struct
    
    class ByteStreamer(io.BytesIO):
        '''
        Variant on BytesIO which lets you write and consume data while
        keeping track of the total filesize written. When data is consumed
        it is removed from memory, keeping the memory requirements low.
        '''
        def __init__(self):
            super(ByteStreamer, self).__init__()
            # Total number of bytes ever written, including consumed ones.
            self._tellall = 0

        def tell(self):
            '''Return the total number of bytes written so far, not the buffer position.'''
            return self._tellall

        def write(self, b):
            '''Append b to the buffer and return the number of bytes written.'''
            written = super(ByteStreamer, self).write(b)
            self._tellall += written
            return written

        def consume(self):
            '''Return everything currently buffered and empty the buffer.'''
            data = self.getvalue()
            self.seek(0)
            self.truncate(0)
            return data

    class BufferedZipFileWriter(zipfile.ZipFile):
        '''
        ZipFile writer with true streaming (input and output).
        Created zip files are always ZIP64-style because it is the only safe way to stream
        potentially large zip files without knowing the full size ahead of time.

        Example usage:

            def stream():
                bzfw = BufferedZipFileWriter()
                for arc_path, buffer in inputs:  # buffer is a file-like object which supports read(size)
                    for chunk in bzfw.streambuffer(arc_path, buffer):
                        yield chunk
                yield bzfw.close()
        '''
        def __init__(self, compression=zipfile.ZIP_DEFLATED):
            self._buffer = ByteStreamer()
            super(BufferedZipFileWriter, self).__init__(self._buffer, mode='w', compression=compression, allowZip64=True)
            # The buffer is emptied as it is consumed, so absolute positions
            # cannot be revisited. Current zipfile versions seek back to
            # start_dir on close when they consider the file seekable, which
            # must not happen here. Older versions ignore this attribute.
            self._seekable = False

        def streambuffer(self, zinfo_or_arcname, buffer, chunksize=2**16):
            '''
            Add one member to the archive, yielding archive bytes as they are produced.

            zinfo_or_arcname -- a zipfile.ZipInfo, or the archive name to
                                create one for (current local time, mode 0600,
                                the writer's default compression).
            buffer           -- object with a read(size) method; read until
                                read() returns an empty value.
            chunksize        -- number of bytes requested per read() call.

            The member is written with general purpose flag bit 3 set, i.e.
            CRC and sizes follow the data instead of being patched into the
            local header, so no seeking is needed.
            '''
            # Imported here because the module-level imports do not include it.
            import time

            if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
                zinfo = zipfile.ZipInfo(filename=zinfo_or_arcname,
                                        date_time=time.localtime(time.time())[:6])
                zinfo.compress_type = self.compression
                zinfo.external_attr = 0o600 << 16     # ?rw-------
            else:
                zinfo = zinfo_or_arcname

            zinfo.file_size = file_size = 0
            zinfo.flag_bits = 0x08  # Streaming mode: crc and size come after the data
            zinfo.header_offset = self.fp.tell()

            self._writecheck(zinfo)
            self._didModify = True

            zinfo.CRC = CRC = 0
            zinfo.compress_size = compress_size = 0
            self.fp.write(zinfo.FileHeader())
            if zinfo.compress_type == zipfile.ZIP_DEFLATED:
                # Negative window bits: raw deflate stream without zlib framing.
                cmpr = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
            else:
                cmpr = None

            while True:
                buf = buffer.read(chunksize)
                if not buf:
                    break

                file_size += len(buf)
                CRC = binascii.crc32(buf, CRC) & 0xffffffff
                if cmpr is not None:
                    buf = cmpr.compress(buf)
                    compress_size += len(buf)

                self.fp.write(buf)
                # The compressor may hold data back, so only yield non-empty output.
                compressed_bytes = self._buffer.consume()
                if compressed_bytes:
                    yield compressed_bytes

            if cmpr is not None:
                buf = cmpr.flush()
                compress_size += len(buf)
                self.fp.write(buf)
                zinfo.compress_size = compress_size
                compressed_bytes = self._buffer.consume()
                if compressed_bytes:
                    yield compressed_bytes
            else:
                zinfo.compress_size = file_size

            zinfo.CRC = CRC
            zinfo.file_size = file_size

            # Write CRC and file sizes after the file data
            # Always write as zip64 -- only safe way to stream what might become a large zipfile
            # NOTE(review): the local header above is not forced to ZIP64 and
            # this descriptor carries no signature; readers that rely on the
            # central directory are unaffected, but confirm against strict
            # streaming readers.
            fmt = '<LQQ'
            self.fp.write(struct.pack(fmt, zinfo.CRC, zinfo.compress_size, zinfo.file_size))

            self.fp.flush()
            # Current zipfile versions record the central directory offset
            # from start_dir; keep it pointing just past this member.
            self.start_dir = self.fp.tell()
            self.filelist.append(zinfo)
            self.NameToInfo[zinfo.filename] = zinfo
            yield self._buffer.consume()

        # The close method needs to be patched to force writing a ZIP64 file
        # We'll hack ZIP_FILECOUNT_LIMIT to do the forcing
        def close(self):
            '''Write the closing archive records and return the remaining bytes.'''
            tmp = zipfile.ZIP_FILECOUNT_LIMIT
            zipfile.ZIP_FILECOUNT_LIMIT = 0
            try:
                super(BufferedZipFileWriter, self).close()
            finally:
                # Restore the module-wide limit even if closing fails.
                zipfile.ZIP_FILECOUNT_LIMIT = tmp
            return self._buffer.consume()
    
        8
  •  0
  •   Oddthinking    16 年前

    gzip库将采用类似文件的对象进行压缩。

    class GzipFile([filename [,mode [,compresslevel [,fileobj]]]])
    

    现在我看到原来的提问者不会接受Gzip:-(

        9
  •  0
  •   Belug    11 年前

    现在,使用python 2.7,您可以将数据添加到zipfile而不是文件:

    http://docs.python.org/2/library/zipfile#zipfile.ZipFile.writestr

        10
  •  0
  •   Edwinner    8 年前

    这是2017年。如果您仍希望优雅地完成此操作,请使用 Python Zipstream by allanlei 到目前为止,它可能是实现这一点的唯一编写良好的库。