mirror of
https://github.com/9001/copyparty.git
synced 2025-08-17 09:02:15 -06:00
this avoids a false-positive in the info-zip unzip zipbomb detector.

unfortunately,
* it is now impossible to extract large (4 GiB) zipfiles using old software (WinXP, macos 10.12)
* it is now less viable to stream download-as-zip into a zipfile unpacker (please use download-as-tar for that purpose)

context: the zipfile specification (APPNOTE.TXT) is slightly ambiguous as to when the data-descriptor (0x504b0708) filesize-fields change from 32bit to 64bit; both copyparty and libarchive independently made the same interpretation, that this is only when the local header is zip64 AND the size-fields are both 0xFFFFFFFF. This makes sense because the data descriptor is only necessary when that particular file-to-be-added exceeds 4 GiB, and/or when the crc32 is not known ahead of time.

another interpretation, seen in an early version of the patchset to fix CVE-2019-13232 (zip-bombs) in the info-zip unzip command, believes the only requirement is that the local header is zip64.

in many linux distributions, the unzip command would thus fail on zipfiles created by copyparty, since they (by default) satisfy the three requirements to hit the zipbomb false-positive:
* total filesize exceeds 4 GiB, and...
* a mix of regular (32bit) and zip64 entries, and...
* streaming-mode zipfile (not made with ?zip=crc)

this issue no longer exists in a more recent version of that patchset, https://github.com/madler/unzip/commit/af0d07f95809653b but this fix has not yet made it into most linux distros
334 lines
9.2 KiB
Python
334 lines
9.2 KiB
Python
# coding: utf-8
|
|
from __future__ import print_function, unicode_literals
|
|
|
|
import calendar
|
|
import stat
|
|
import time
|
|
|
|
from .authsrv import AuthSrv
|
|
from .bos import bos
|
|
from .sutil import StreamArc, errdesc
|
|
from .util import min_ex, sanitize_fn, spack, sunpack, yieldfile, zlib
|
|
|
|
if True: # pylint: disable=using-constant-test
|
|
from typing import Any, Generator, Optional
|
|
|
|
from .util import NamedLogger
|
|
|
|
|
|
def dostime2unix(buf: bytes) -> int:
    """
    converts a 4-byte dos timestamp (little-endian 16bit time,
    then 16bit date) into integer unix seconds, treating it as utc
    """
    dos_time, dos_date = sunpack(b"<HH", buf)

    # time word, high to low: 5b hour, 6b minute, 5b seconds/2
    hour = dos_time >> 11
    minute = (dos_time >> 5) & 0x3F
    second = (dos_time & 0x1F) * 2

    # date word, high to low: 7b years since 1980, 4b month, 5b day
    year = (dos_date >> 9) + 1980
    month = (dos_date >> 5) & 0xF
    day = dos_date & 0x1F

    iso = "{:04d}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}".format(
        year, month, day, hour, minute, second
    )

    # going through strptime means nonsense fields (month 0, hour 25, ...)
    # raise instead of silently producing a bogus timestamp
    parsed = time.strptime(iso, "%Y-%m-%d %H:%M:%S")
    return int(calendar.timegm(parsed))
|
|
|
|
|
|
def unixtime2dos(ts: int) -> bytes:
    """
    converts unix seconds (utc) into a 4-byte dos timestamp
    (little-endian 16bit time, then 16bit date)

    timestamps which cannot be expressed in the dos format
    (the packed fields don't fit in 16 bits, which happens for
    years outside 1980..2107) become 1980-01-01 00:00:00
    """
    # dos time only has 2-second resolution and the halving below floors,
    # so add one second first to make odd seconds round up rather than down
    year, month, day, hour, minute, second, _, _, _ = time.gmtime(ts + 1)

    dos_date = ((year - 1980) << 9) + (month << 5) + day
    dos_time = (hour << 11) + (minute << 5) + second // 2
    try:
        return spack(b"<HH", dos_time, dos_date)
    except Exception:
        # best-effort fallback; deliberately broad, but no longer
        # swallows KeyboardInterrupt / SystemExit like a bare except
        return b"\x00\x00\x21\x00"
|
|
|
|
|
|
def gen_fdesc(sz: int, crc32: int, z64: bool) -> bytes:
    """
    builds the data descriptor which trails the file contents:
    4b magic, 4b crc32, then the filesize twice
    (each 8b if z64, otherwise 4b)
    """
    if z64:
        layout = b"<LQQ"
    else:
        layout = b"<LLL"

    # same value for both size fields
    return b"\x50\x4b\x07\x08" + spack(layout, crc32, sz, sz)
|
|
|
|
|
|
def gen_hdr(
    h_pos: Optional[int],
    z64: bool,
    fn: str,
    sz: int,
    lastmod: int,
    utf8: bool,
    icrc32: int,
    pre_crc: bool,
) -> bytes:
    """
    builds one file header record;
    the local file header when h_pos is None,
    otherwise the central directory entry for that file
    (h_pos = absolute position of its local header)
    """
    is_central = h_pos is not None

    # appnote 4.5 / zip 3.0 (2008) / unzip 6.0 (2009) says to add z64
    # extinfo for values which exceed H, but that becomes an off-by-one
    # (can't tell if it was clamped or exactly maxval), make it obvious
    z64_vals = []
    if z64:
        z64_vals.extend((sz, sz))

    if h_pos and h_pos >= 0xFFFFFFFF:
        # central entry with a clamped header offset; carry the real one too
        z64_vals.append(h_pos)

    # min-ver is only raised by the z64 flag;
    # confusingly, a clamped h_pos alone does not bump it
    if z64:
        min_ver = b"\x2d\x00"
    else:
        min_ver = b"\x0a\x00"

    # a crc of 0 is written as four nullbytes either way
    crc_bytes = spack(b"<L", icrc32) if icrc32 else b"\x00" * 4

    if is_central:
        # 4b magic, 2b spec-ver (1b compat, 1b os (00 dos, 03 unix)), 2b min-ver
        parts = [b"\x50\x4b\x01\x02\x1e\x03", min_ver]
    else:
        # 4b magic, 2b min-ver
        parts = [b"\x50\x4b\x03\x04", min_ver]

    # 2b flags; low byte 08 = streaming (crc not known upfront),
    # high byte 08 = utf8 filenames, appnote 6.3.2 (2007)
    parts.append(b"\x00" if pre_crc else b"\x08")
    parts.append(b"\x08" if utf8 else b"\x00")

    # 2b compression, 4b time, 4b crc
    parts.append(b"\x00\x00")
    parts.append(unixtime2dos(lastmod))
    parts.append(crc_bytes)

    # spec says to put zeros when !crc if bit3 (streaming)
    # however infozip does actual sz and it even works on winxp
    # (same reasoning for z64 extradata later)
    size_field = 0xFFFFFFFF if z64 else sz
    parts.append(spack(b"<LL", size_field, size_field))

    # windows support (the "?" replace below too)
    fn = sanitize_fn(fn, "/")
    codec = "utf-8" if utf8 else "cp437"
    name_bytes = fn.encode(codec, "replace").replace(b"?", b"_")

    # extrafields: the unix one further down is always 0x10 bytes,
    # the z64 one (only if there are values) is 4b header + 8b per value
    # (an ntfs one would be 0x24 but is not emitted)
    extra_len = 0x10
    if z64_vals:
        extra_len += 4 + 8 * len(z64_vals)

    # 2b filename length, 2b extrafield length
    parts.append(spack(b"<HH", len(name_bytes), extra_len))

    if is_central:
        # 2b comment, 2b diskno
        parts.append(b"\x00" * 4)

        # 2b internal.attr, 4b external.attr
        # infozip-macos: 0100 0000 a481 (spec-ver 1e03) file:644
        # infozip-macos: 0100 0100 0080 (spec-ver 1e03) file:000
        # win10-zip: 0000 2000 0000 (spec-ver xx00) FILE_ATTRIBUTE_ARCHIVE
        parts.append(b"\x00\x00\x00\x00\xa4\x81")  # unx
        # (the fat flavor would be 00 00 20 00 00 00)

        # 4b local-header-ofs, clamped; real value is in z64 extinfo
        parts.append(spack(b"<L", min(h_pos, 0xFFFFFFFF)))

    parts.append(name_bytes)

    # ntfs extrafield, currently not emitted, for reference:
    #   type 0a, size 20, rsvd, attr1, len 18, mtime, atime, ctime
    #   b"\xa3\x2f\x82\x41\x55\x68\xd8\x01" 1652616838.798941100 ~5.861518 132970904387989411 ~58615181
    #   nt = int((lastmod + 11644473600) * 10000000)
    #   spack(b"<HHLHHQQQ", 0xA, 0x20, 0, 1, 0x18, nt, nt, nt)

    # unix: type 0d, size 0c, atime, mtime, uid, gid
    unix_ts = int(lastmod)
    parts.append(spack(b"<HHLLHH", 0xD, 0xC, unix_ts, int(lastmod), 1000, 1000))

    if z64_vals:
        # z64: type 01, size, then the 8b values
        layout = b"<HH" + b"Q" * len(z64_vals)
        parts.append(spack(layout, 1, len(z64_vals) * 8, *z64_vals))

    return b"".join(parts)
|
|
|
|
|
|
def gen_ecdr(
    items: list[tuple[str, int, int, int, int]], cdir_pos: int, cdir_end: int
) -> tuple[bytes, bool]:
    """
    end of central directory record;
    summarizes the central directory (entry count, size, position)
    with each value clamped to its field width

    returns (record, need_64) where need_64 is True
    if any of the values hit the maximum of its field
    """
    max16 = 0xFFFF
    max32 = 0xFFFFFFFF

    count = min(max16, len(items))
    size = min(max32, cdir_end - cdir_pos)
    offset = min(max32, cdir_pos)

    need_64 = count == max16 or size == max32 or offset == max32

    record = b"".join(
        [
            b"\x50\x4b\x05\x06",  # 4b magic
            b"\x00" * 4,  # 2b ndisk, 2b disk0
            # 2b tnfiles, 2b dnfiles, 4b dir sz, 4b dir pos
            spack(b"<HHLL", count, count, size, offset),
            b"\x00\x00",  # 2b comment length
        ]
    )
    return record, need_64
|
|
|
|
|
|
def gen_ecdr64(
    items: list[tuple[str, int, int, int, int]], cdir_pos: int, cdir_end: int
) -> bytes:
    """
    z64 end of central directory record;
    carries the unclamped entry count plus the size
    and position of the central directory as 64bit values
    """
    count = len(items)
    size = cdir_end - cdir_pos

    return b"".join(
        [
            b"\x50\x4b\x06\x06",  # 4b magic
            b"\x2c" + b"\x00" * 7,  # 8b own length from hereon (0x2c = 44)
            b"\x1e\x03\x2d\x00",  # 2b spec-ver, 2b min-ver
            b"\x00" * 8,  # 4b ndisk, 4b disk0
            # 8b tnfiles, 8b dnfiles, 8b dir sz, 8b dir pos
            spack(b"<QQQQ", count, count, size, cdir_pos),
        ]
    )
|
|
|
|
|
|
def gen_ecdr64_loc(ecdr64_pos: int) -> bytes:
    """
    z64 end of central directory locator;
    just a pointer to where the ecdr64 record starts
    """
    # 4b cdisk, 8b start of ecdr64, 4b ndisks
    pointer = spack(b"<LQL", 0, ecdr64_pos, 1)
    return b"\x50\x4b\x06\x07" + pointer
|
|
|
|
|
|
class StreamZip(StreamArc):
    """
    produces a zipfile as a stream of byte chunks;
    file contents are written as-is (compression method 0)
    and the central directory is appended once all files are done
    """

    def __init__(
        self,
        log: "NamedLogger",
        asrv: AuthSrv,
        fgen: Generator[dict[str, Any], None, None],
        utf8: bool = False,
        pre_crc: bool = False,
        **kwargs: Any
    ) -> None:
        """
        utf8: encode filenames as utf-8 rather than cp437
        pre_crc: read each file an extra time upfront so the crc32
          can go into the local header (otherwise it is only provided
          by the data descriptor after the file contents)
        kwargs: accepted and ignored here
        """
        super(StreamZip, self).__init__(log, asrv, fgen)

        self.utf8 = utf8
        self.pre_crc = pre_crc

        # number of bytes emitted so far = offset of whatever comes next
        self.pos = 0

        # one (name, size, mtime, crc32, local-header-offset) per file written
        self.items: list[tuple[str, int, int, int, int]] = []

    def _ct(self, buf: bytes) -> bytes:
        """counts buf towards the output position and hands it back"""
        self.pos += len(buf)
        return buf

    def ser(self, f: dict[str, Any]) -> Generator[bytes, None, None]:
        """
        serializes one file; yields the local header, the contents,
        and (when required) the data descriptor, and remembers
        the file in self.items for the central directory

        f must provide "vp" (name inside the archive),
        "ap" (path to read from) and "st" (stat result);
        directories produce nothing
        """
        name = f["vp"]
        src = f["ap"]
        st = f["st"]

        if stat.S_ISDIR(st.st_mode):
            return

        sz = st.st_size
        ts = st.st_mtime
        hdr_ofs = self.pos

        crc = 0
        if self.pre_crc:
            # extra pass over the file to learn the crc ahead of the header
            # (self.args is not set in this class; presumably by StreamArc)
            for chunk in yieldfile(src, self.args.iobuf):
                crc = zlib.crc32(chunk, crc)

        crc &= 0xFFFFFFFF

        # some unzip-programs expect a 64bit data-descriptor
        # even if the only 32bit-exceeding value is the offset,
        # so force that by placeholdering the filesize too
        z64 = sz >= 0xFFFFFFFF or hdr_ofs >= 0xFFFFFFFF

        hdr = gen_hdr(None, z64, name, sz, ts, self.utf8, crc, self.pre_crc)
        yield self._ct(hdr)

        for chunk in yieldfile(src, self.args.iobuf):
            if not self.pre_crc:
                crc = zlib.crc32(chunk, crc)

            yield self._ct(chunk)

        crc &= 0xFFFFFFFF

        self.items.append((name, sz, ts, crc, hdr_ofs))

        # descriptor is skipped only if the crc was already in the
        # header and the entry is not z64
        if self.pre_crc and not z64:
            return

        yield self._ct(gen_fdesc(sz, crc, z64))

    def gen(self) -> Generator[bytes, None, None]:
        """
        yields the whole zipfile; every file from self.fgen,
        an additional file describing any errors which occurred,
        then the central directory and the end-of-directory records

        small pieces are merged into chunks of at least 16 KiB
        before being yielded
        """
        errf: dict[str, Any] = {}
        errors = []
        pending = b""
        try:
            for entry in self.fgen:
                if "err" in entry:
                    errors.append((entry["vp"], entry["err"]))
                    continue

                try:
                    for piece in self.ser(entry):
                        pending += piece
                        if len(pending) < 16384:
                            continue

                        yield pending
                        pending = b""
                except GeneratorExit:
                    # consumer went away; stop for real
                    raise
                except:
                    # best-effort; note the failure and move on to the next file
                    ex = min_ex(5, True).replace("\n", "\n-- ")
                    errors.append((entry["vp"], ex))

            if pending:
                yield pending
                pending = b""

            if errors:
                # errdesc returns a file-entry (which goes into the archive
                # like any other file) and the text which is logged here
                errf, txt = errdesc(self.asrv.vfs, errors)
                self.log("\n".join(([repr(errf)] + txt[1:])))
                for piece in self.ser(errf):
                    yield piece

            # central directory; one header per file that was written
            cdir_pos = self.pos
            for name, sz, ts, crc, hdr_ofs in self.items:
                z64 = sz >= 0xFFFFFFFF or hdr_ofs >= 0xFFFFFFFF
                hdr = gen_hdr(hdr_ofs, z64, name, sz, ts, self.utf8, crc, self.pre_crc)
                pending += self._ct(hdr)
                if len(pending) >= 16384:
                    yield pending
                    pending = b""

            cdir_end = self.pos

            # first call is only to learn whether anything got clamped
            _, need_64 = gen_ecdr(self.items, cdir_pos, cdir_end)
            if need_64:
                ecdir64_pos = self.pos
                pending += self._ct(gen_ecdr64(self.items, cdir_pos, cdir_end))
                pending += self._ct(gen_ecdr64_loc(ecdir64_pos))

            ecdr, _ = gen_ecdr(self.items, cdir_pos, cdir_end)
            yield pending + self._ct(ecdr)
        finally:
            # the error description was a file on disk; remove it
            if errf:
                bos.unlink(errf["ap"])
|