copyparty/copyparty/star.py
ed 5919607ad0 sanitize fs-paths in archive error summary
also gets rid of a dumb debug print i forgot
2024-05-30 23:55:37 +00:00

157 lines
4.1 KiB
Python

# coding: utf-8
from __future__ import print_function, unicode_literals
import re
import stat
import tarfile
from queue import Queue
from .authsrv import AuthSrv
from .bos import bos
from .sutil import StreamArc, errdesc
from .util import Daemon, fsenc, min_ex
if True: # pylint: disable=using-constant-test
from typing import Any, Generator, Optional
from .util import NamedLogger
class QFile(object): # inherit io.StringIO for painful typing
"""file-like object which buffers writes into a queue"""
def __init__(self) -> None:
self.q: Queue[Optional[bytes]] = Queue(64)
self.bq: list[bytes] = []
self.nq = 0
def write(self, buf: Optional[bytes]) -> None:
if buf is None or self.nq >= 240 * 1024:
self.q.put(b"".join(self.bq))
self.bq = []
self.nq = 0
if buf is None:
self.q.put(None)
else:
self.bq.append(buf)
self.nq += len(buf)
class StreamTar(StreamArc):
"""construct in-memory tar file from the given path"""
def __init__(
self,
log: "NamedLogger",
asrv: AuthSrv,
fgen: Generator[dict[str, Any], None, None],
cmp: str = "",
**kwargs: Any
):
super(StreamTar, self).__init__(log, asrv, fgen)
self.ci = 0
self.co = 0
self.qfile = QFile()
self.errf: dict[str, Any] = {}
# python 3.8 changed to PAX_FORMAT as default;
# slower, bigger, and no particular advantage
fmt = tarfile.GNU_FORMAT
if "pax" in cmp:
# unless a client asks for it (currently
# gnu-tar has wider support than pax-tar)
fmt = tarfile.PAX_FORMAT
cmp = re.sub(r"[^a-z0-9]*pax[^a-z0-9]*", "", cmp)
try:
cmp, zs = cmp.replace(":", ",").split(",")
lv = int(zs)
except:
lv = -1
arg = {"name": None, "fileobj": self.qfile, "mode": "w", "format": fmt}
if cmp == "gz":
fun = tarfile.TarFile.gzopen
arg["compresslevel"] = lv if lv >= 0 else 3
elif cmp == "bz2":
fun = tarfile.TarFile.bz2open
arg["compresslevel"] = lv if lv >= 0 else 2
elif cmp == "xz":
fun = tarfile.TarFile.xzopen
arg["preset"] = lv if lv >= 0 else 1
else:
fun = tarfile.open
arg["mode"] = "w|"
self.tar = fun(**arg)
Daemon(self._gen, "star-gen")
def gen(self) -> Generator[Optional[bytes], None, None]:
buf = b""
try:
while True:
buf = self.qfile.q.get()
if not buf:
break
self.co += len(buf)
yield buf
yield None
finally:
while buf:
try:
buf = self.qfile.q.get()
except:
pass
if self.errf:
bos.unlink(self.errf["ap"])
def ser(self, f: dict[str, Any]) -> None:
name = f["vp"]
src = f["ap"]
fsi = f["st"]
if stat.S_ISDIR(fsi.st_mode):
return
inf = tarfile.TarInfo(name=name)
inf.mode = fsi.st_mode
inf.size = fsi.st_size
inf.mtime = fsi.st_mtime
inf.uid = 0
inf.gid = 0
self.ci += inf.size
with open(fsenc(src), "rb", self.args.iobuf) as fo:
self.tar.addfile(inf, fo)
def _gen(self) -> None:
errors = []
for f in self.fgen:
if "err" in f:
errors.append((f["vp"], f["err"]))
continue
if self.stopped:
break
try:
self.ser(f)
except:
ex = min_ex(5, True).replace("\n", "\n-- ")
errors.append((f["vp"], ex))
if errors:
self.errf, txt = errdesc(self.asrv.vfs, errors)
self.log("\n".join(([repr(self.errf)] + txt[1:])))
self.ser(self.errf)
self.tar.close()
self.qfile.write(None)