Fix edge cases in checking if zip64 extensions are required
This fixes an issue where data requiring zip64 extensions that was added
to an unseekable stream without specifying `force_zip64=True` would be
written without zip64 extensions, and no RuntimeError would be raised
when closing the file (even though the size is known at that point).
This resulted in corrupt zip files being written successfully.

Deciding whether zip64 extensions are required outside of the `FileHeader`
function means that `FileHeader` and `_ZipWriteFile` always stay in sync.
Previously, the `FileHeader` function could enable zip64 extensions without
propagating that decision to the `_ZipWriteFile` class, which would then
fail to correctly write the data descriptor record or check for errors on
close.
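
For illustration, a minimal sketch of the user-facing scenario (file names and sizes here are hypothetical): writing more than `ZIP64_LIMIT` bytes to an unseekable stream now raises RuntimeError on close unless zip64 extensions are enabled, either via `force_zip64=True` or by supplying the size up-front on a `ZipInfo`.

```python
import io
import zipfile

# Minimal stand-in for the test suite's Unseekable helper: a write-only
# wrapper with no seek()/tell(), similar to a pipe or socket.
class Unseekable:
    def __init__(self, fp):
        self.fp = fp

    def write(self, data):
        return self.fp.write(data)

big = zipfile.ZIP64_LIMIT + 1  # just past the point where zip64 is required

# Before this fix, the following silently produced a corrupt archive;
# now it raises RuntimeError when the member is closed:
#
#   with zipfile.ZipFile(Unseekable(io.BytesIO()), "w") as zf:
#       with zf.open("big.bin", "w") as f:
#           f.write(b"_" * big)

# Option 1: ask for zip64 extensions explicitly.
with zipfile.ZipFile(Unseekable(io.BytesIO()), "w") as zf:
    with zf.open("big.bin", "w", force_zip64=True) as f:
        f.write(b"data")  # tiny payload; this is a sketch, not a real 4 GiB write

# Option 2: declare the size up-front so zip64 is selected automatically.
info = zipfile.ZipInfo("big.bin")
info.file_size = big
with zipfile.ZipFile(Unseekable(io.BytesIO()), "w") as zf:
    with zf.open(info, "w") as f:
        f.write(b"data")  # sketch only; real data would match info.file_size
```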
pR0Ps committed Apr 28, 2023
1 parent c42700d commit d24c6a4
Showing 2 changed files with 96 additions and 15 deletions.
86 changes: 86 additions & 0 deletions Lib/test/test_zipfile/test_core.py
@@ -1115,6 +1115,92 @@ def test_force_zip64(self):
self.assertEqual(len(zinfos), 1)
self.assertGreaterEqual(zinfos[0].extract_version, zipfile.ZIP64_VERSION) # requires zip64 to extract

def test_unseekable_zip_unknown_filesize(self):
"""Test that creating a zip with/without seeking will raise a RuntimeError if zip64 was required but not used"""

def make_zip(fp):
with zipfile.ZipFile(fp, mode="w", allowZip64=True) as zf:
with zf.open("text.txt", mode="w", force_zip64=False) as zi:
zi.write(b"_" * (zipfile.ZIP64_LIMIT + 1))

self.assertRaises(RuntimeError, make_zip, io.BytesIO())
self.assertRaises(RuntimeError, make_zip, Unseekable(io.BytesIO()))

def test_zip64_required_not_allowed_fail(self):
"""Test that trying to add a large file to a zip that doesn't allow zip64 extensions fails on add"""
def make_zip(fp):
with zipfile.ZipFile(fp, mode="w", allowZip64=False) as zf:
# pretend zipfile.ZipInfo.from_file was used to get the name and filesize
info = zipfile.ZipInfo("text.txt")
info.file_size = zipfile.ZIP64_LIMIT + 1
zf.open(info, mode="w")

self.assertRaises(zipfile.LargeZipFile, make_zip, io.BytesIO())
self.assertRaises(zipfile.LargeZipFile, make_zip, Unseekable(io.BytesIO()))

def test_unseekable_zip_known_filesize(self):
"""Test that creating a zip without seeking will use zip64 extensions if the file size is provided up-front"""

file_size = zipfile.ZIP64_LIMIT + 1

def make_zip(fp):
with zipfile.ZipFile(fp, mode="w", allowZip64=True) as zf:
# pretend zipfile.ZipInfo.from_file was used to get the name and filesize
info = zipfile.ZipInfo("text.txt")
info.file_size = file_size
with zf.open(info, mode="w", force_zip64=False) as zi:
zi.write(b"_" * file_size)
return fp

# check seekable file information
seekable_data = make_zip(io.BytesIO()).getvalue()
(
header, vers, os, flags, comp, csize, usize, fn_len,
ex_total_len, filename, ex_id, ex_len, ex_usize, ex_csize,
cd_sig
) = struct.unpack("<4sBBHH8xIIHH8shhQQ{}x4s".format(file_size), seekable_data[:62 + file_size])

self.assertEqual(header, b"PK\x03\x04")
self.assertGreaterEqual(vers, zipfile.ZIP64_VERSION) # requires zip64 to extract
self.assertEqual(os, 0) # compatible with MS-DOS
self.assertEqual(flags, 0) # no flags set
self.assertEqual(comp, 0) # compression method = stored
self.assertEqual(csize, 0xFFFFFFFF) # sizes are in zip64 extra
self.assertEqual(usize, 0xFFFFFFFF)
self.assertEqual(fn_len, 8) # filename len
self.assertEqual(ex_total_len, 20) # size of extra records
self.assertEqual(ex_id, 1) # Zip64 extra record
self.assertEqual(ex_len, 16) # 16 bytes of data
self.assertEqual(ex_usize, file_size) # uncompressed size
self.assertEqual(ex_csize, file_size) # compressed size
self.assertEqual(cd_sig, b"PK\x01\x02") # ensure the central directory header is next

# check unseekable file information
unseekable_data = make_zip(Unseekable(io.BytesIO())).fp.getvalue()
(
header, vers, os, flags, comp, csize, usize, fn_len,
ex_total_len, filename, ex_id, ex_len, ex_usize, ex_csize,
dd_header, dd_usize, dd_csize, cd_sig
) = struct.unpack("<4sBBHH8xIIHH8shhQQ{}x4s4xQQ4s".format(file_size), unseekable_data[:86 + file_size])

self.assertEqual(header, b"PK\x03\x04")
self.assertGreaterEqual(vers, zipfile.ZIP64_VERSION) # requires zip64 to extract
self.assertEqual(os, 0) # compatible with MS-DOS
self.assertEqual("{:b}".format(flags), "1000") # streaming flag set
self.assertEqual(comp, 0) # compression method = stored
self.assertEqual(csize, 0xFFFFFFFF) # sizes are in zip64 extra
self.assertEqual(usize, 0xFFFFFFFF)
self.assertEqual(fn_len, 8) # filename len
self.assertEqual(ex_total_len, 20) # size of extra records
self.assertEqual(ex_id, 1) # Zip64 extra record
self.assertEqual(ex_len, 16) # 16 bytes of data
self.assertEqual(ex_usize, 0) # uncompressed size - 0 to defer to data descriptor
self.assertEqual(ex_csize, 0) # compressed size - 0 to defer to data descriptor
self.assertEqual(dd_header, b"PK\07\x08") # data descriptor
self.assertEqual(dd_usize, file_size) # file size (8 bytes because zip64)
self.assertEqual(dd_csize, file_size) # compressed size (8 bytes because zip64)
self.assertEqual(cd_sig, b"PK\x01\x02") # ensure the central directory header is next


@requires_zlib()
class DeflateTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
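As a reading aid for the new test above: its `struct.unpack` format strings decode the local file header, the zip64 extra field, and (for the unseekable case) the trailing zip64 data descriptor. The snippet below is a hedged, annotated view of the 58-byte header-plus-extra prefix that the format string decodes before skipping over the file data; it is not part of the patch.

```python
import struct

# Field-by-field view of the "<4sBBHH8xIIHH8shhQQ" prefix the test unpacks.
LOCAL_HEADER_WITH_ZIP64_EXTRA = struct.Struct(
    "<"
    "4s"   # local file header signature, b"PK\x03\x04"
    "B"    # version needed to extract (>= 45 when zip64 is required)
    "B"    # reserved / host-compatibility byte (0, i.e. MS-DOS, in the test)
    "H"    # general purpose bit flags (bit 3 set when streaming)
    "H"    # compression method (0 == stored)
    "8x"   # mod time, mod date and CRC-32, skipped by the test
    "I"    # compressed size, 0xFFFFFFFF when deferred to the zip64 extra
    "I"    # uncompressed size, 0xFFFFFFFF when deferred to the zip64 extra
    "H"    # file name length (8 for "text.txt")
    "H"    # extra field length (20)
    "8s"   # file name
    "h"    # extra record header ID (1 == zip64)
    "h"    # extra record data size (16)
    "Q"    # zip64 uncompressed size (0 when deferred to the data descriptor)
    "Q"    # zip64 compressed size (0 when deferred to the data descriptor)
)
assert LOCAL_HEADER_WITH_ZIP64_EXTRA.size == 58

# In the unseekable case the file data is followed by a zip64 data
# descriptor: b"PK\x07\x08", the CRC-32 (4 bytes), then 8-byte compressed
# and uncompressed sizes, which is what the "...4s4xQQ4s" tail decodes.
```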
25 changes: 10 additions & 15 deletions Lib/zipfile/__init__.py
@@ -439,7 +439,7 @@ def __repr__(self):
result.append('>')
return ''.join(result)

def FileHeader(self, zip64=None):
def FileHeader(self, zip64):
"""Return the per-file header as a bytes object."""
dt = self.date_time
dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2]
@@ -455,11 +455,6 @@ def FileHeader(self, zip64=None):
extra = self.extra

min_version = 0
if (file_size > ZIP64_LIMIT or compress_size > ZIP64_LIMIT):
if zip64 is None:
zip64 = True
elif not zip64:
raise LargeZipFile("Filesize would require ZIP64 extensions")
if zip64:
fmt = '<HHQQ'
extra = extra + struct.pack(fmt,
@@ -1215,6 +1210,12 @@ def close(self):
self._zinfo.CRC = self._crc
self._zinfo.file_size = self._file_size

if not self._zip64:
if self._file_size > ZIP64_LIMIT:
raise RuntimeError("File size too large, try using force_zip64")
if self._compress_size > ZIP64_LIMIT:
raise RuntimeError("Compressed size too large, try using force_zip64")

# Write updated header info
if self._zinfo.flag_bits & _MASK_USE_DATA_DESCRIPTOR:
# Write CRC and file sizes after the file data
@@ -1223,13 +1224,6 @@
self._zinfo.compress_size, self._zinfo.file_size))
self._zipfile.start_dir = self._fileobj.tell()
else:
if not self._zip64:
if self._file_size > ZIP64_LIMIT:
raise RuntimeError(
'File size too large, try using force_zip64')
if self._compress_size > ZIP64_LIMIT:
raise RuntimeError(
'Compressed size too large, try using force_zip64')
# Seek backwards and write file header (which will now include
# correct CRC and file sizes)

@@ -1668,8 +1662,9 @@ def _open_to_write(self, zinfo, force_zip64=False):
zinfo.external_attr = 0o600 << 16 # permissions: ?rw-------

# Compressed size can be larger than uncompressed size
zip64 = self._allowZip64 and \
(force_zip64 or zinfo.file_size * 1.05 > ZIP64_LIMIT)
zip64 = force_zip64 or (zinfo.file_size * 1.05 > ZIP64_LIMIT)
if not self._allowZip64 and zip64:
raise LargeZipFile("Filesize would require ZIP64 extensions")

if self._seekable:
self.fp.seek(self.start_dir)
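
The net effect of the `_open_to_write` hunk above, in sketch form: the zip64 decision is made once, up front, and the same value is handed to both `FileHeader(zip64)` and `_ZipWriteFile`, whose `close()`-time check now runs regardless of whether the output is seekable. The helpers below are hypothetical mirrors of that logic, for illustration only; the real logic lives in `Lib/zipfile/__init__.py`.

```python
import zipfile

def decide_zip64(file_size, force_zip64, allow_zip64):
    """Return whether zip64 extensions will be used, raising early if they
    are needed but not allowed (mirrors the new _open_to_write check)."""
    # Compressed size can exceed the raw size, hence the 1.05 fudge factor.
    zip64 = force_zip64 or (file_size * 1.05 > zipfile.ZIP64_LIMIT)
    if zip64 and not allow_zip64:
        raise zipfile.LargeZipFile("Filesize would require ZIP64 extensions")
    return zip64

def check_on_close(zip64, file_size, compress_size):
    """Mirror of the close()-time check that now applies to unseekable
    streams too, instead of only when the header can be rewritten."""
    if not zip64:
        if file_size > zipfile.ZIP64_LIMIT:
            raise RuntimeError("File size too large, try using force_zip64")
        if compress_size > zipfile.ZIP64_LIMIT:
            raise RuntimeError("Compressed size too large, try using force_zip64")

# The same zip64 value is passed to both FileHeader(zip64) and _ZipWriteFile,
# so the local header, the data descriptor and the close() checks agree.
big = zipfile.ZIP64_LIMIT + 1
print(decide_zip64(big, force_zip64=False, allow_zip64=True))  # True
try:
    check_on_close(False, big, big)
except RuntimeError as e:
    print(e)  # File size too large, try using force_zip64
```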