mirror of
				https://github.com/9001/copyparty.git
				synced 2025-10-31 04:32:20 -06:00 
			
		
		
		
	
		
			
				
	
	
		
			310 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			310 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python3
 | |
| # coding: utf-8
 | |
| from __future__ import print_function, unicode_literals
 | |
| 
 | |
| import json
 | |
| import os
 | |
| import unittest
 | |
| 
 | |
| from copyparty.__init__ import ANYWIN
 | |
| from copyparty.authsrv import AuthSrv
 | |
| from tests.util import Cfg
 | |
| 
 | |
| 
 | |
| class TestVFS(unittest.TestCase):
 | |
|     def dump(self, vfs):
 | |
|         print(json.dumps(vfs, indent=4, sort_keys=True, default=lambda o: o.__dict__))
 | |
| 
 | |
|     def log(self, src, msg, c=0):
 | |
|         m = "%s" % (msg,)
 | |
|         if (
 | |
|             "warning: filesystem-path does not exist:" in m
 | |
|             or "you are sharing a system directory:" in m
 | |
|             or "reinitializing due to new user from IdP:" in m
 | |
|             or m.startswith("hint: argument")
 | |
|             or (m.startswith("loaded ") and " config files:" in m)
 | |
|         ):
 | |
|             return
 | |
| 
 | |
|         print(("[%s] %s" % (src, msg)).encode("ascii", "replace").decode("ascii"))
 | |
| 
 | |
|     def nav(self, au, vp):
 | |
|         return au.vfs.get(vp, "", False, False)[0]
 | |
| 
 | |
|     def assertAxs(self, axs, expected):
 | |
|         unpacked = []
 | |
|         zs = "uread uwrite umove udel uget upget uhtml uadmin udot"
 | |
|         for k in zs.split():
 | |
|             unpacked.append(list(sorted(getattr(axs, k))))
 | |
| 
 | |
|         pad = len(unpacked) - len(expected)
 | |
|         self.assertEqual(unpacked, expected + [[]] * pad)
 | |
| 
 | |
|     def assertAxsAt(self, au, vp, expected):
 | |
|         vn = self.nav(au, vp)
 | |
|         self.assertAxs(vn.axs, expected)
 | |
| 
 | |
|     def assertNodes(self, vfs, expected):
 | |
|         got = list(sorted(vfs.nodes.keys()))
 | |
|         self.assertEqual(got, expected)
 | |
| 
 | |
|     def assertNodesAt(self, au, vp, expected):
 | |
|         vn = self.nav(au, vp)
 | |
|         self.assertNodes(vn, expected)
 | |
| 
 | |
|     def assertApEq(self, ap, rhs):
 | |
|         if ANYWIN and len(ap) > 2 and ap[1] == ":":
 | |
|             ap = ap[2:].replace("\\", "/")
 | |
| 
 | |
|         return self.assertEqual(ap, rhs)
 | |
| 
 | |
|     def prep(self):
 | |
|         here = os.path.abspath(os.path.dirname(__file__))
 | |
|         cfgdir = os.path.join(here, "res", "idp")
 | |
| 
 | |
|         # globals are applied by main so need to cheat a little
 | |
|         xcfg = {"idp_h_usr": ["x-idp-user"], "idp_h_grp": "x-idp-group"}
 | |
| 
 | |
|         return here, cfgdir, xcfg
 | |
| 
 | |
|     # buckle up...
 | |
| 
 | |
|     def test_1(self):
 | |
|         """
 | |
|         trivial; volumes [/] and [/vb] with one user in [/] only
 | |
|         """
 | |
|         _, cfgdir, xcfg = self.prep()
 | |
|         au = AuthSrv(Cfg(c=[cfgdir + "/1.conf"], **xcfg), self.log)
 | |
| 
 | |
|         self.assertEqual(au.vfs.vpath, "")
 | |
|         self.assertApEq(au.vfs.realpath, "/")
 | |
|         self.assertNodes(au.vfs, ["vb"])
 | |
|         self.assertNodes(au.vfs.nodes["vb"], [])
 | |
| 
 | |
|         self.assertAxs(au.vfs.axs, [["ua"]])
 | |
|         self.assertAxs(au.vfs.nodes["vb"].axs, [])
 | |
| 
 | |
|     def test_2(self):
 | |
|         """
 | |
|         users ua/ub/uc, group ga (ua+ub) in basic combinations
 | |
|         """
 | |
|         _, cfgdir, xcfg = self.prep()
 | |
|         au = AuthSrv(Cfg(c=[cfgdir + "/2.conf"], **xcfg), self.log)
 | |
| 
 | |
|         self.assertEqual(au.vfs.vpath, "")
 | |
|         self.assertApEq(au.vfs.realpath, "/")
 | |
|         self.assertNodes(au.vfs, ["vb", "vc"])
 | |
|         self.assertNodes(au.vfs.nodes["vb"], [])
 | |
|         self.assertNodes(au.vfs.nodes["vc"], [])
 | |
| 
 | |
|         self.assertAxs(au.vfs.axs, [["ua", "ub"]])
 | |
|         self.assertAxsAt(au, "vb", [["ua", "ub"]])  # same as:
 | |
|         self.assertAxs(au.vfs.nodes["vb"].axs, [["ua", "ub"]])
 | |
|         self.assertAxs(au.vfs.nodes["vc"].axs, [["ua", "ub", "uc"]])
 | |
| 
 | |
|     def test_3(self):
 | |
|         """
 | |
|         IdP-only; dynamically created volumes for users/groups
 | |
|         """
 | |
|         _, cfgdir, xcfg = self.prep()
 | |
|         au = AuthSrv(Cfg(c=[cfgdir + "/3.conf"], **xcfg), self.log)
 | |
| 
 | |
|         self.assertEqual(au.vfs.vpath, "")
 | |
|         self.assertApEq(au.vfs.realpath, "")
 | |
|         self.assertNodes(au.vfs, [])
 | |
|         self.assertAxs(au.vfs.axs, [])
 | |
| 
 | |
|         au.idp_checkin(None, "iua", "iga")
 | |
|         self.assertNodes(au.vfs, ["vg", "vu"])
 | |
|         self.assertNodesAt(au, "vu", ["iua"])  # same as:
 | |
|         self.assertNodes(au.vfs.nodes["vu"], ["iua"])
 | |
|         self.assertNodes(au.vfs.nodes["vg"], ["iga"])
 | |
|         self.assertApEq(au.vfs.nodes["vu"].realpath, "")
 | |
|         self.assertApEq(au.vfs.nodes["vg"].realpath, "")
 | |
|         self.assertApEq(au.vfs.nodes["vu"].nodes["iua"].realpath, "/uiua")
 | |
|         self.assertApEq(au.vfs.nodes["vg"].nodes["iga"].realpath, "/giga")
 | |
|         self.assertAxs(au.vfs.axs, [])
 | |
|         self.assertAxsAt(au, "vu/iua", [["iua"]])  # same as:
 | |
|         self.assertAxs(self.nav(au, "vu/iua").axs, [["iua"]])
 | |
|         self.assertAxs(self.nav(au, "vg/iga").axs, [["iua"]])  # axs is unames
 | |
| 
 | |
|     def test_4(self):
 | |
|         """
 | |
|         IdP mixed with regular users
 | |
|         """
 | |
|         _, cfgdir, xcfg = self.prep()
 | |
|         au = AuthSrv(Cfg(c=[cfgdir + "/4.conf"], **xcfg), self.log)
 | |
| 
 | |
|         self.assertEqual(au.vfs.vpath, "")
 | |
|         self.assertApEq(au.vfs.realpath, "")
 | |
|         self.assertNodes(au.vfs, ["vu"])
 | |
|         self.assertNodesAt(au, "vu", ["ua", "ub"])
 | |
|         self.assertAxs(au.vfs.axs, [])
 | |
|         self.assertAxsAt(au, "vu", [])
 | |
|         self.assertAxsAt(au, "vu/ua", [["ua"]])
 | |
|         self.assertAxsAt(au, "vu/ub", [["ub"]])
 | |
| 
 | |
|         au.idp_checkin(None, "iua", "iga")
 | |
|         self.assertNodes(au.vfs, ["vg", "vu"])
 | |
|         self.assertNodesAt(au, "vu", ["iua", "ua", "ub"])
 | |
|         self.assertNodesAt(au, "vg", ["iga1", "iga2"])
 | |
|         self.assertAxs(au.vfs.axs, [])
 | |
|         self.assertAxsAt(au, "vu", [])
 | |
|         self.assertAxsAt(au, "vu/iua", [["iua"]])
 | |
|         self.assertAxsAt(au, "vu/ua", [["ua"]])
 | |
|         self.assertAxsAt(au, "vu/ub", [["ub"]])
 | |
|         self.assertAxsAt(au, "vg", [])
 | |
|         self.assertAxsAt(au, "vg/iga1", [["iua"]])
 | |
|         self.assertAxsAt(au, "vg/iga2", [["iua", "ua"]])
 | |
|         self.assertApEq(self.nav(au, "vu/ua").realpath, "/u-ua")
 | |
|         self.assertApEq(self.nav(au, "vu/iua").realpath, "/u-iua")
 | |
|         self.assertApEq(self.nav(au, "vg/iga1").realpath, "/g1-iga")
 | |
|         self.assertApEq(self.nav(au, "vg/iga2").realpath, "/g2-iga")
 | |
| 
 | |
|         au.idp_checkin(None, "iub", "iga")
 | |
|         self.assertAxsAt(au, "vu/iua", [["iua"]])
 | |
|         self.assertAxsAt(au, "vg/iga1", [["iua", "iub"]])
 | |
|         self.assertAxsAt(au, "vg/iga2", [["iua", "iub", "ua"]])
 | |
| 
 | |
|     def test_5(self):
 | |
|         """
 | |
|         one IdP user in multiple groups
 | |
|         """
 | |
|         _, cfgdir, xcfg = self.prep()
 | |
|         au = AuthSrv(Cfg(c=[cfgdir + "/5.conf"], **xcfg), self.log)
 | |
| 
 | |
|         self.assertEqual(au.vfs.vpath, "")
 | |
|         self.assertApEq(au.vfs.realpath, "")
 | |
|         self.assertNodes(au.vfs, ["g", "ga", "gb"])
 | |
|         self.assertAxs(au.vfs.axs, [])
 | |
| 
 | |
|         au.idp_checkin(None, "iua", "ga")
 | |
|         self.assertNodes(au.vfs, ["g", "ga", "gb"])
 | |
|         self.assertAxsAt(au, "g", [["iua"]])
 | |
|         self.assertAxsAt(au, "ga", [["iua"]])
 | |
|         self.assertAxsAt(au, "gb", [])
 | |
| 
 | |
|         au.idp_checkin(None, "iua", "gb")
 | |
|         self.assertNodes(au.vfs, ["g", "ga", "gb"])
 | |
|         self.assertAxsAt(au, "g", [["iua"]])
 | |
|         self.assertAxsAt(au, "ga", [])
 | |
|         self.assertAxsAt(au, "gb", [["iua"]])
 | |
| 
 | |
|         au.idp_checkin(None, "iua", "ga|gb")
 | |
|         self.assertNodes(au.vfs, ["g", "ga", "gb"])
 | |
|         self.assertAxsAt(au, "g", [["iua"]])
 | |
|         self.assertAxsAt(au, "ga", [["iua"]])
 | |
|         self.assertAxsAt(au, "gb", [["iua"]])
 | |
| 
 | |
|     def test_6(self):
 | |
|         """
 | |
|         IdP volumes with anon-get and other users/groups (github#79)
 | |
|         """
 | |
|         _, cfgdir, xcfg = self.prep()
 | |
|         au = AuthSrv(Cfg(c=[cfgdir + "/6.conf"], **xcfg), self.log)
 | |
| 
 | |
|         self.assertAxs(au.vfs.axs, [])
 | |
|         self.assertEqual(au.vfs.vpath, "")
 | |
|         self.assertApEq(au.vfs.realpath, "")
 | |
|         self.assertNodes(au.vfs, [])
 | |
| 
 | |
|         au.idp_checkin(None, "iua", "")
 | |
|         star = ["*", "iua"]
 | |
|         self.assertNodes(au.vfs, ["get", "priv"])
 | |
|         self.assertAxsAt(au, "get/iua", [["iua"], [], [], [], star])
 | |
|         self.assertAxsAt(au, "priv/iua", [["iua"], [], []])
 | |
| 
 | |
|         au.idp_checkin(None, "iub", "")
 | |
|         star = ["*", "iua", "iub"]
 | |
|         self.assertNodes(au.vfs, ["get", "priv"])
 | |
|         self.assertAxsAt(au, "get/iua", [["iua"], [], [], [], star])
 | |
|         self.assertAxsAt(au, "get/iub", [["iub"], [], [], [], star])
 | |
|         self.assertAxsAt(au, "priv/iua", [["iua"], [], []])
 | |
|         self.assertAxsAt(au, "priv/iub", [["iub"], [], []])
 | |
| 
 | |
|         au.idp_checkin(None, "iuc", "su")
 | |
|         star = ["*", "iua", "iub", "iuc"]
 | |
|         self.assertNodes(au.vfs, ["get", "priv", "team"])
 | |
|         self.assertAxsAt(au, "get/iua", [["iua", "iuc"], [], ["iuc"], [], star])
 | |
|         self.assertAxsAt(au, "get/iub", [["iub", "iuc"], [], ["iuc"], [], star])
 | |
|         self.assertAxsAt(au, "get/iuc", [["iuc"], [], ["iuc"], [], star])
 | |
|         self.assertAxsAt(au, "priv/iua", [["iua", "iuc"], [], ["iuc"]])
 | |
|         self.assertAxsAt(au, "priv/iub", [["iub", "iuc"], [], ["iuc"]])
 | |
|         self.assertAxsAt(au, "priv/iuc", [["iuc"], [], ["iuc"]])
 | |
|         self.assertAxsAt(au, "team/su/iuc", [["iuc"]])
 | |
| 
 | |
|         au.idp_checkin(None, "iud", "su")
 | |
|         self.assertAxsAt(au, "team/su/iuc", [["iuc", "iud"]])
 | |
|         self.assertAxsAt(au, "team/su/iud", [["iuc", "iud"]])
 | |
| 
 | |
|     def test_7(self):
 | |
|         """
 | |
|         conditional idp-vols
 | |
|         """
 | |
|         _, cfgdir, xcfg = self.prep()
 | |
|         au = AuthSrv(Cfg(c=[cfgdir + "/7.conf"], **xcfg), self.log)
 | |
|         au.idp_checkin(None, "iua", "ga")
 | |
|         au.idp_checkin(None, "iuab", "ga,gb")
 | |
|         au.idp_checkin(None, "iuabc", "ga,gb,gc")
 | |
|         au.idp_checkin(None, "iub", "gb")
 | |
|         au.idp_checkin(None, "iubc", "gb,gc")
 | |
|         au.idp_checkin(None, "iuc", "gc")
 | |
|         zs = """
 | |
| u/iua
 | |
| u/iuab
 | |
| u/iuabc
 | |
| u/iub
 | |
| u/iubc
 | |
| u/iuc
 | |
| uya/iua
 | |
| uya/iuab
 | |
| uya/iuabc
 | |
| uyab/iuab
 | |
| uyab/iuabc
 | |
| una/iub
 | |
| una/iubc
 | |
| una/iuc
 | |
| unab/iuc
 | |
| gya/ga
 | |
| gna/gb
 | |
| gna/gc
 | |
| gnab/gc
 | |
| """
 | |
|         zl1 = sorted(zs.strip().split("\n"))[:]
 | |
|         zl2 = sorted(list(au.vfs.all_vols))[:]
 | |
|         # print(" ".join(zl1))
 | |
|         # print(" ".join(zl2))
 | |
|         self.assertListEqual(zl1, zl2)
 | |
| 
 | |
|     def test_8(self):
 | |
|         """
 | |
|         conditional non-idp vols
 | |
|         """
 | |
|         _, cfgdir, xcfg = self.prep()
 | |
|         xcfg = {"vc": True}
 | |
|         au = AuthSrv(Cfg(c=[cfgdir + "/8.conf"], **xcfg), self.log)
 | |
|         zs = """
 | |
| u/iua
 | |
| u/iuab
 | |
| u/iuabc
 | |
| u/iub
 | |
| u/iubc
 | |
| u/iuc
 | |
| uya/iua
 | |
| uya/iuab
 | |
| uya/iuabc
 | |
| uyab/iuab
 | |
| uyab/iuabc
 | |
| una/iub
 | |
| una/iubc
 | |
| una/iuc
 | |
| unab/iuc
 | |
| gya/ga
 | |
| gna/gb
 | |
| gna/gc
 | |
| gnab/gc
 | |
| """
 | |
|         zl1 = sorted(zs.strip().split("\n"))[:]
 | |
|         zl2 = sorted(list(au.vfs.all_vols))[:]
 | |
|         self.assertListEqual(zl1, zl2)
 |