diff --git a/copyparty/__main__.py b/copyparty/__main__.py index 0b383cb3..22b45356 100644 --- a/copyparty/__main__.py +++ b/copyparty/__main__.py @@ -200,23 +200,23 @@ def run_argparse(argv, formatter): """ -a takes username:password, -v takes src:dst:permset:permset:cflag:cflag:... - where "permset" is accesslevel followed by username (no separator) + where "permset" is "accesslevel,username" and "cflag" is config flags to set on this volume list of cflags: - "cnodupe" rejects existing files (instead of symlinking them) - "ce2d" sets -e2d (all -e2* args can be set using ce2* cflags) - "cd2t" disables metadata collection, overrides -e2t* - "cd2d" disables all database stuff, overrides -e2* + "c,nodupe" rejects existing files (instead of symlinking them) + "c,e2d" sets -e2d (all -e2* args can be set using ce2* cflags) + "c,d2t" disables metadata collection, overrides -e2t* + "c,d2d" disables all database stuff, overrides -e2* example:\033[35m - -a ed:hunter2 -v .::r:aed -v ../inc:dump:w:aed:cnodupe \033[36m + -a ed:hunter2 -v .::r:rw,ed -v ../inc:dump:w:rw,ed:c,nodupe \033[36m mount current directory at "/" with * r (read-only) for everyone - * a (read+write) for ed + * rw (read+write) for ed mount ../inc at "/dump" with * w (write-only) for everyone - * a (read+write) for ed + * rw (read+write) for ed * reject duplicate files \033[0m if no accounts or volumes are configured, @@ -377,6 +377,36 @@ def main(argv=None): except AssertionError: al = run_argparse(argv, Dodge11874) + nstrs = [] + anymod = False + for ostr in al.v: + mod = False + oa = ostr.split(":") + na = oa[:2] + for opt in oa[2:]: + if opt and (opt[0] == "a" or (len(opt) > 1 and "," not in opt)): + mod = True + perm = opt[0] + if perm == "a": + perm = "rw" + na.append(perm + "," + opt[1:]) + elif opt and opt.startswith("c") and not opt.startswith("c,"): + mod = True + na.append("c," + opt[2:]) + else: + na.append(opt) + + nstr = ":".join(na) + nstrs.append(nstr if mod else ostr) + if mod: + msg = "\033[1;31mWARNING:\033[0;1m\n -v {} \033[0;33mwas replaced with\033[0;1m\n -v {} \n\033[0m" + lprint(msg.format(ostr, nstr)) + anymod = True + + if anymod: + al.v = nstrs + time.sleep(2) + # propagate implications for k1, k2 in IMPLICATIONS: if getattr(al, k1): diff --git a/copyparty/authsrv.py b/copyparty/authsrv.py index 9021e57a..742e73a0 100644 --- a/copyparty/authsrv.py +++ b/copyparty/authsrv.py @@ -13,17 +13,31 @@ from .__init__ import WINDOWS from .util import IMPLICATIONS, uncyg, undot, Pebkac, fsdec, fsenc, statdir +class AXS(object): + def __init__(self, uread=None, uwrite=None, umove=None, udel=None): + self.uread = {} if uread is None else {k: 1 for k in uread} + self.uwrite = {} if uwrite is None else {k: 1 for k in uwrite} + self.umove = {} if umove is None else {k: 1 for k in umove} + self.udel = {} if udel is None else {k: 1 for k in udel} + + def __repr__(self): + return "AXS({})".format( + ", ".join( + "{}={!r}".format(k, self.__dict__[k]) + for k in "uread uwrite umove udel".split() + ) + ) + + class VFS(object): """single level in the virtual fs""" - def __init__(self, log, realpath, vpath, uread, uwrite, uadm, flags): + def __init__(self, log, realpath, vpath, axs, flags): self.log = log self.realpath = realpath # absolute path on host filesystem self.vpath = vpath # absolute path in the virtual filesystem - self.uread = uread # users who can read this - self.uwrite = uwrite # users who can write this - self.uadm = uadm # users who are regular admins - self.flags = flags # config switches + self.axs = axs # type: AXS + self.flags = flags # config options self.nodes = {} # child nodes self.histtab = None # all realpath->histpath self.dbv = None # closest full/non-jump parent @@ -31,15 +45,23 @@ class VFS(object): if realpath: self.histpath = os.path.join(realpath, ".hist") # db / thumbcache self.all_vols = {vpath: self} # flattened recursive + self.aread = {} + self.awrite = {} + self.amove = {} + self.adel = {} else: self.histpath = None self.all_vols = None + self.aread = None + self.awrite = None + self.amove = None + self.adel = None def __repr__(self): return "VFS({})".format( ", ".join( "{}={!r}".format(k, self.__dict__[k]) - for k in "realpath vpath uread uwrite uadm flags".split() + for k in "realpath vpath axs flags".split() ) ) @@ -66,9 +88,7 @@ class VFS(object): self.log, os.path.join(self.realpath, name) if self.realpath else None, "{}/{}".format(self.vpath, name).lstrip("/"), - self.uread, - self.uwrite, - self.uadm, + self.axs, self._copy_flags(name), ) vn.dbv = self.dbv or self @@ -81,7 +101,7 @@ class VFS(object): # leaf does not exist; create and keep permissions blank vp = "{}/{}".format(self.vpath, dst).lstrip("/") - vn = VFS(self.log, src, vp, [], [], [], {}) + vn = VFS(self.log, src, vp, AXS(), {}) vn.dbv = self.dbv or self self.nodes[dst] = vn return vn @@ -121,23 +141,32 @@ class VFS(object): return [self, vpath] def can_access(self, vpath, uname): - """return [readable,writable]""" + # type: (str, str) -> tuple[bool, bool, bool, bool] + """can Read,Write,Move,Delete""" vn, _ = self._find(vpath) + c = vn.axs return [ - uname in vn.uread or "*" in vn.uread, - uname in vn.uwrite or "*" in vn.uwrite, + uname in c.uread or "*" in c.uread, + uname in c.uwrite or "*" in c.uwrite, + uname in c.umove or "*" in c.umove, + uname in c.udel or "*" in c.udel, ] - def get(self, vpath, uname, will_read, will_write): - # type: (str, str, bool, bool) -> tuple[VFS, str] + def get(self, vpath, uname, will_read, will_write, will_move=False, will_del=False): + # type: (str, str, bool, bool, bool, bool) -> tuple[VFS, str] """returns [vfsnode,fs_remainder] if user has the requested permissions""" vn, rem = self._find(vpath) + c = vn.axs - if will_read and (uname not in vn.uread and "*" not in vn.uread): - raise Pebkac(403, "you don't have read-access for this location") - - if will_write and (uname not in vn.uwrite and "*" not in vn.uwrite): - raise Pebkac(403, "you don't have write-access for this location") + for req, d, msg in [ + [will_read, c.uread, "read"], + [will_write, c.uwrite, "write"], + [will_move, c.umove, "move"], + [will_del, c.udel, "delete"], + ]: + if req and (uname not in d and "*" not in d): + m = "you don't have {}-access for this location" + raise Pebkac(403, m.format(msg)) return vn, rem @@ -187,10 +216,10 @@ class VFS(object): real.sort() if not rem: for name, vn2 in sorted(self.nodes.items()): - ok = uname in vn2.uread or "*" in vn2.uread + ok = uname in vn2.axs.uread or "*" in vn2.axs.uread if not ok and incl_wo: - ok = uname in vn2.uwrite or "*" in vn2.uwrite + ok = uname in vn2.axs.uwrite or "*" in vn2.axs.uwrite if ok: virt_vis[name] = vn2 @@ -295,20 +324,6 @@ class VFS(object): for f in [{"vp": v, "ap": a, "st": n[1]} for v, a, n in files]: yield f - def user_tree(self, uname, readable, writable, admin): - is_readable = False - if uname in self.uread or "*" in self.uread: - readable.append(self.vpath) - is_readable = True - - if uname in self.uwrite or "*" in self.uwrite: - writable.append(self.vpath) - if is_readable: - admin.append(self.vpath) - - for _, vn in sorted(self.nodes.items()): - vn.user_tree(uname, readable, writable, admin) - class AuthSrv(object): """verifies users against given paths""" @@ -341,7 +356,8 @@ class AuthSrv(object): yield prev, True - def _parse_config_file(self, fd, user, mread, mwrite, madm, mflags, mount): + def _parse_config_file(self, fd, acct, daxs, mflags, mount): + # type: (any, str, dict[str, AXS], any, str) -> None vol_src = None vol_dst = None self.line_ctr = 0 @@ -357,7 +373,7 @@ class AuthSrv(object): if vol_src is None: if ln.startswith("u "): u, p = ln[2:].split(":", 1) - user[u] = p + acct[u] = p else: vol_src = ln continue @@ -371,47 +387,46 @@ class AuthSrv(object): vol_src = fsdec(os.path.abspath(fsenc(vol_src))) vol_dst = vol_dst.strip("/") mount[vol_dst] = vol_src - mread[vol_dst] = [] - mwrite[vol_dst] = [] - madm[vol_dst] = [] + daxs[vol_dst] = AXS() mflags[vol_dst] = {} continue - if len(ln) > 1: - lvl, uname = ln.split(" ") - else: + try: + lvl, uname = ln.split(" ", 1) + except: lvl = ln uname = "*" - self._read_vol_str( - lvl, - uname, - mread[vol_dst], - mwrite[vol_dst], - madm[vol_dst], - mflags[vol_dst], - ) + if lvl == "a": + m = "WARNING (config-file): permission flag 'a' is deprecated; please use 'rw' instead" + self.log(m, 1) - def _read_vol_str(self, lvl, uname, mr, mw, ma, mf): + self._read_vol_str(lvl, uname, daxs[vol_dst], mflags[vol_dst]) + + def _read_vol_str(self, lvl, uname, axs, flags): + # type: (str, str, AXS, any) -> None if lvl == "c": cval = True if "=" in uname: uname, cval = uname.split("=", 1) - self._read_volflag(mf, uname, cval, False) + self._read_volflag(flags, uname, cval, False) return if uname == "": uname = "*" - if lvl in "ra": - mr.append(uname) + if "r" in lvl: + axs.uread[uname] = 1 - if lvl in "wa": - mw.append(uname) + if "w" in lvl: + axs.uwrite[uname] = 1 - if lvl == "a": - ma.append(uname) + if "m" in lvl: + axs.umove[uname] = 1 + + if "d" in lvl: + axs.udel[uname] = 1 def _read_volflag(self, flags, name, value, is_list): if name not in ["mtp"]: @@ -433,21 +448,19 @@ class AuthSrv(object): before finally building the VFS """ - user = {} # username:password - mread = {} # mountpoint:[username] - mwrite = {} # mountpoint:[username] - madm = {} # mountpoint:[username] + acct = {} # username:password + daxs = {} # type: dict[str, AXS] mflags = {} # mountpoint:[flag] mount = {} # dst:src (mountpoint:realpath) if self.args.a: # list of username:password for u, p in [x.split(":", 1) for x in self.args.a]: - user[u] = p + acct[u] = p if self.args.v: # list of src:dst:permset:permset:... - # permset is [rwa]username or [c]flag + # permset is [,username][,username] or ,[=args] for v_str in self.args.v: m = self.re_vol.match(v_str) if not m: @@ -461,24 +474,18 @@ class AuthSrv(object): src = fsdec(os.path.abspath(fsenc(src))) dst = dst.strip("/") mount[dst] = src - mread[dst] = [] - mwrite[dst] = [] - madm[dst] = [] + daxs[dst] = AXS() mflags[dst] = {} - perms = perms.split(":") - for (lvl, uname) in [[x[0], x[1:]] for x in perms]: - self._read_vol_str( - lvl, uname, mread[dst], mwrite[dst], madm[dst], mflags[dst] - ) + for x in perms.split(":"): + lvl, uname = x.split(",", 1) if "," in x else [x, ""] + self._read_vol_str(lvl, uname, daxs[dst], mflags[dst]) if self.args.c: for cfg_fn in self.args.c: with open(cfg_fn, "rb") as f: try: - self._parse_config_file( - f, user, mread, mwrite, madm, mflags, mount - ) + self._parse_config_file(f, acct, daxs, mflags, mount) except: m = "\n\033[1;31m\nerror in config file {} on line {}:\n\033[0m" self.log(m.format(cfg_fn, self.line_ctr), 1) @@ -497,10 +504,11 @@ class AuthSrv(object): if not mount: # -h says our defaults are CWD at root and read/write for everyone - vfs = VFS(self.log_func, os.path.abspath("."), "", ["*"], ["*"], ["*"], {}) + axs = AXS(["*"], ["*"], None, None) + vfs = VFS(self.log_func, os.path.abspath("."), "", axs, {}) elif "" not in mount: # there's volumes but no root; make root inaccessible - vfs = VFS(self.log_func, None, "", [], [], [], {}) + vfs = VFS(self.log_func, None, "", AXS(), {}) vfs.flags["d2d"] = True maxdepth = 0 @@ -511,32 +519,34 @@ class AuthSrv(object): if dst == "": # rootfs was mapped; fully replaces the default CWD vfs - vfs = VFS( - self.log_func, - mount[dst], - dst, - mread[dst], - mwrite[dst], - madm[dst], - mflags[dst], - ) + vfs = VFS(self.log_func, mount[dst], dst, daxs[dst], mflags[dst]) continue v = vfs.add(mount[dst], dst) - v.uread = mread[dst] - v.uwrite = mwrite[dst] - v.uadm = madm[dst] + v.axs = daxs[dst] v.flags = mflags[dst] v.dbv = None vfs.all_vols = {} vfs.get_all_vols(vfs.all_vols) + for perm in "read write move del".split(): + axs_key = "u" + perm + unames = ["*"] + list(acct.keys()) + umap = {x: [] for x in unames} + for usr in unames: + for mp, vol in vfs.all_vols.items(): + if usr in getattr(vol.axs, axs_key): + umap[usr].append(mp) + setattr(vfs, "a" + perm, umap) + + all_users = {} missing_users = {} - for d in [mread, mwrite]: - for _, ul in d.items(): - for usr in ul: - if usr != "*" and usr not in user: + for axs in daxs.values(): + for d in [axs.uread, axs.uwrite, axs.umove, axs.udel]: + for usr in d.keys(): + all_users[usr] = 1 + if usr != "*" and usr not in acct: missing_users[usr] = 1 if missing_users: @@ -611,7 +621,7 @@ class AuthSrv(object): all_mte = {} errors = False for vol in vfs.all_vols.values(): - if (self.args.e2ds and vol.uwrite) or self.args.e2dsa: + if (self.args.e2ds and vol.axs.uwrite) or self.args.e2dsa: vol.flags["e2ds"] = True if self.args.e2d or "e2ds" in vol.flags: @@ -711,17 +721,14 @@ class AuthSrv(object): with self.mutex: self.vfs = vfs - self.user = user - self.iuser = {v: k for k, v in user.items()} + self.acct = acct + self.iacct = {v: k for k, v in acct.items()} self.re_pwd = None - pwds = [re.escape(x) for x in self.iuser.keys()] + pwds = [re.escape(x) for x in self.iacct.keys()] if pwds: self.re_pwd = re.compile("=(" + "|".join(pwds) + ")([]&; ]|$)") - # import pprint - # pprint.pprint({"usr": user, "rd": mread, "wr": mwrite, "mnt": mount}) - def dbg_ls(self): users = self.args.ls vols = "*" @@ -739,12 +746,12 @@ class AuthSrv(object): pass if users == "**": - users = list(self.user.keys()) + ["*"] + users = list(self.acct.keys()) + ["*"] else: users = [users] for u in users: - if u not in self.user and u != "*": + if u not in self.acct and u != "*": raise Exception("user not found: " + u) if vols == "*": @@ -760,8 +767,10 @@ class AuthSrv(object): raise Exception("volume not found: " + v) self.log({"users": users, "vols": vols, "flags": flags}) + m = "/{}: read({}) write({}) move({}) del({})" for k, v in self.vfs.all_vols.items(): - self.log("/{}: read({}) write({})".format(k, v.uread, v.uwrite)) + vc = v.axs + self.log(m.format(k, vc.uread, vc.uwrite, vc.umove, vc.udel)) flag_v = "v" in flags flag_ln = "ln" in flags @@ -775,7 +784,7 @@ class AuthSrv(object): for u in users: self.log("checking /{} as {}".format(v, u)) try: - vn, _ = self.vfs.get(v, u, True, False) + vn, _ = self.vfs.get(v, u, True, False, False, False) except: continue diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index fcf901b4..261d0eff 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -58,7 +58,7 @@ class HttpCli(object): def unpwd(self, m): a, b = m.groups() - return "=\033[7m {} \033[27m{}".format(self.asrv.iuser[a], b) + return "=\033[7m {} \033[27m{}".format(self.asrv.iacct[a], b) def _check_nonfatal(self, ex): return ex.code < 400 or ex.code in [404, 429] @@ -181,9 +181,11 @@ class HttpCli(object): self.vpath = unquotep(vpath) pwd = uparam.get("pw") - self.uname = self.asrv.iuser.get(pwd, "*") - self.rvol, self.wvol, self.avol = [[], [], []] - self.asrv.vfs.user_tree(self.uname, self.rvol, self.wvol, self.avol) + self.uname = self.asrv.iacct.get(pwd, "*") + self.rvol = self.asrv.vfs.aread[self.uname] + self.wvol = self.asrv.vfs.awrite[self.uname] + self.mvol = self.asrv.vfs.amove[self.uname] + self.dvol = self.asrv.vfs.adel[self.uname] if pwd and "pw" in self.ouparam and pwd != cookies.get("cppwd"): self.out_headers["Set-Cookie"] = self.get_pwd_cookie(pwd)[0] @@ -359,8 +361,9 @@ class HttpCli(object): self.redirect(vpath, flavor="redirecting to", use302=True) return True - self.readable, self.writable = self.asrv.vfs.can_access(self.vpath, self.uname) - if not self.readable and not self.writable: + x = self.asrv.vfs.can_access(self.vpath, self.uname) + self.can_read, self.can_write, self.can_move, self.can_delete = x + if not self.can_read and not self.can_write: if self.vpath: self.log("inaccessible: [{}]".format(self.vpath)) raise Pebkac(404) @@ -775,7 +778,7 @@ class HttpCli(object): return True def get_pwd_cookie(self, pwd): - if pwd in self.asrv.iuser: + if pwd in self.asrv.iacct: msg = "login ok" dt = datetime.utcfromtimestamp(time.time() + 60 * 60 * 24 * 365) exp = dt.strftime("%a, %d %b %Y %H:%M:%S GMT") @@ -994,13 +997,6 @@ class HttpCli(object): vfs, rem = self.asrv.vfs.get(self.vpath, self.uname, False, True) self._assert_safe_rem(rem) - # TODO: - # the per-volume read/write permissions must be replaced with permission flags - # which would decide how to handle uploads to filenames which are taken, - # current behavior of creating a new name is a good default for binary files - # but should also offer a flag to takeover the filename and rename the old one - # - # stopgap: if not rem.endswith(".md"): raise Pebkac(400, "only markdown pls") @@ -1051,7 +1047,6 @@ class HttpCli(object): self.reply(response.encode("utf-8")) return True - # TODO another hack re: pending permissions rework mdir, mfile = os.path.split(fp) mfile2 = "{}.{:.3f}.md".format(mfile[:-3], srv_lastmod) try: @@ -1424,12 +1419,13 @@ class HttpCli(object): def tx_mounts(self): suf = self.urlq({}, ["h"]) + avol = [x for x in self.wvol if x in self.rvol] rvol, wvol, avol = [ [("/" + x).rstrip("/") + "/" for x in y] - for y in [self.rvol, self.wvol, self.avol] + for y in [self.rvol, self.wvol, avol] ] - if self.avol and not self.args.no_rescan: + if avol and not self.args.no_rescan: x = self.conn.hsrv.broker.put(True, "up2k.get_state") vs = json.loads(x.get()) vstate = {("/" + k).rstrip("/") + "/": v for k, v in vs["volstate"].items()} @@ -1454,7 +1450,7 @@ class HttpCli(object): return True def scanvol(self): - if not self.readable or not self.writable: + if not self.can_read or not self.can_write: raise Pebkac(403, "not admin") if self.args.no_rescan: @@ -1473,7 +1469,7 @@ class HttpCli(object): raise Pebkac(500, x) def tx_stack(self): - if not self.avol: + if not [x for x in self.wvol if x in self.rvol]: raise Pebkac(403, "not admin") if self.args.no_stack: @@ -1551,9 +1547,7 @@ class HttpCli(object): vpnodes.append([quotep(vpath) + "/", html_escape(node, crlf=True)]) - vn, rem = self.asrv.vfs.get( - self.vpath, self.uname, self.readable, self.writable - ) + vn, rem = self.asrv.vfs.get(self.vpath, self.uname, False, False) abspath = vn.canonical(rem) dbv, vrem = vn.get_dbv(rem) @@ -1562,7 +1556,7 @@ class HttpCli(object): except: raise Pebkac(404) - if self.readable: + if self.can_read: if rem.startswith(".hist/up2k.") or ( rem.endswith("/dir.txt") and rem.startswith(".hist/th/") ): @@ -1629,10 +1623,14 @@ class HttpCli(object): srv_info = " /// ".join(srv_info) perms = [] - if self.readable: + if self.can_read: perms.append("read") - if self.writable: + if self.can_write: perms.append("write") + if self.can_move: + perms.append("move") + if self.can_delete: + perms.append("delete") url_suf = self.urlq({}, []) is_ls = "ls" in self.uparam @@ -1668,13 +1666,13 @@ class HttpCli(object): "have_up2k_idx": ("e2d" in vn.flags), "have_tags_idx": ("e2t" in vn.flags), "have_zip": (not self.args.no_zip), - "have_b_u": (self.writable and self.uparam.get("b") == "u"), + "have_b_u": (self.can_write and self.uparam.get("b") == "u"), "url_suf": url_suf, "logues": logues, "title": html_escape(self.vpath, crlf=True), "srv_info": srv_info, } - if not self.readable: + if not self.can_read: if is_ls: ret = json.dumps(ls_ret) self.reply( diff --git a/copyparty/web/browser.css b/copyparty/web/browser.css index 80a926bb..812ae768 100644 --- a/copyparty/web/browser.css +++ b/copyparty/web/browser.css @@ -229,21 +229,13 @@ a, #files tbody div a:last-child { right: 2em; color: #999; } -#acc_info span:before { - color: #f4c; - border-bottom: 1px solid rgba(255,68,204,0.6); +#acc_info span { + color: #999; margin-right: .6em; } -html.read #acc_info span:before { - content: 'Read-Only access'; -} -html.write #acc_info span:before { - content: 'Write-Only access'; -} -html.read.write #acc_info span:before { - content: 'Read-Write access'; - color: #999; - border: none; +#acc_info span.warn { + color: #f4c; + border-bottom: 1px solid rgba(255,68,204,0.6); } #files tbody a.play { color: #e70; diff --git a/copyparty/web/browser.js b/copyparty/web/browser.js index 03e3ed23..d8682c3d 100644 --- a/copyparty/web/browser.js +++ b/copyparty/web/browser.js @@ -2558,9 +2558,22 @@ function despin(sel) { function apply_perms(newperms) { perms = newperms || []; - ebi('acc_info').innerHTML = '' + (acct != '*' ? - 'Logout ' + acct + '' : - 'Login') + ''; + var axs = [], + aclass = '>', + chk = ['read', 'write', 'rename', 'delete']; + + for (var a = 0; a < chk.length; a++) + if (has(perms, chk[a])) + axs.push(chk[a].slice(0, 1).toUpperCase() + chk[a].slice(1)); + + axs = axs.join('-'); + if (perms.length == 1) { + aclass = ' class="warn">'; + axs += '-Only'; + } + + ebi('acc_info').innerHTML = '' + (acct != '*' ? + 'Logout ' + acct + '' : 'Login'); var o = QSA('#ops>a[data-perm], #u2footfoot'); for (var a = 0; a < o.length; a++) { diff --git a/docs/example.conf b/docs/example.conf index 6b36d025..858770dd 100644 --- a/docs/example.conf +++ b/docs/example.conf @@ -10,19 +10,25 @@ u k:k # share "." (the current directory) # as "/" (the webroot) for the following users: # "r" grants read-access for anyone -# "a ed" grants read-write to ed +# "rw ed" grants read-write to ed . / r -a ed +rw ed # custom permissions for the "priv" folder: -# user "k" can see/read the contents -# and "ed" gets read-write access +# user "k" can only see/read the contents +# user "ed" gets read-write access ./priv /priv r k -a ed +rw ed + +# this does the same thing: +./priv +/priv +r ed k +w ed # share /home/ed/Music/ as /music and let anyone read it # (this will replace any folder called "music" in the webroot) @@ -41,5 +47,5 @@ c e2d c nodupe # this entire config file can be replaced with these arguments: -# -u ed:123 -u k:k -v .::r:aed -v priv:priv:rk:aed -v /home/ed/Music:music:r -v /home/ed/inc:dump:w +# -u ed:123 -u k:k -v .::r:a,ed -v priv:priv:r,k:rw,ed -v /home/ed/Music:music:r -v /home/ed/inc:dump:w:c,e2d:c,nodupe # but note that the config file always wins in case of conflicts diff --git a/tests/test_httpcli.py b/tests/test_httpcli.py index fcd726dc..d2eda8de 100644 --- a/tests/test_httpcli.py +++ b/tests/test_httpcli.py @@ -90,7 +90,7 @@ class TestHttpCli(unittest.TestCase): if not vol.startswith(top): continue - mode = vol[-2] + mode = vol[-2].replace("a", "rw") usr = vol[-1] if usr == "a": usr = "" @@ -99,7 +99,7 @@ class TestHttpCli(unittest.TestCase): vol += "/" top, sub = vol.split("/", 1) - vcfg.append("{0}/{1}:{1}:{2}{3}".format(top, sub, mode, usr)) + vcfg.append("{0}/{1}:{1}:{2},{3}".format(top, sub, mode, usr)) pprint.pprint(vcfg) diff --git a/tests/test_vfs.py b/tests/test_vfs.py index 631c8d22..c2ed83d5 100644 --- a/tests/test_vfs.py +++ b/tests/test_vfs.py @@ -68,6 +68,11 @@ class TestVFS(unittest.TestCase): def log(self, src, msg, c=0): pass + def assertAxs(self, dct, lst): + t1 = list(sorted(dct.keys())) + t2 = list(sorted(lst)) + self.assertEqual(t1, t2) + def test(self): td = os.path.join(self.td, "vfs") os.mkdir(td) @@ -88,53 +93,53 @@ class TestVFS(unittest.TestCase): self.assertEqual(vfs.nodes, {}) self.assertEqual(vfs.vpath, "") self.assertEqual(vfs.realpath, td) - self.assertEqual(vfs.uread, ["*"]) - self.assertEqual(vfs.uwrite, ["*"]) + self.assertAxs(vfs.axs.uread, ["*"]) + self.assertAxs(vfs.axs.uwrite, ["*"]) # single read-only rootfs (relative path) vfs = AuthSrv(Cfg(v=["a/ab/::r"]), self.log).vfs self.assertEqual(vfs.nodes, {}) self.assertEqual(vfs.vpath, "") self.assertEqual(vfs.realpath, os.path.join(td, "a", "ab")) - self.assertEqual(vfs.uread, ["*"]) - self.assertEqual(vfs.uwrite, []) + self.assertAxs(vfs.axs.uread, ["*"]) + self.assertAxs(vfs.axs.uwrite, []) # single read-only rootfs (absolute path) vfs = AuthSrv(Cfg(v=[td + "//a/ac/../aa//::r"]), self.log).vfs self.assertEqual(vfs.nodes, {}) self.assertEqual(vfs.vpath, "") self.assertEqual(vfs.realpath, os.path.join(td, "a", "aa")) - self.assertEqual(vfs.uread, ["*"]) - self.assertEqual(vfs.uwrite, []) + self.assertAxs(vfs.axs.uread, ["*"]) + self.assertAxs(vfs.axs.uwrite, []) # read-only rootfs with write-only subdirectory (read-write for k) vfs = AuthSrv( - Cfg(a=["k:k"], v=[".::r:ak", "a/ac/acb:a/ac/acb:w:ak"]), + Cfg(a=["k:k"], v=[".::r:rw,k", "a/ac/acb:a/ac/acb:w:rw,k"]), self.log, ).vfs self.assertEqual(len(vfs.nodes), 1) self.assertEqual(vfs.vpath, "") self.assertEqual(vfs.realpath, td) - self.assertEqual(vfs.uread, ["*", "k"]) - self.assertEqual(vfs.uwrite, ["k"]) + self.assertAxs(vfs.axs.uread, ["*", "k"]) + self.assertAxs(vfs.axs.uwrite, ["k"]) n = vfs.nodes["a"] self.assertEqual(len(vfs.nodes), 1) self.assertEqual(n.vpath, "a") self.assertEqual(n.realpath, os.path.join(td, "a")) - self.assertEqual(n.uread, ["*", "k"]) - self.assertEqual(n.uwrite, ["k"]) + self.assertAxs(n.axs.uread, ["*", "k"]) + self.assertAxs(n.axs.uwrite, ["k"]) n = n.nodes["ac"] self.assertEqual(len(vfs.nodes), 1) self.assertEqual(n.vpath, "a/ac") self.assertEqual(n.realpath, os.path.join(td, "a", "ac")) - self.assertEqual(n.uread, ["*", "k"]) - self.assertEqual(n.uwrite, ["k"]) + self.assertAxs(n.axs.uread, ["*", "k"]) + self.assertAxs(n.axs.uwrite, ["k"]) n = n.nodes["acb"] self.assertEqual(n.nodes, {}) self.assertEqual(n.vpath, "a/ac/acb") self.assertEqual(n.realpath, os.path.join(td, "a", "ac", "acb")) - self.assertEqual(n.uread, ["k"]) - self.assertEqual(n.uwrite, ["*", "k"]) + self.assertAxs(n.axs.uread, ["k"]) + self.assertAxs(n.axs.uwrite, ["*", "k"]) # something funky about the windows path normalization, # doesn't really matter but makes the test messy, TODO? @@ -173,24 +178,24 @@ class TestVFS(unittest.TestCase): # admin-only rootfs with all-read-only subfolder vfs = AuthSrv( - Cfg(a=["k:k"], v=[".::ak", "a:a:r"]), + Cfg(a=["k:k"], v=[".::rw,k", "a:a:r"]), self.log, ).vfs self.assertEqual(len(vfs.nodes), 1) self.assertEqual(vfs.vpath, "") self.assertEqual(vfs.realpath, td) - self.assertEqual(vfs.uread, ["k"]) - self.assertEqual(vfs.uwrite, ["k"]) + self.assertAxs(vfs.axs.uread, ["k"]) + self.assertAxs(vfs.axs.uwrite, ["k"]) n = vfs.nodes["a"] self.assertEqual(len(vfs.nodes), 1) self.assertEqual(n.vpath, "a") self.assertEqual(n.realpath, os.path.join(td, "a")) - self.assertEqual(n.uread, ["*"]) - self.assertEqual(n.uwrite, []) - self.assertEqual(vfs.can_access("/", "*"), [False, False]) - self.assertEqual(vfs.can_access("/", "k"), [True, True]) - self.assertEqual(vfs.can_access("/a", "*"), [True, False]) - self.assertEqual(vfs.can_access("/a", "k"), [True, False]) + self.assertAxs(n.axs.uread, ["*"]) + self.assertAxs(n.axs.uwrite, []) + self.assertEqual(vfs.can_access("/", "*"), [False, False, False, False]) + self.assertEqual(vfs.can_access("/", "k"), [True, True, False, False]) + self.assertEqual(vfs.can_access("/a", "*"), [True, False, False, False]) + self.assertEqual(vfs.can_access("/a", "k"), [True, False, False, False]) # breadth-first construction vfs = AuthSrv( @@ -247,26 +252,26 @@ class TestVFS(unittest.TestCase): ./src /dst r a - a asd + rw asd """ ).encode("utf-8") ) au = AuthSrv(Cfg(c=[cfg_path]), self.log) - self.assertEqual(au.user["a"], "123") - self.assertEqual(au.user["asd"], "fgh:jkl") + self.assertEqual(au.acct["a"], "123") + self.assertEqual(au.acct["asd"], "fgh:jkl") n = au.vfs # root was not defined, so PWD with no access to anyone self.assertEqual(n.vpath, "") self.assertEqual(n.realpath, None) - self.assertEqual(n.uread, []) - self.assertEqual(n.uwrite, []) + self.assertAxs(n.axs.uread, []) + self.assertAxs(n.axs.uwrite, []) self.assertEqual(len(n.nodes), 1) n = n.nodes["dst"] self.assertEqual(n.vpath, "dst") self.assertEqual(n.realpath, os.path.join(td, "src")) - self.assertEqual(n.uread, ["a", "asd"]) - self.assertEqual(n.uwrite, ["asd"]) + self.assertAxs(n.axs.uread, ["a", "asd"]) + self.assertAxs(n.axs.uwrite, ["asd"]) self.assertEqual(len(n.nodes), 0) os.unlink(cfg_path)