webdav: fix depth:0 in rootless vfs;

also safeguards against potential issues with invalid
paths if the api is used incorrectly from a plugin
This commit is contained in:
ed 2025-09-14 23:27:33 +00:00
parent 83bd197438
commit 3a2381ff2d
3 changed files with 42 additions and 10 deletions

View file

@ -426,15 +426,17 @@ class VFS(object):
self.all_nodes[vpath] = self
self.all_aps = [(rp, [self])]
self.all_vps = [(vp, self)]
self.canonical = self._canonical
self.dcanonical = self._dcanonical
else:
self.histpath = self.dbpath = ""
self.all_aps = []
self.all_vps = []
self.canonical = self._canonical_null
self.dcanonical = self._dcanonical_null
self.get_dbv = self._get_dbv
self.ls = self._ls
self.canonical = self._canonical
self.dcanonical = self._dcanonical
def __repr__(self) -> str:
return "VFS(%s)" % (
@ -628,6 +630,12 @@ class VFS(object):
vrem = vjoin(self.vpath[len(dbv.vpath) :].lstrip("/"), vrem)
return dbv, vrem
def _canonical_null(self, rem: str, resolve: bool = True) -> str:
return ""
def _dcanonical_null(self, rem: str) -> str:
return ""
def _canonical(self, rem: str, resolve: bool = True) -> str:
"""returns the canonical path (fully-resolved absolute fs path)"""
ap = self.realpath
@ -715,8 +723,12 @@ class VFS(object):
"""return user-readable [fsdir,real,virt] items at vpath"""
virt_vis = {} # nodes readable by user
abspath = self.canonical(rem)
real = list(statdir(self.log, scandir, lstat, abspath, throw))
real.sort()
if abspath:
real = list(statdir(self.log, scandir, lstat, abspath, throw))
real.sort()
else:
real = []
if not rem:
# no vfs nodes in the list of real inodes
real = [x for x in real if x[0] not in self.nodes]
@ -2010,6 +2022,8 @@ class AuthSrv(object):
promote = []
demote = []
for vol in vfs.all_vols.values():
if not vol.realpath:
continue
hid = self.hid_cache.get(vol.realpath)
if not hid:
zb = hashlib.sha512(afsenc(vol.realpath)).digest()
@ -2048,6 +2062,8 @@ class AuthSrv(object):
vol.histpath = absreal(vol.histpath)
for vol in vfs.all_vols.values():
if not vol.realpath:
continue
hid = self.hid_cache[vol.realpath]
vflag = vol.flags.get("dbpath")
if vflag == "-":
@ -2807,6 +2823,8 @@ class AuthSrv(object):
shn.dcanonical = shn._dcanonical_shr
else:
shn.ls = shn._ls
shn.canonical = shn._canonical
shn.dcanonical = shn._dcanonical
shn.shr_owner = s_un
shn.shr_src = (s_vfs, s_rem)

View file

@ -1551,6 +1551,7 @@ class HttpCli(object):
if xtag is not None:
props = set([y.tag.split("}")[-1] for y in xtag])
# assume <allprop/> otherwise; nobody ever gonna <propname/>
self.hint = ""
zi = int(time.time())
vst = os.stat_result((16877, -1, -1, 1, 1000, 1000, 8, zi, zi, zi))
@ -1560,7 +1561,9 @@ class HttpCli(object):
except OSError as ex:
if ex.errno not in (errno.ENOENT, errno.ENOTDIR):
raise
raise Pebkac(404)
if tap:
raise Pebkac(404)
st = vst
topdir = {"vp": "", "st": st}
fgen: Iterable[dict[str, Any]] = []
@ -1600,6 +1603,9 @@ class HttpCli(object):
)
elif depth == "0" or not stat.S_ISDIR(st.st_mode):
if depth == "0" and not self.vpath and not vn.realpath:
# rootless server; give dummy listing
self.can_read = True
# propfind on a file; return as topdir
if not self.can_read and not self.can_get:
self.log("inaccessible: %r" % ("/" + self.vpath,))
@ -1632,7 +1638,11 @@ class HttpCli(object):
self.log("inaccessible: %r" % ("/" + self.vpath,))
raise Pebkac(401, "authenticate")
zi = vn.flags["du_iwho"] if "quota-available-bytes" in props else 0
zi = (
vn.flags["du_iwho"]
if vn.realpath and "quota-available-bytes" in props
else 0
)
if zi and (
zi == 9
or (zi == 7 and self.uname != "*")
@ -1767,6 +1777,7 @@ class HttpCli(object):
assert xprop # !rm
for ze in xprop:
ze.clear()
self.hint = ""
txt = """<multistatus xmlns="DAV:"><response><propstat><status>HTTP/1.1 403 Forbidden</status></propstat></response></multistatus>"""
xroot = parse_xml(txt)
@ -1824,6 +1835,7 @@ class HttpCli(object):
ET.register_namespace("D", "DAV:")
lk = parse_xml(txt)
assert lk.tag == "{DAV:}lockinfo"
self.hint = ""
token = str(uuid.uuid4())

View file

@ -1577,10 +1577,12 @@ def vol_san(vols: list["VFS"], txt: bytes) -> bytes:
bvp = vol.vpath.encode("utf-8")
bvph = b"$hist(/" + bvp + b")"
txt = txt.replace(bap, bvp)
txt = txt.replace(bhp, bvph)
txt = txt.replace(bap.replace(b"\\", b"\\\\"), bvp)
txt = txt.replace(bhp.replace(b"\\", b"\\\\"), bvph)
if bap:
txt = txt.replace(bap, bvp)
txt = txt.replace(bap.replace(b"\\", b"\\\\"), bvp)
if bhp:
txt = txt.replace(bhp, bvph)
txt = txt.replace(bhp.replace(b"\\", b"\\\\"), bvph)
if vol.histpath != vol.dbpath:
bdp = vol.dbpath.encode("utf-8")