From 4339dbeb8d73738ab813ca2813336b023a805579 Mon Sep 17 00:00:00 2001 From: ed Date: Fri, 23 Jul 2021 01:14:49 +0200 Subject: [PATCH] mv/rm handlers --- copyparty/__main__.py | 2 ++ copyparty/authsrv.py | 43 ++++++++++++++++-------------- copyparty/httpcli.py | 61 ++++++++++++++++++++++++++++++++++++++++--- tests/test_vfs.py | 4 +-- 4 files changed, 85 insertions(+), 25 deletions(-) diff --git a/copyparty/__main__.py b/copyparty/__main__.py index 22b45356..04f76219 100644 --- a/copyparty/__main__.py +++ b/copyparty/__main__.py @@ -277,6 +277,8 @@ def run_argparse(argv, formatter): ap2 = ap.add_argument_group('opt-outs') ap2.add_argument("-nw", action="store_true", help="disable writes (benchmark)") + ap2.add_argument("--no-del", action="store_true", help="disable delete operations") + ap2.add_argument("--no-mv", action="store_true", help="disable move/rename operations") ap2.add_argument("-nih", action="store_true", help="no info hostname") ap2.add_argument("-nid", action="store_true", help="no info disk-usage") ap2.add_argument("--no-zip", action="store_true", help="disable download as zip/tar") diff --git a/copyparty/authsrv.py b/copyparty/authsrv.py index 742e73a0..caf2ff9c 100644 --- a/copyparty/authsrv.py +++ b/copyparty/authsrv.py @@ -207,37 +207,41 @@ class VFS(object): # return os.path.realpath(rp) - def ls(self, rem, uname, scandir, incl_wo=False, lstat=False): - # type: (str, str, bool, bool, bool) -> tuple[str, str, dict[str, VFS]] + def ls(self, rem, uname, scandir, permsets, lstat=False): + # type: (str, str, bool, list[list[bool]], bool) -> tuple[str, str, dict[str, VFS]] """return user-readable [fsdir,real,virt] items at vpath""" virt_vis = {} # nodes readable by user abspath = self.canonical(rem) real = list(statdir(self.log, scandir, lstat, abspath)) real.sort() if not rem: - for name, vn2 in sorted(self.nodes.items()): - ok = uname in vn2.axs.uread or "*" in vn2.axs.uread + # no vfs nodes in the list of real inodes + real = [x for x in real if x[0] not in self.nodes] - if not ok and incl_wo: - ok = uname in vn2.axs.uwrite or "*" in vn2.axs.uwrite + for name, vn2 in sorted(self.nodes.items()): + ok = False + axs = vn2.axs + axs = [axs.uread, axs.uwrite, axs.umove, axs.udel] + for pset in permsets: + ok = True + for req, lst in zip(pset, axs): + if req and uname not in lst and "*" not in lst: + ok = False + if ok: + break if ok: virt_vis[name] = vn2 - # no vfs nodes in the list of real inodes - real = [x for x in real if x[0] not in self.nodes] - return [abspath, real, virt_vis] - def walk(self, rel, rem, seen, uname, dots, scandir, lstat): + def walk(self, rel, rem, seen, uname, permsets, dots, scandir, lstat): """ recursively yields from ./rem; rel is a unix-style user-defined vpath (not vfs-related) """ - fsroot, vfs_ls, vfs_virt = self.ls( - rem, uname, scandir, incl_wo=False, lstat=lstat - ) + fsroot, vfs_ls, vfs_virt = self.ls(rem, uname, scandir, permsets, lstat=lstat) if ( seen @@ -263,7 +267,7 @@ class VFS(object): wrel = (rel + "/" + rdir).lstrip("/") wrem = (rem + "/" + rdir).lstrip("/") - for x in self.walk(wrel, wrem, seen, uname, dots, scandir, lstat): + for x in self.walk(wrel, wrem, seen, uname, permsets, dots, scandir, lstat): yield x for n, vfs in sorted(vfs_virt.items()): @@ -271,7 +275,7 @@ class VFS(object): continue wrel = (rel + "/" + n).lstrip("/") - for x in vfs.walk(wrel, "", seen, uname, dots, scandir, lstat): + for x in vfs.walk(wrel, "", seen, uname, permsets, dots, scandir, lstat): yield x def zipgen(self, vrem, flt, uname, dots, scandir): @@ -282,9 +286,8 @@ class VFS(object): f2a = os.sep + "dir.txt" f2b = "{0}.hist{0}".format(os.sep) - for vpath, apath, files, rd, vd in self.walk( - "", vrem, [], uname, dots, scandir, False - ): + g = self.walk("", vrem, [], uname, [[True]], dots, scandir, False) + for vpath, apath, files, rd, vd in g: if flt: files = [x for x in files if x[0] in flt] @@ -789,7 +792,9 @@ class AuthSrv(object): continue atop = vn.realpath - g = vn.walk("", "", [], u, True, not self.args.no_scandir, False) + g = vn.walk( + "", "", [], u, True, [[True]], not self.args.no_scandir, False + ) for vpath, apath, files, _, _ in g: fnames = [n[0] for n in files] vpaths = [vpath + "/" + n for n in fnames] if vpath else fnames diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index 261d0eff..16fac16b 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -370,6 +370,12 @@ class HttpCli(object): self.uparam = {"h": False} + if "delete" in self.uparam: + return self.handle_rm() + + if "move" in self.uparam: + return self.handle_mv() + if "h" in self.uparam: self.vpath = None return self.tx_mounts() @@ -1451,7 +1457,7 @@ class HttpCli(object): def scanvol(self): if not self.can_read or not self.can_write: - raise Pebkac(403, "not admin") + raise Pebkac(403, "not allowed for user " + self.uname) if self.args.no_rescan: raise Pebkac(403, "disabled by argv") @@ -1470,7 +1476,7 @@ class HttpCli(object): def tx_stack(self): if not [x for x in self.wvol if x in self.rvol]: - raise Pebkac(403, "not admin") + raise Pebkac(403, "not allowed for user " + self.uname) if self.args.no_stack: raise Pebkac(403, "disabled by argv") @@ -1508,7 +1514,7 @@ class HttpCli(object): try: vn, rem = self.asrv.vfs.get(top, self.uname, True, False) fsroot, vfs_ls, vfs_virt = vn.ls( - rem, self.uname, not self.args.no_scandir, incl_wo=True + rem, self.uname, not self.args.no_scandir, [[True], [False, True]] ) except: vfs_ls = [] @@ -1535,6 +1541,53 @@ class HttpCli(object): ret["a"] = dirs return ret + def handle_rm(self): + if not self.can_delete: + raise Pebkac(403, "not allowed for user " + self.uname) + + if self.args.no_del: + raise Pebkac(403, "disabled by argv") + + permsets = [[True, False, False, True]] + vn, rem = self.asrv.vfs.get(self.vpath, self.uname, *permsets[0]) + abspath = vn.canonical(rem) + + fun = os.lstat if os.supports_follow_symlinks else os.stat + st = fun(fsenc(abspath)) + if stat.S_ISLNK(st.st_mode) or stat.S_ISREG(st.st_mode): + self.log("rm file " + abspath) + return + + scandir = not self.args.no_scandir + g = vn.walk("", rem, [], self.uname, permsets, True, scandir, True) + for vpath, apath, files, rd, vd in g: + for fn in files: + m = "rm file {} / {}".format(apath, fn[0]) + if "yt" in m: + self.log(m) + + # build list of folders to rmdir after + + self.loud_reply("k") + + def handle_mv(self): + if not self.can_move: + raise Pebkac(403, "not allowed for user " + self.uname) + + if self.args.no_mv: + raise Pebkac(403, "disabled by argv") + + dst = self.uparam.get("to") + if dst is None: + raise Pebkac(400, "need dst vpath") + + svn, srem = self.asrv.vfs.get(self.vpath, self.uname, True, False, True) + dvn, drem = self.asrv.vfs.get(dst, self.uname, False, True) + src = svn.canonical(srem) + dst = dvn.canonical(drem) + + self.loud_reply("mv [{}] to [{}]".format(src, dst)) + def tx_browser(self): vpath = "" vpnodes = [["", "/"]] @@ -1695,7 +1748,7 @@ class HttpCli(object): return self.tx_zip(k, v, vn, rem, [], self.args.ed) fsroot, vfs_ls, vfs_virt = vn.ls( - rem, self.uname, not self.args.no_scandir, incl_wo=True + rem, self.uname, not self.args.no_scandir, [[True], [False, True]] ) stats = {k: v for k, v in vfs_ls} vfs_ls = [x[0] for x in vfs_ls] diff --git a/tests/test_vfs.py b/tests/test_vfs.py index c2ed83d5..285bbc54 100644 --- a/tests/test_vfs.py +++ b/tests/test_vfs.py @@ -57,8 +57,8 @@ class TestVFS(unittest.TestCase): # type: (VFS, str, str) -> tuple[str, str, str] """helper for resolving and listing a folder""" vn, rem = vfs.get(vpath, uname, True, False) - r1 = vn.ls(rem, uname, False) - r2 = vn.ls(rem, uname, False) + r1 = vn.ls(rem, uname, False, [[True]]) + r2 = vn.ls(rem, uname, False, [[True]]) self.assertEqual(r1, r2) fsdir, real, virt = r1