From 3a2381ff2dd54752622e1325b433f05230bebd37 Mon Sep 17 00:00:00 2001 From: ed Date: Sun, 14 Sep 2025 23:27:33 +0000 Subject: [PATCH] webdav: fix depth:0 in rootless vfs; also safeguards against potential issues with invalid paths if the api is used incorrectly from a plugin --- copyparty/authsrv.py | 26 ++++++++++++++++++++++---- copyparty/httpcli.py | 16 ++++++++++++++-- copyparty/util.py | 10 ++++++---- 3 files changed, 42 insertions(+), 10 deletions(-) diff --git a/copyparty/authsrv.py b/copyparty/authsrv.py index 517eb4af..be08e577 100644 --- a/copyparty/authsrv.py +++ b/copyparty/authsrv.py @@ -426,15 +426,17 @@ class VFS(object): self.all_nodes[vpath] = self self.all_aps = [(rp, [self])] self.all_vps = [(vp, self)] + self.canonical = self._canonical + self.dcanonical = self._dcanonical else: self.histpath = self.dbpath = "" self.all_aps = [] self.all_vps = [] + self.canonical = self._canonical_null + self.dcanonical = self._dcanonical_null self.get_dbv = self._get_dbv self.ls = self._ls - self.canonical = self._canonical - self.dcanonical = self._dcanonical def __repr__(self) -> str: return "VFS(%s)" % ( @@ -628,6 +630,12 @@ class VFS(object): vrem = vjoin(self.vpath[len(dbv.vpath) :].lstrip("/"), vrem) return dbv, vrem + def _canonical_null(self, rem: str, resolve: bool = True) -> str: + return "" + + def _dcanonical_null(self, rem: str) -> str: + return "" + def _canonical(self, rem: str, resolve: bool = True) -> str: """returns the canonical path (fully-resolved absolute fs path)""" ap = self.realpath @@ -715,8 +723,12 @@ class VFS(object): """return user-readable [fsdir,real,virt] items at vpath""" virt_vis = {} # nodes readable by user abspath = self.canonical(rem) - real = list(statdir(self.log, scandir, lstat, abspath, throw)) - real.sort() + if abspath: + real = list(statdir(self.log, scandir, lstat, abspath, throw)) + real.sort() + else: + real = [] + if not rem: # no vfs nodes in the list of real inodes real = [x for x in real if x[0] not in self.nodes] @@ -2010,6 +2022,8 @@ class AuthSrv(object): promote = [] demote = [] for vol in vfs.all_vols.values(): + if not vol.realpath: + continue hid = self.hid_cache.get(vol.realpath) if not hid: zb = hashlib.sha512(afsenc(vol.realpath)).digest() @@ -2048,6 +2062,8 @@ class AuthSrv(object): vol.histpath = absreal(vol.histpath) for vol in vfs.all_vols.values(): + if not vol.realpath: + continue hid = self.hid_cache[vol.realpath] vflag = vol.flags.get("dbpath") if vflag == "-": @@ -2807,6 +2823,8 @@ class AuthSrv(object): shn.dcanonical = shn._dcanonical_shr else: shn.ls = shn._ls + shn.canonical = shn._canonical + shn.dcanonical = shn._dcanonical shn.shr_owner = s_un shn.shr_src = (s_vfs, s_rem) diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index 2b53f00d..5b278887 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -1551,6 +1551,7 @@ class HttpCli(object): if xtag is not None: props = set([y.tag.split("}")[-1] for y in xtag]) # assume otherwise; nobody ever gonna + self.hint = "" zi = int(time.time()) vst = os.stat_result((16877, -1, -1, 1, 1000, 1000, 8, zi, zi, zi)) @@ -1560,7 +1561,9 @@ class HttpCli(object): except OSError as ex: if ex.errno not in (errno.ENOENT, errno.ENOTDIR): raise - raise Pebkac(404) + if tap: + raise Pebkac(404) + st = vst topdir = {"vp": "", "st": st} fgen: Iterable[dict[str, Any]] = [] @@ -1600,6 +1603,9 @@ class HttpCli(object): ) elif depth == "0" or not stat.S_ISDIR(st.st_mode): + if depth == "0" and not self.vpath and not vn.realpath: + # rootless server; give dummy listing + self.can_read = True # propfind on a file; return as topdir if not self.can_read and not self.can_get: self.log("inaccessible: %r" % ("/" + self.vpath,)) @@ -1632,7 +1638,11 @@ class HttpCli(object): self.log("inaccessible: %r" % ("/" + self.vpath,)) raise Pebkac(401, "authenticate") - zi = vn.flags["du_iwho"] if "quota-available-bytes" in props else 0 + zi = ( + vn.flags["du_iwho"] + if vn.realpath and "quota-available-bytes" in props + else 0 + ) if zi and ( zi == 9 or (zi == 7 and self.uname != "*") @@ -1767,6 +1777,7 @@ class HttpCli(object): assert xprop # !rm for ze in xprop: ze.clear() + self.hint = "" txt = """HTTP/1.1 403 Forbidden""" xroot = parse_xml(txt) @@ -1824,6 +1835,7 @@ class HttpCli(object): ET.register_namespace("D", "DAV:") lk = parse_xml(txt) assert lk.tag == "{DAV:}lockinfo" + self.hint = "" token = str(uuid.uuid4()) diff --git a/copyparty/util.py b/copyparty/util.py index fcaa7f1b..271c5e93 100644 --- a/copyparty/util.py +++ b/copyparty/util.py @@ -1577,10 +1577,12 @@ def vol_san(vols: list["VFS"], txt: bytes) -> bytes: bvp = vol.vpath.encode("utf-8") bvph = b"$hist(/" + bvp + b")" - txt = txt.replace(bap, bvp) - txt = txt.replace(bhp, bvph) - txt = txt.replace(bap.replace(b"\\", b"\\\\"), bvp) - txt = txt.replace(bhp.replace(b"\\", b"\\\\"), bvph) + if bap: + txt = txt.replace(bap, bvp) + txt = txt.replace(bap.replace(b"\\", b"\\\\"), bvp) + if bhp: + txt = txt.replace(bhp, bvph) + txt = txt.replace(bhp.replace(b"\\", b"\\\\"), bvph) if vol.histpath != vol.dbpath: bdp = vol.dbpath.encode("utf-8")