thumb/epub: handle missing covers

This commit is contained in:
AppleTheGolden 2025-09-27 23:26:25 +02:00
parent e3baf932f3
commit e8309c3e99
No known key found for this signature in database
GPG key ID: F6AC8A62154C42AA
2 changed files with 64 additions and 38 deletions

View file

@ -131,7 +131,7 @@ class MParser(object):
def au_unpk( def au_unpk(
log: "NamedLogger", fmt_map: dict[str, str], abspath: str, vn: Optional[VFS] = None log: "NamedLogger", fmt_map: dict[str, str], abspath: str, vn: Optional[VFS] = None
) -> str: ) -> Optional[str]:
ret = "" ret = ""
maxsz = 1024 * 1024 * 64 maxsz = 1024 * 1024 * 64
try: try:
@ -178,23 +178,26 @@ def au_unpk(
elif pk == "epub": elif pk == "epub":
fi = get_cover_from_epub(log, abspath) fi = get_cover_from_epub(log, abspath)
assert fi # !rm
else: else:
raise Exception("unknown compression %s" % (pk,)) raise Exception("unknown compression %s" % (pk,))
fsz = 0 if fi:
with os.fdopen(fd, "wb") as fo: fsz = 0
while True: with os.fdopen(fd, "wb") as fo:
buf = fi.read(32768) while True:
if not buf: buf = fi.read(32768)
break if not buf:
break
fsz += len(buf) fsz += len(buf)
if fsz > maxsz: if fsz > maxsz:
raise Exception("zipbomb defused") raise Exception("zipbomb defused")
fo.write(buf) fo.write(buf)
else:
wunlink(log, ret, vn.flags if vn else VF_CAREFUL)
return None
return ret return ret
@ -422,10 +425,20 @@ def get_cover_from_epub(log: "NamedLogger", abspath: str) -> Optional[IO[bytes]]
# This might be an EPUB2 file, try the legacy way of specifying covers # This might be an EPUB2 file, try the legacy way of specifying covers
coverimage_path = _get_cover_from_epub2(log, package_root, package_ns) coverimage_path = _get_cover_from_epub2(log, package_root, package_ns)
# This url is either absolute (in the .epub) or relative to the package document if coverimage_path:
adjusted_cover_path = urljoin(rootfile_path, coverimage_path) # This url is either absolute (in the .epub) or relative to the package document
adjusted_cover_path = urljoin(rootfile_path, coverimage_path)
return z.open(adjusted_cover_path) try:
return z.open(adjusted_cover_path)
except KeyError:
log(
"epub: cover specified in package document, but doesn't exist: %s"
% (adjusted_cover_path,)
)
else:
log("epub: no cover found in %s" % (abspath,))
return None
def _get_cover_from_epub2( def _get_cover_from_epub2(

View file

@ -378,6 +378,16 @@ class ThumbSrv(object):
if ext in self.args.au_unpk: if ext in self.args.au_unpk:
ap_unpk = au_unpk(self.log, self.args.au_unpk, abspath, vn) ap_unpk = au_unpk(self.log, self.args.au_unpk, abspath, vn)
if not ap_unpk:
# This file doesn't have a thumbnail
self.worker_cleanup(abspath, ap_unpk, tpath, None, vn)
try:
# create empty thumbnail, otherwise would try again next time
open(tpath, "ab").close()
except OSError:
pass
continue
else: else:
ap_unpk = abspath ap_unpk = abspath
@ -451,40 +461,43 @@ class ThumbSrv(object):
except: except:
pass pass
if abspath != ap_unpk:
wunlink(self.log, ap_unpk, vn.flags)
try: try:
atomic_move(self.log, ttpath, tpath, vn.flags) atomic_move(self.log, ttpath, tpath, vn.flags)
except Exception as ex: except Exception as ex:
if not os.path.exists(tpath): if not os.path.exists(tpath):
t = "failed to move [%s] to [%s]: %r" t = "failed to move [%s] to [%s]: %r"
self.log(t % (ttpath, tpath, ex), 3) self.log(t % (ttpath, tpath, ex), 3)
pass
untemp = [] self.worker_cleanup(abspath, ap_unpk, tpath, ttpath, vn)
with self.mutex:
subs = self.busy[tpath]
del self.busy[tpath]
self.ram.pop(ttpath, None)
untemp = self.untemp.pop(ttpath, None) or []
for ap in untemp:
try:
wunlink(self.log, ap, VF_CAREFUL)
except:
pass
for x in subs:
with x:
x.notify_all()
with self.memcond:
self.memcond.notify_all()
with self.mutex: with self.mutex:
self.nthr -= 1 self.nthr -= 1
def worker_cleanup(self, abspath, ap_unpk, tpath, ttpath, vn):
if abspath != ap_unpk and ap_unpk:
wunlink(self.log, ap_unpk, vn.flags)
untemp = []
with self.mutex:
subs = self.busy[tpath]
del self.busy[tpath]
if ttpath:
self.ram.pop(ttpath, None)
untemp = self.untemp.pop(ttpath, None) or []
for ap in untemp:
try:
wunlink(self.log, ap, VF_CAREFUL)
except:
pass
for x in subs:
with x:
x.notify_all()
with self.memcond:
self.memcond.notify_all()
def fancy_pillow(self, im: "Image.Image", fmt: str, vn: VFS) -> "Image.Image": def fancy_pillow(self, im: "Image.Image", fmt: str, vn: VFS) -> "Image.Image":
# exif_transpose is expensive (loads full image + unconditional copy) # exif_transpose is expensive (loads full image + unconditional copy)
res = self.getres(vn, fmt) res = self.getres(vn, fmt)