fix detection of unsafe IdP volumes; closes #147

was overly aggressive until now, thinking the following was unsafe:

-v 'x::'                                  # no-anonymous-access
-v 'x/${u}:${u}:r:A,${u}'           # world-readable,user-admin
-v 'x/${u}/priv:${u}/priv:A,${u}'             # only-user-admin

now it realizes that this is safe because both IdP volumes
will be created/owned by the same user

however, if the first volume is 'x::r' then this is NOT safe,
and is now still correctly detected as being dangerous

also add a separate warning if `${g}` and `${u}` is mixed
in a volpath, since that is PROBABLY (not provably) unsafe
This commit is contained in:
ed 2025-03-14 21:08:21 +00:00
parent 815d46f2c4
commit 29a17ae2b7
2 changed files with 85 additions and 44 deletions

View file

@ -343,12 +343,14 @@ class VFS(object):
log: Optional["RootLogger"], log: Optional["RootLogger"],
realpath: str, realpath: str,
vpath: str, vpath: str,
vpath0: str,
axs: AXS, axs: AXS,
flags: dict[str, Any], flags: dict[str, Any],
) -> None: ) -> None:
self.log = log self.log = log
self.realpath = realpath # absolute path on host filesystem self.realpath = realpath # absolute path on host filesystem
self.vpath = vpath # absolute path in the virtual filesystem self.vpath = vpath # absolute path in the virtual filesystem
self.vpath0 = vpath0 # original vpath (before idp expansion)
self.axs = axs self.axs = axs
self.flags = flags # config options self.flags = flags # config options
self.root = self self.root = self
@ -417,7 +419,7 @@ class VFS(object):
for v in self.nodes.values(): for v in self.nodes.values():
v.get_all_vols(vols, nodes, aps, vps) v.get_all_vols(vols, nodes, aps, vps)
def add(self, src: str, dst: str) -> "VFS": def add(self, src: str, dst: str, dst0: str) -> "VFS":
"""get existing, or add new path to the vfs""" """get existing, or add new path to the vfs"""
assert src == "/" or not src.endswith("/") # nosec assert src == "/" or not src.endswith("/") # nosec
assert not dst.endswith("/") # nosec assert not dst.endswith("/") # nosec
@ -425,20 +427,22 @@ class VFS(object):
if "/" in dst: if "/" in dst:
# requires breadth-first population (permissions trickle down) # requires breadth-first population (permissions trickle down)
name, dst = dst.split("/", 1) name, dst = dst.split("/", 1)
name0, dst0 = dst0.split("/", 1)
if name in self.nodes: if name in self.nodes:
# exists; do not manipulate permissions # exists; do not manipulate permissions
return self.nodes[name].add(src, dst) return self.nodes[name].add(src, dst, dst0)
vn = VFS( vn = VFS(
self.log, self.log,
os.path.join(self.realpath, name) if self.realpath else "", os.path.join(self.realpath, name) if self.realpath else "",
"{}/{}".format(self.vpath, name).lstrip("/"), "{}/{}".format(self.vpath, name).lstrip("/"),
"{}/{}".format(self.vpath0, name0).lstrip("/"),
self.axs, self.axs,
self._copy_flags(name), self._copy_flags(name),
) )
vn.dbv = self.dbv or self vn.dbv = self.dbv or self
self.nodes[name] = vn self.nodes[name] = vn
return vn.add(src, dst) return vn.add(src, dst, dst0)
if dst in self.nodes: if dst in self.nodes:
# leaf exists; return as-is # leaf exists; return as-is
@ -446,7 +450,8 @@ class VFS(object):
# leaf does not exist; create and keep permissions blank # leaf does not exist; create and keep permissions blank
vp = "{}/{}".format(self.vpath, dst).lstrip("/") vp = "{}/{}".format(self.vpath, dst).lstrip("/")
vn = VFS(self.log, src, vp, AXS(), {}) vp0 = "{}/{}".format(self.vpath0, dst0).lstrip("/")
vn = VFS(self.log, src, vp, vp0, AXS(), {})
vn.dbv = self.dbv or self vn.dbv = self.dbv or self
self.nodes[dst] = vn self.nodes[dst] = vn
return vn return vn
@ -863,7 +868,7 @@ class AuthSrv(object):
self.indent = "" self.indent = ""
# fwd-decl # fwd-decl
self.vfs = VFS(log_func, "", "", AXS(), {}) self.vfs = VFS(log_func, "", "", "", AXS(), {})
self.acct: dict[str, str] = {} # uname->pw self.acct: dict[str, str] = {} # uname->pw
self.iacct: dict[str, str] = {} # pw->uname self.iacct: dict[str, str] = {} # pw->uname
self.ases: dict[str, str] = {} # uname->session self.ases: dict[str, str] = {} # uname->session
@ -931,7 +936,7 @@ class AuthSrv(object):
self, self,
src: str, src: str,
dst: str, dst: str,
mount: dict[str, str], mount: dict[str, tuple[str, str]],
daxs: dict[str, AXS], daxs: dict[str, AXS],
mflags: dict[str, dict[str, Any]], mflags: dict[str, dict[str, Any]],
un_gns: dict[str, list[str]], un_gns: dict[str, list[str]],
@ -969,7 +974,7 @@ class AuthSrv(object):
continue continue
visited.add(label) visited.add(label)
src, dst = self._map_volume(src, dst, mount, daxs, mflags) src, dst = self._map_volume(src, dst, dst0, mount, daxs, mflags)
if src: if src:
ret.append((src, dst, un, gn)) ret.append((src, dst, un, gn))
if un or gn: if un or gn:
@ -981,7 +986,8 @@ class AuthSrv(object):
self, self,
src: str, src: str,
dst: str, dst: str,
mount: dict[str, str], dst0: str,
mount: dict[str, tuple[str, str]],
daxs: dict[str, AXS], daxs: dict[str, AXS],
mflags: dict[str, dict[str, Any]], mflags: dict[str, dict[str, Any]],
) -> tuple[str, str]: ) -> tuple[str, str]:
@ -991,13 +997,13 @@ class AuthSrv(object):
if dst in mount: if dst in mount:
t = "multiple filesystem-paths mounted at [/{}]:\n [{}]\n [{}]" t = "multiple filesystem-paths mounted at [/{}]:\n [{}]\n [{}]"
self.log(t.format(dst, mount[dst], src), c=1) self.log(t.format(dst, mount[dst][0], src), c=1)
raise Exception(BAD_CFG) raise Exception(BAD_CFG)
if src in mount.values(): if src in mount.values():
t = "filesystem-path [{}] mounted in multiple locations:" t = "filesystem-path [{}] mounted in multiple locations:"
t = t.format(src) t = t.format(src)
for v in [k for k, v in mount.items() if v == src] + [dst]: for v in [k for k, v in mount.items() if v[0] == src] + [dst]:
t += "\n /{}".format(v) t += "\n /{}".format(v)
self.log(t, c=3) self.log(t, c=3)
@ -1006,7 +1012,7 @@ class AuthSrv(object):
if not bos.path.isdir(src): if not bos.path.isdir(src):
self.log("warning: filesystem-path does not exist: {}".format(src), 3) self.log("warning: filesystem-path does not exist: {}".format(src), 3)
mount[dst] = src mount[dst] = (src, dst0)
daxs[dst] = AXS() daxs[dst] = AXS()
mflags[dst] = {} mflags[dst] = {}
return (src, dst) return (src, dst)
@ -1067,7 +1073,7 @@ class AuthSrv(object):
grps: dict[str, list[str]], grps: dict[str, list[str]],
daxs: dict[str, AXS], daxs: dict[str, AXS],
mflags: dict[str, dict[str, Any]], mflags: dict[str, dict[str, Any]],
mount: dict[str, str], mount: dict[str, tuple[str, str]],
) -> None: ) -> None:
self.line_ctr = 0 self.line_ctr = 0
@ -1092,7 +1098,7 @@ class AuthSrv(object):
grps: dict[str, list[str]], grps: dict[str, list[str]],
daxs: dict[str, AXS], daxs: dict[str, AXS],
mflags: dict[str, dict[str, Any]], mflags: dict[str, dict[str, Any]],
mount: dict[str, str], mount: dict[str, tuple[str, str]],
npass: int, npass: int,
) -> None: ) -> None:
self.line_ctr = 0 self.line_ctr = 0
@ -1451,8 +1457,8 @@ class AuthSrv(object):
acct: dict[str, str] = {} # username:password acct: dict[str, str] = {} # username:password
grps: dict[str, list[str]] = {} # groupname:usernames grps: dict[str, list[str]] = {} # groupname:usernames
daxs: dict[str, AXS] = {} daxs: dict[str, AXS] = {}
mflags: dict[str, dict[str, Any]] = {} # moutpoint:flags mflags: dict[str, dict[str, Any]] = {} # vpath:flags
mount: dict[str, str] = {} # dst:src (mountpoint:realpath) mount: dict[str, tuple[str, str]] = {} # dst:src (vp:(ap,vp0))
self.idp_vols = {} # yolo self.idp_vols = {} # yolo
@ -1531,8 +1537,8 @@ class AuthSrv(object):
# case-insensitive; normalize # case-insensitive; normalize
if WINDOWS: if WINDOWS:
cased = {} cased = {}
for k, v in mount.items(): for vp, (ap, vp0) in mount.items():
cased[k] = absreal(v) cased[vp] = (absreal(ap), vp0)
mount = cased mount = cased
@ -1547,25 +1553,26 @@ class AuthSrv(object):
t = "Read-access has been disabled due to failsafe: No volumes were defined by the config-file. This failsafe is to prevent unintended access if this is due to accidental loss of config. You can override this safeguard and allow read/write to the working-directory by adding the following arguments: -v .::rw" t = "Read-access has been disabled due to failsafe: No volumes were defined by the config-file. This failsafe is to prevent unintended access if this is due to accidental loss of config. You can override this safeguard and allow read/write to the working-directory by adding the following arguments: -v .::rw"
self.log(t, 1) self.log(t, 1)
axs = AXS() axs = AXS()
vfs = VFS(self.log_func, absreal("."), "", axs, {}) vfs = VFS(self.log_func, absreal("."), "", "", axs, {})
elif "" not in mount: elif "" not in mount:
# there's volumes but no root; make root inaccessible # there's volumes but no root; make root inaccessible
zsd = {"d2d": True, "tcolor": self.args.tcolor} zsd = {"d2d": True, "tcolor": self.args.tcolor}
vfs = VFS(self.log_func, "", "", AXS(), zsd) vfs = VFS(self.log_func, "", "", "", AXS(), zsd)
maxdepth = 0 maxdepth = 0
for dst in sorted(mount.keys(), key=lambda x: (x.count("/"), len(x))): for dst in sorted(mount.keys(), key=lambda x: (x.count("/"), len(x))):
depth = dst.count("/") depth = dst.count("/")
assert maxdepth <= depth # nosec assert maxdepth <= depth # nosec
maxdepth = depth maxdepth = depth
src, dst0 = mount[dst]
if dst == "": if dst == "":
# rootfs was mapped; fully replaces the default CWD vfs # rootfs was mapped; fully replaces the default CWD vfs
vfs = VFS(self.log_func, mount[dst], dst, daxs[dst], mflags[dst]) vfs = VFS(self.log_func, src, dst, dst0, daxs[dst], mflags[dst])
continue continue
assert vfs # type: ignore assert vfs # type: ignore
zv = vfs.add(mount[dst], dst) zv = vfs.add(src, dst, dst0)
zv.axs = daxs[dst] zv.axs = daxs[dst]
zv.flags = mflags[dst] zv.flags = mflags[dst]
zv.dbv = None zv.dbv = None
@ -1599,7 +1606,7 @@ class AuthSrv(object):
if enshare: if enshare:
import sqlite3 import sqlite3
shv = VFS(self.log_func, "", shr, AXS(), {}) shv = VFS(self.log_func, "", shr, shr, AXS(), {})
db_path = self.args.shr_db db_path = self.args.shr_db
db = sqlite3.connect(db_path) db = sqlite3.connect(db_path)
@ -1633,9 +1640,8 @@ class AuthSrv(object):
# don't know the abspath yet + wanna ensure the user # don't know the abspath yet + wanna ensure the user
# still has the privs they granted, so nullmap it # still has the privs they granted, so nullmap it
shv.nodes[s_k] = VFS( vp = "%s/%s" % (shr, s_k)
self.log_func, "", "%s/%s" % (shr, s_k), s_axs, shv.flags.copy() shv.nodes[s_k] = VFS(self.log_func, "", vp, vp, s_axs, shv.flags.copy())
)
vfs.nodes[shr] = vfs.all_vols[shr] = shv vfs.nodes[shr] = vfs.all_vols[shr] = shv
for vol in shv.nodes.values(): for vol in shv.nodes.values():
@ -2278,22 +2284,56 @@ class AuthSrv(object):
except Pebkac: except Pebkac:
self.warn_anonwrite = True self.warn_anonwrite = True
idp_err = "WARNING! The following IdP volumes are mounted directly below another volume where anonymous users can read and/or write files. This is a SECURITY HAZARD!! When copyparty is restarted, it will not know about these IdP volumes yet. These volumes will then be accessible by anonymous users UNTIL one of the users associated with their volume sends a request to the server. RECOMMENDATION: You should create a restricted volume where nobody can read/write files, and make sure that all IdP volumes are configured to appear somewhere below that volume." self.idp_warn = []
self.idp_err = []
for idp_vp in self.idp_vols: for idp_vp in self.idp_vols:
parent_vp = vsplit(idp_vp)[0] idp_vn, _ = vfs.get(idp_vp, "*", False, False)
vn, _ = vfs.get(parent_vp, "*", False, False) idp_vp0 = idp_vn.vpath0
zs = (
"READABLE" sigils = set(re.findall(r"(\${[ug]})", idp_vp0))
if "*" in vn.axs.uread if len(sigils) > 1:
else "WRITABLE" t = '\nWARNING: IdP-volume "/%s" created by "/%s" has multiple IdP placeholders: %s'
if "*" in vn.axs.uwrite self.idp_warn.append(t % (idp_vp, idp_vp0, list(sigils)))
else "" continue
)
if zs: sigil = sigils.pop()
t = '\nWARNING: Volume "/%s" appears below "/%s" and would be WORLD-%s' par_vp = idp_vp
idp_err += t % (idp_vp, vn.vpath, zs) while par_vp:
if "\n" in idp_err: par_vp = vsplit(par_vp)[0]
self.log(idp_err, 1) par_vn, _ = vfs.get(par_vp, "*", False, False)
if sigil in par_vn.vpath0:
continue # parent was spawned for and by same user
oth_read = []
oth_write = []
for usr in par_vn.axs.uread:
if usr not in idp_vn.axs.uread:
oth_read.append(usr)
for usr in par_vn.axs.uwrite:
if usr not in idp_vn.axs.uwrite:
oth_write.append(usr)
if "*" in oth_read:
taxs = "WORLD-READABLE"
elif "*" in oth_write:
taxs = "WORLD-WRITABLE"
elif oth_read:
taxs = "READABLE BY %r" % (oth_read,)
elif oth_write:
taxs = "WRITABLE BY %r" % (oth_write,)
else:
continue
t = '\nWARNING: IdP-volume "/%s" created by "/%s" has parent/grandparent "/%s" and would be %s'
self.idp_err.append(t % (idp_vp, idp_vp0, par_vn.vpath, taxs))
if self.idp_warn:
t = "WARNING! Some IdP volumes include multiple IdP placeholders; this is too complex to automatically determine if safe or not. To ensure that no users gain unintended access, please use only a single placeholder for each IdP volume."
self.log(t + "".join(self.idp_warn), 1)
if self.idp_err:
t = "WARNING! The following IdP volumes are mounted below another volume where other users can read and/or write files. This is a SECURITY HAZARD!! When copyparty is restarted, it will not know about these IdP volumes yet. These volumes will then be accessible by an unexpected set of permissions UNTIL one of the users associated with their volume sends a request to the server. RECOMMENDATION: You should create a restricted volume where nobody can read/write files, and make sure that all IdP volumes are configured to appear somewhere below that volume."
self.log(t + "".join(self.idp_err), 1)
self.vfs = vfs self.vfs = vfs
self.acct = acct self.acct = acct
@ -2375,7 +2415,7 @@ class AuthSrv(object):
continue # also fine continue # also fine
for zs in svn.nodes.keys(): for zs in svn.nodes.keys():
# hide subvolume # hide subvolume
vn.nodes[zs] = VFS(self.log_func, "", "", AXS(), {}) vn.nodes[zs] = VFS(self.log_func, "", "", "", AXS(), {})
cur2.close() cur2.close()
cur.close() cur.close()

View file

@ -78,7 +78,7 @@ class Fstab(object):
return vid return vid
def build_fallback(self) -> None: def build_fallback(self) -> None:
self.tab = VFS(self.log_func, "idk", "/", AXS(), {}) self.tab = VFS(self.log_func, "idk", "/", "/", AXS(), {})
self.trusted = False self.trusted = False
def build_tab(self) -> None: def build_tab(self) -> None:
@ -111,9 +111,10 @@ class Fstab(object):
tab1.sort(key=lambda x: (len(x[0]), x[0])) tab1.sort(key=lambda x: (len(x[0]), x[0]))
path1, fs1 = tab1[0] path1, fs1 = tab1[0]
tab = VFS(self.log_func, fs1, path1, AXS(), {}) tab = VFS(self.log_func, fs1, path1, path1, AXS(), {})
for path, fs in tab1[1:]: for path, fs in tab1[1:]:
tab.add(fs, path.lstrip("/")) zs = path.lstrip("/")
tab.add(fs, zs, zs)
self.tab = tab self.tab = tab
self.srctab = srctab self.srctab = srctab