mirror of
https://github.com/9001/copyparty.git
synced 2025-08-17 00:52:16 -06:00
mv/rm handlers
This commit is contained in:
parent
5b0605774c
commit
4339dbeb8d
|
@ -277,6 +277,8 @@ def run_argparse(argv, formatter):
|
|||
|
||||
ap2 = ap.add_argument_group('opt-outs')
|
||||
ap2.add_argument("-nw", action="store_true", help="disable writes (benchmark)")
|
||||
ap2.add_argument("--no-del", action="store_true", help="disable delete operations")
|
||||
ap2.add_argument("--no-mv", action="store_true", help="disable move/rename operations")
|
||||
ap2.add_argument("-nih", action="store_true", help="no info hostname")
|
||||
ap2.add_argument("-nid", action="store_true", help="no info disk-usage")
|
||||
ap2.add_argument("--no-zip", action="store_true", help="disable download as zip/tar")
|
||||
|
|
|
@ -207,37 +207,41 @@ class VFS(object):
|
|||
#
|
||||
return os.path.realpath(rp)
|
||||
|
||||
def ls(self, rem, uname, scandir, incl_wo=False, lstat=False):
|
||||
# type: (str, str, bool, bool, bool) -> tuple[str, str, dict[str, VFS]]
|
||||
def ls(self, rem, uname, scandir, permsets, lstat=False):
|
||||
# type: (str, str, bool, list[list[bool]], bool) -> tuple[str, str, dict[str, VFS]]
|
||||
"""return user-readable [fsdir,real,virt] items at vpath"""
|
||||
virt_vis = {} # nodes readable by user
|
||||
abspath = self.canonical(rem)
|
||||
real = list(statdir(self.log, scandir, lstat, abspath))
|
||||
real.sort()
|
||||
if not rem:
|
||||
for name, vn2 in sorted(self.nodes.items()):
|
||||
ok = uname in vn2.axs.uread or "*" in vn2.axs.uread
|
||||
# no vfs nodes in the list of real inodes
|
||||
real = [x for x in real if x[0] not in self.nodes]
|
||||
|
||||
if not ok and incl_wo:
|
||||
ok = uname in vn2.axs.uwrite or "*" in vn2.axs.uwrite
|
||||
for name, vn2 in sorted(self.nodes.items()):
|
||||
ok = False
|
||||
axs = vn2.axs
|
||||
axs = [axs.uread, axs.uwrite, axs.umove, axs.udel]
|
||||
for pset in permsets:
|
||||
ok = True
|
||||
for req, lst in zip(pset, axs):
|
||||
if req and uname not in lst and "*" not in lst:
|
||||
ok = False
|
||||
if ok:
|
||||
break
|
||||
|
||||
if ok:
|
||||
virt_vis[name] = vn2
|
||||
|
||||
# no vfs nodes in the list of real inodes
|
||||
real = [x for x in real if x[0] not in self.nodes]
|
||||
|
||||
return [abspath, real, virt_vis]
|
||||
|
||||
def walk(self, rel, rem, seen, uname, dots, scandir, lstat):
|
||||
def walk(self, rel, rem, seen, uname, permsets, dots, scandir, lstat):
|
||||
"""
|
||||
recursively yields from ./rem;
|
||||
rel is a unix-style user-defined vpath (not vfs-related)
|
||||
"""
|
||||
|
||||
fsroot, vfs_ls, vfs_virt = self.ls(
|
||||
rem, uname, scandir, incl_wo=False, lstat=lstat
|
||||
)
|
||||
fsroot, vfs_ls, vfs_virt = self.ls(rem, uname, scandir, permsets, lstat=lstat)
|
||||
|
||||
if (
|
||||
seen
|
||||
|
@ -263,7 +267,7 @@ class VFS(object):
|
|||
|
||||
wrel = (rel + "/" + rdir).lstrip("/")
|
||||
wrem = (rem + "/" + rdir).lstrip("/")
|
||||
for x in self.walk(wrel, wrem, seen, uname, dots, scandir, lstat):
|
||||
for x in self.walk(wrel, wrem, seen, uname, permsets, dots, scandir, lstat):
|
||||
yield x
|
||||
|
||||
for n, vfs in sorted(vfs_virt.items()):
|
||||
|
@ -271,7 +275,7 @@ class VFS(object):
|
|||
continue
|
||||
|
||||
wrel = (rel + "/" + n).lstrip("/")
|
||||
for x in vfs.walk(wrel, "", seen, uname, dots, scandir, lstat):
|
||||
for x in vfs.walk(wrel, "", seen, uname, permsets, dots, scandir, lstat):
|
||||
yield x
|
||||
|
||||
def zipgen(self, vrem, flt, uname, dots, scandir):
|
||||
|
@ -282,9 +286,8 @@ class VFS(object):
|
|||
f2a = os.sep + "dir.txt"
|
||||
f2b = "{0}.hist{0}".format(os.sep)
|
||||
|
||||
for vpath, apath, files, rd, vd in self.walk(
|
||||
"", vrem, [], uname, dots, scandir, False
|
||||
):
|
||||
g = self.walk("", vrem, [], uname, [[True]], dots, scandir, False)
|
||||
for vpath, apath, files, rd, vd in g:
|
||||
if flt:
|
||||
files = [x for x in files if x[0] in flt]
|
||||
|
||||
|
@ -789,7 +792,9 @@ class AuthSrv(object):
|
|||
continue
|
||||
|
||||
atop = vn.realpath
|
||||
g = vn.walk("", "", [], u, True, not self.args.no_scandir, False)
|
||||
g = vn.walk(
|
||||
"", "", [], u, True, [[True]], not self.args.no_scandir, False
|
||||
)
|
||||
for vpath, apath, files, _, _ in g:
|
||||
fnames = [n[0] for n in files]
|
||||
vpaths = [vpath + "/" + n for n in fnames] if vpath else fnames
|
||||
|
|
|
@ -370,6 +370,12 @@ class HttpCli(object):
|
|||
|
||||
self.uparam = {"h": False}
|
||||
|
||||
if "delete" in self.uparam:
|
||||
return self.handle_rm()
|
||||
|
||||
if "move" in self.uparam:
|
||||
return self.handle_mv()
|
||||
|
||||
if "h" in self.uparam:
|
||||
self.vpath = None
|
||||
return self.tx_mounts()
|
||||
|
@ -1451,7 +1457,7 @@ class HttpCli(object):
|
|||
|
||||
def scanvol(self):
|
||||
if not self.can_read or not self.can_write:
|
||||
raise Pebkac(403, "not admin")
|
||||
raise Pebkac(403, "not allowed for user " + self.uname)
|
||||
|
||||
if self.args.no_rescan:
|
||||
raise Pebkac(403, "disabled by argv")
|
||||
|
@ -1470,7 +1476,7 @@ class HttpCli(object):
|
|||
|
||||
def tx_stack(self):
|
||||
if not [x for x in self.wvol if x in self.rvol]:
|
||||
raise Pebkac(403, "not admin")
|
||||
raise Pebkac(403, "not allowed for user " + self.uname)
|
||||
|
||||
if self.args.no_stack:
|
||||
raise Pebkac(403, "disabled by argv")
|
||||
|
@ -1508,7 +1514,7 @@ class HttpCli(object):
|
|||
try:
|
||||
vn, rem = self.asrv.vfs.get(top, self.uname, True, False)
|
||||
fsroot, vfs_ls, vfs_virt = vn.ls(
|
||||
rem, self.uname, not self.args.no_scandir, incl_wo=True
|
||||
rem, self.uname, not self.args.no_scandir, [[True], [False, True]]
|
||||
)
|
||||
except:
|
||||
vfs_ls = []
|
||||
|
@ -1535,6 +1541,53 @@ class HttpCli(object):
|
|||
ret["a"] = dirs
|
||||
return ret
|
||||
|
||||
def handle_rm(self):
|
||||
if not self.can_delete:
|
||||
raise Pebkac(403, "not allowed for user " + self.uname)
|
||||
|
||||
if self.args.no_del:
|
||||
raise Pebkac(403, "disabled by argv")
|
||||
|
||||
permsets = [[True, False, False, True]]
|
||||
vn, rem = self.asrv.vfs.get(self.vpath, self.uname, *permsets[0])
|
||||
abspath = vn.canonical(rem)
|
||||
|
||||
fun = os.lstat if os.supports_follow_symlinks else os.stat
|
||||
st = fun(fsenc(abspath))
|
||||
if stat.S_ISLNK(st.st_mode) or stat.S_ISREG(st.st_mode):
|
||||
self.log("rm file " + abspath)
|
||||
return
|
||||
|
||||
scandir = not self.args.no_scandir
|
||||
g = vn.walk("", rem, [], self.uname, permsets, True, scandir, True)
|
||||
for vpath, apath, files, rd, vd in g:
|
||||
for fn in files:
|
||||
m = "rm file {} / {}".format(apath, fn[0])
|
||||
if "yt" in m:
|
||||
self.log(m)
|
||||
|
||||
# build list of folders to rmdir after
|
||||
|
||||
self.loud_reply("k")
|
||||
|
||||
def handle_mv(self):
|
||||
if not self.can_move:
|
||||
raise Pebkac(403, "not allowed for user " + self.uname)
|
||||
|
||||
if self.args.no_mv:
|
||||
raise Pebkac(403, "disabled by argv")
|
||||
|
||||
dst = self.uparam.get("to")
|
||||
if dst is None:
|
||||
raise Pebkac(400, "need dst vpath")
|
||||
|
||||
svn, srem = self.asrv.vfs.get(self.vpath, self.uname, True, False, True)
|
||||
dvn, drem = self.asrv.vfs.get(dst, self.uname, False, True)
|
||||
src = svn.canonical(srem)
|
||||
dst = dvn.canonical(drem)
|
||||
|
||||
self.loud_reply("mv [{}] to [{}]".format(src, dst))
|
||||
|
||||
def tx_browser(self):
|
||||
vpath = ""
|
||||
vpnodes = [["", "/"]]
|
||||
|
@ -1695,7 +1748,7 @@ class HttpCli(object):
|
|||
return self.tx_zip(k, v, vn, rem, [], self.args.ed)
|
||||
|
||||
fsroot, vfs_ls, vfs_virt = vn.ls(
|
||||
rem, self.uname, not self.args.no_scandir, incl_wo=True
|
||||
rem, self.uname, not self.args.no_scandir, [[True], [False, True]]
|
||||
)
|
||||
stats = {k: v for k, v in vfs_ls}
|
||||
vfs_ls = [x[0] for x in vfs_ls]
|
||||
|
|
|
@ -57,8 +57,8 @@ class TestVFS(unittest.TestCase):
|
|||
# type: (VFS, str, str) -> tuple[str, str, str]
|
||||
"""helper for resolving and listing a folder"""
|
||||
vn, rem = vfs.get(vpath, uname, True, False)
|
||||
r1 = vn.ls(rem, uname, False)
|
||||
r2 = vn.ls(rem, uname, False)
|
||||
r1 = vn.ls(rem, uname, False, [[True]])
|
||||
r2 = vn.ls(rem, uname, False, [[True]])
|
||||
self.assertEqual(r1, r2)
|
||||
|
||||
fsdir, real, virt = r1
|
||||
|
|
Loading…
Reference in a new issue