mirror of
https://github.com/9001/copyparty.git
synced 2025-08-17 09:02:15 -06:00
idp: precise expansion of ${u}
(fixes #79);
it is now possible to grant access to users other than `${u}` (the user which the volume belongs to) previously, permissions did not apply correctly to IdP volumes due to the way `${u}` and `${g}` was expanded, which was a funky iteration over all known users/groups instead of... just expanding them? also adds another sanchk that a volume's URL must contain a `${u}` to be allowed to mention `${u}` in the accs list, and similarly for `${g}` / `@${g}` since users can be in multiple groups
This commit is contained in:
parent
8e5c436bef
commit
d30ae8453d
|
@ -1224,7 +1224,9 @@ class AuthSrv(object):
|
||||||
if un.startswith("@"):
|
if un.startswith("@"):
|
||||||
grp = un[1:]
|
grp = un[1:]
|
||||||
uns = [x[0] for x in un_gns.items() if grp in x[1]]
|
uns = [x[0] for x in un_gns.items() if grp in x[1]]
|
||||||
if not uns and grp != "${g}" and not self.args.idp_h_grp:
|
if grp == "${g}":
|
||||||
|
unames.append(un)
|
||||||
|
elif not uns and not self.args.idp_h_grp:
|
||||||
t = "group [%s] must be defined with --grp argument (or in a [groups] config section)"
|
t = "group [%s] must be defined with --grp argument (or in a [groups] config section)"
|
||||||
raise CfgEx(t % (grp,))
|
raise CfgEx(t % (grp,))
|
||||||
|
|
||||||
|
@ -1234,31 +1236,28 @@ class AuthSrv(object):
|
||||||
|
|
||||||
# unames may still contain ${u} and ${g} so now expand those;
|
# unames may still contain ${u} and ${g} so now expand those;
|
||||||
un_gn = [(un, gn) for un, gns in un_gns.items() for gn in gns]
|
un_gn = [(un, gn) for un, gns in un_gns.items() for gn in gns]
|
||||||
if "*" not in un_gns:
|
|
||||||
# need ("*","") to match "*" in unames
|
|
||||||
un_gn.append(("*", ""))
|
|
||||||
|
|
||||||
for _, dst, vu, vg in vols:
|
for src, dst, vu, vg in vols:
|
||||||
unames2 = set()
|
unames2 = set(unames)
|
||||||
for un, gn in un_gn:
|
|
||||||
# if vu/vg (volume user/group) is non-null,
|
|
||||||
# then each non-null value corresponds to
|
|
||||||
# ${u}/${g}; consider this a filter to
|
|
||||||
# apply to unames, as well as un_gn
|
|
||||||
if (vu and vu != un) or (vg and vg != gn):
|
|
||||||
continue
|
|
||||||
|
|
||||||
for uname in unames + ([un] if vu or vg else []):
|
if "${u}" in unames:
|
||||||
if uname == "${u}":
|
if not vu:
|
||||||
uname = vu or un
|
t = "cannot use ${u} in accs of volume [%s] because the volume url does not contain ${u}"
|
||||||
elif uname in ("${g}", "@${g}"):
|
raise CfgEx(t % (src,))
|
||||||
uname = vg or gn
|
unames2.add(vu)
|
||||||
|
|
||||||
if vu and vu != uname:
|
if "@${g}" in unames:
|
||||||
continue
|
if not vg:
|
||||||
|
t = "cannot use @${g} in accs of volume [%s] because the volume url does not contain @${g}"
|
||||||
|
raise CfgEx(t % (src,))
|
||||||
|
unames2.update([un for un, gn in un_gn if gn == vg])
|
||||||
|
|
||||||
if uname:
|
if "${g}" in unames:
|
||||||
unames2.add(uname)
|
t = 'the accs of volume [%s] contains "${g}" but the only supported way of specifying that is "@${g}"'
|
||||||
|
raise CfgEx(t % (src,))
|
||||||
|
|
||||||
|
unames2.discard("${u}")
|
||||||
|
unames2.discard("@${g}")
|
||||||
|
|
||||||
self._read_vol_str(lvl, list(unames2), axs[dst])
|
self._read_vol_str(lvl, list(unames2), axs[dst])
|
||||||
|
|
||||||
|
|
24
tests/res/idp/6.conf
Normal file
24
tests/res/idp/6.conf
Normal file
|
@ -0,0 +1,24 @@
|
||||||
|
# -*- mode: yaml -*-
|
||||||
|
# vim: ft=yaml:
|
||||||
|
|
||||||
|
[global]
|
||||||
|
idp-h-usr: x-idp-user
|
||||||
|
idp-h-grp: x-idp-group
|
||||||
|
|
||||||
|
[/get/${u}]
|
||||||
|
/get/${u}
|
||||||
|
accs:
|
||||||
|
g: *
|
||||||
|
r: ${u}, @su
|
||||||
|
m: @su
|
||||||
|
|
||||||
|
[/priv/${u}]
|
||||||
|
/priv/${u}
|
||||||
|
accs:
|
||||||
|
r: ${u}, @su
|
||||||
|
m: @su
|
||||||
|
|
||||||
|
[/team/${g}/${u}]
|
||||||
|
/team/${g}/${u}
|
||||||
|
accs:
|
||||||
|
r: @${g}
|
|
@ -15,6 +15,16 @@ class TestVFS(unittest.TestCase):
|
||||||
print(json.dumps(vfs, indent=4, sort_keys=True, default=lambda o: o.__dict__))
|
print(json.dumps(vfs, indent=4, sort_keys=True, default=lambda o: o.__dict__))
|
||||||
|
|
||||||
def log(self, src, msg, c=0):
|
def log(self, src, msg, c=0):
|
||||||
|
m = "%s" % (msg,)
|
||||||
|
if (
|
||||||
|
"warning: filesystem-path does not exist:" in m
|
||||||
|
or "you are sharing a system directory:" in m
|
||||||
|
or "reinitializing due to new user from IdP:" in m
|
||||||
|
or m.startswith("hint: argument")
|
||||||
|
or (m.startswith("loaded ") and " config files:" in m)
|
||||||
|
):
|
||||||
|
return
|
||||||
|
|
||||||
print(("[%s] %s" % (src, msg)).encode("ascii", "replace").decode("ascii"))
|
print(("[%s] %s" % (src, msg)).encode("ascii", "replace").decode("ascii"))
|
||||||
|
|
||||||
def nav(self, au, vp):
|
def nav(self, au, vp):
|
||||||
|
@ -30,14 +40,16 @@ class TestVFS(unittest.TestCase):
|
||||||
self.assertEqual(unpacked, expected + [[]] * pad)
|
self.assertEqual(unpacked, expected + [[]] * pad)
|
||||||
|
|
||||||
def assertAxsAt(self, au, vp, expected):
|
def assertAxsAt(self, au, vp, expected):
|
||||||
self.assertAxs(self.nav(au, vp).axs, expected)
|
vn = self.nav(au, vp)
|
||||||
|
self.assertAxs(vn.axs, expected)
|
||||||
|
|
||||||
def assertNodes(self, vfs, expected):
|
def assertNodes(self, vfs, expected):
|
||||||
got = list(sorted(vfs.nodes.keys()))
|
got = list(sorted(vfs.nodes.keys()))
|
||||||
self.assertEqual(got, expected)
|
self.assertEqual(got, expected)
|
||||||
|
|
||||||
def assertNodesAt(self, au, vp, expected):
|
def assertNodesAt(self, au, vp, expected):
|
||||||
self.assertNodes(self.nav(au, vp), expected)
|
vn = self.nav(au, vp)
|
||||||
|
self.assertNodes(vn, expected)
|
||||||
|
|
||||||
def prep(self):
|
def prep(self):
|
||||||
here = os.path.abspath(os.path.dirname(__file__))
|
here = os.path.abspath(os.path.dirname(__file__))
|
||||||
|
@ -140,6 +152,11 @@ class TestVFS(unittest.TestCase):
|
||||||
self.assertEqual(self.nav(au, "vg/iga1").realpath, "/g1-iga")
|
self.assertEqual(self.nav(au, "vg/iga1").realpath, "/g1-iga")
|
||||||
self.assertEqual(self.nav(au, "vg/iga2").realpath, "/g2-iga")
|
self.assertEqual(self.nav(au, "vg/iga2").realpath, "/g2-iga")
|
||||||
|
|
||||||
|
au.idp_checkin(None, "iub", "iga")
|
||||||
|
self.assertAxsAt(au, "vu/iua", [["iua"]])
|
||||||
|
self.assertAxsAt(au, "vg/iga1", [["iua", "iub"]])
|
||||||
|
self.assertAxsAt(au, "vg/iga2", [["iua", "iub", "ua"]])
|
||||||
|
|
||||||
def test_5(self):
|
def test_5(self):
|
||||||
"""
|
"""
|
||||||
one IdP user in multiple groups
|
one IdP user in multiple groups
|
||||||
|
@ -169,3 +186,44 @@ class TestVFS(unittest.TestCase):
|
||||||
self.assertAxsAt(au, "g", [["iua"]])
|
self.assertAxsAt(au, "g", [["iua"]])
|
||||||
self.assertAxsAt(au, "ga", [["iua"]])
|
self.assertAxsAt(au, "ga", [["iua"]])
|
||||||
self.assertAxsAt(au, "gb", [["iua"]])
|
self.assertAxsAt(au, "gb", [["iua"]])
|
||||||
|
|
||||||
|
def test_6(self):
|
||||||
|
"""
|
||||||
|
IdP volumes with anon-get and other users/groups (github#79)
|
||||||
|
"""
|
||||||
|
_, cfgdir, xcfg = self.prep()
|
||||||
|
au = AuthSrv(Cfg(c=[cfgdir + "/6.conf"], **xcfg), self.log)
|
||||||
|
|
||||||
|
self.assertAxs(au.vfs.axs, [])
|
||||||
|
self.assertEqual(au.vfs.vpath, "")
|
||||||
|
self.assertEqual(au.vfs.realpath, "")
|
||||||
|
self.assertNodes(au.vfs, [])
|
||||||
|
|
||||||
|
au.idp_checkin(None, "iua", "")
|
||||||
|
star = ["*", "iua"]
|
||||||
|
self.assertNodes(au.vfs, ["get", "priv"])
|
||||||
|
self.assertAxsAt(au, "get/iua", [["iua"], [], [], [], star])
|
||||||
|
self.assertAxsAt(au, "priv/iua", [["iua"], [], []])
|
||||||
|
|
||||||
|
au.idp_checkin(None, "iub", "")
|
||||||
|
star = ["*", "iua", "iub"]
|
||||||
|
self.assertNodes(au.vfs, ["get", "priv"])
|
||||||
|
self.assertAxsAt(au, "get/iua", [["iua"], [], [], [], star])
|
||||||
|
self.assertAxsAt(au, "get/iub", [["iub"], [], [], [], star])
|
||||||
|
self.assertAxsAt(au, "priv/iua", [["iua"], [], []])
|
||||||
|
self.assertAxsAt(au, "priv/iub", [["iub"], [], []])
|
||||||
|
|
||||||
|
au.idp_checkin(None, "iuc", "su")
|
||||||
|
star = ["*", "iua", "iub", "iuc"]
|
||||||
|
self.assertNodes(au.vfs, ["get", "priv", "team"])
|
||||||
|
self.assertAxsAt(au, "get/iua", [["iua", "iuc"], [], ["iuc"], [], star])
|
||||||
|
self.assertAxsAt(au, "get/iub", [["iub", "iuc"], [], ["iuc"], [], star])
|
||||||
|
self.assertAxsAt(au, "get/iuc", [["iuc"], [], ["iuc"], [], star])
|
||||||
|
self.assertAxsAt(au, "priv/iua", [["iua", "iuc"], [], ["iuc"]])
|
||||||
|
self.assertAxsAt(au, "priv/iub", [["iub", "iuc"], [], ["iuc"]])
|
||||||
|
self.assertAxsAt(au, "priv/iuc", [["iuc"], [], ["iuc"]])
|
||||||
|
self.assertAxsAt(au, "team/su/iuc", [["iuc"]])
|
||||||
|
|
||||||
|
au.idp_checkin(None, "iud", "su")
|
||||||
|
self.assertAxsAt(au, "team/su/iuc", [["iuc", "iud"]])
|
||||||
|
self.assertAxsAt(au, "team/su/iud", [["iuc", "iud"]])
|
||||||
|
|
Loading…
Reference in a new issue