From 36fc87aa6f7fc9091b6d064b2084c95937e8ca27 Mon Sep 17 00:00:00 2001 From: ed Date: Mon, 10 Jun 2019 01:23:13 +0000 Subject: [PATCH] file access --- copyparty/authsrv.py | 36 ++++++++++++++++++++++++------------ copyparty/httpcli.py | 9 +++++++-- copyparty/web/browser.html | 7 +++---- tests/test_vfs.py | 30 ++++++++++++++++-------------- 4 files changed, 50 insertions(+), 32 deletions(-) diff --git a/copyparty/authsrv.py b/copyparty/authsrv.py index 0e328d30..0885f998 100644 --- a/copyparty/authsrv.py +++ b/copyparty/authsrv.py @@ -6,7 +6,7 @@ import os import threading from .__init__ import PY2 -from .util import undot +from .util import undot, Pebkac class VFS(object): @@ -67,32 +67,44 @@ class VFS(object): return [self, vpath] - def can_access(self, vpath, user): + def can_access(self, vpath, uname): """return [readable,writable]""" vn, _ = self._find(vpath) - return [user in vn.uread, user in vn.uwrite] + return [uname in vn.uread, uname in vn.uwrite] - def ls(self, vpath, user): - """return user-readable [fsdir,real,virt] items at vpath""" + def get(self, vpath, uname, will_read, will_write): + """returns [vfsnode,fs_remainder] if user has the requested permissions""" vn, rem = self._find(vpath) - if user not in vn.uread: - return [vn.realpath, [], []] + if will_read and uname not in vn.uread: + raise Pebkac("you don't have read-access for this location") - rp = vn.realpath + if will_write and uname not in vn.uwrite: + raise Pebkac("you don't have write-access for this location") + + return vn, rem + + def canonical(self, rem): + """returns the canonical path (fully-resolved absolute fs path)""" + rp = self.realpath if rem: rp += "/" + rem - real = os.listdir(rp) + return os.path.realpath(rp) + + def ls(self, rem, uname): + """return user-readable [fsdir,real,virt] items at vpath""" + abspath = self.canonical(rem) + real = os.listdir(abspath) real.sort() if rem: virt_vis = [] else: virt_all = [] # all nodes that exist virt_vis = [] # nodes readable by user - for name, vn2 in sorted(vn.nodes.items()): + for name, vn2 in sorted(self.nodes.items()): virt_all.append(name) - if user in vn2.uread: + if uname in vn2.uread: virt_vis.append(name) for name in virt_all: @@ -101,7 +113,7 @@ class VFS(object): except ValueError: pass - return [rp, real, virt_vis] + return [abspath, real, virt_vis] def user_tree(self, uname, readable=False, writable=False): ret = [] diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index b6160367..648cef64 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -98,7 +98,7 @@ class HttpCli(object): args[k.lower()] = True self.args = args - self.vpath = vpath + self.vpath = unquote_plus(vpath) try: if mode == "GET": @@ -305,7 +305,12 @@ class HttpCli(object): vpath += u"/" + node vpnodes.append([cgi.escape(vpath) + "/", cgi.escape(node)]) - fsroot, vfs_ls, vfs_virt = self.auth.vfs.ls(self.vpath, self.uname) + vn, rem = self.auth.vfs.get(self.vpath, self.uname, True, False) + abspath = vn.canonical(rem) + if not os.path.isdir(abspath): + return self.tx_file(abspath) + + fsroot, vfs_ls, vfs_virt = vn.ls(rem, self.uname) vfs_ls.extend(vfs_virt) dirs = [] diff --git a/copyparty/web/browser.html b/copyparty/web/browser.html index 7d91c43c..466b2dec 100644 --- a/copyparty/web/browser.html +++ b/copyparty/web/browser.html @@ -11,10 +11,10 @@

- {% for n in vpnodes[:-1] %} + {%- for n in vpnodes[:-1] %} {{ n[1] }} {%- endfor %} - {{ vpnodes[-1][1] }} + {{ vpnodes[-1][1] }}

@@ -27,13 +27,12 @@ -{% for f in files %} +{%- for f in files %} {%- endfor %}
{{ f[0] }}{{ f[2] }}{{ f[3] }}{{ f[4] }}
-
=
diff --git a/tests/test_vfs.py b/tests/test_vfs.py index 07310778..18baad21 100644 --- a/tests/test_vfs.py +++ b/tests/test_vfs.py @@ -33,6 +33,11 @@ class TestVFS(unittest.TestCase): def absify(self, root, names): return ["{}/{}".format(root, x).replace("//", "/") for x in names] + def ls(self, vfs, vpath, uname): + """helper for resolving and listing a folder""" + vn, rem = vfs.get(vpath, uname, True, False) + return vn.ls(rem, uname) + def runcmd(self, *argv): p = sp.Popen(argv, stdout=sp.PIPE, stderr=sp.PIPE) stdout, stderr = p.communicate() @@ -131,37 +136,34 @@ class TestVFS(unittest.TestCase): self.assertEqual(n.uread, ["k"]) self.assertEqual(n.uwrite, ["*", "k"]) - fsdir, real, virt = vfs.ls("/", "*") + fsdir, real, virt = self.ls(vfs, "/", "*") self.assertEqual(fsdir, td) self.assertEqual(real, ["b", "c"]) self.assertEqual(virt, ["a"]) - fsdir, real, virt = vfs.ls("a", "*") + fsdir, real, virt = self.ls(vfs, "a", "*") self.assertEqual(fsdir, td + "/a") self.assertEqual(real, ["aa", "ab"]) self.assertEqual(virt, ["ac"]) - fsdir, real, virt = vfs.ls("a/ab", "*") + fsdir, real, virt = self.ls(vfs, "a/ab", "*") self.assertEqual(fsdir, td + "/a/ab") self.assertEqual(real, ["aba", "abb", "abc"]) self.assertEqual(virt, []) - fsdir, real, virt = vfs.ls("a/ac", "*") + fsdir, real, virt = self.ls(vfs, "a/ac", "*") self.assertEqual(fsdir, td + "/a/ac") self.assertEqual(real, ["aca", "acc"]) self.assertEqual(virt, []) - fsdir, real, virt = vfs.ls("a/ac", "k") + fsdir, real, virt = self.ls(vfs, "a/ac", "k") self.assertEqual(fsdir, td + "/a/ac") self.assertEqual(real, ["aca", "acc"]) self.assertEqual(virt, ["acb"]) - fsdir, real, virt = vfs.ls("a/ac/acb", "*") - self.assertEqual(fsdir, td + "/a/ac/acb") - self.assertEqual(real, []) - self.assertEqual(virt, []) + self.assertRaises(util.Pebkac, vfs.get, "a/ac/acb", "*", True, False) - fsdir, real, virt = vfs.ls("a/ac/acb", "k") + fsdir, real, virt = self.ls(vfs, "a/ac/acb", "k") self.assertEqual(fsdir, td + "/a/ac/acb") self.assertEqual(real, ["acba", "acbb", "acbc"]) self.assertEqual(virt, []) @@ -193,18 +195,18 @@ class TestVFS(unittest.TestCase): # shadowing vfs = AuthSrv(Namespace(c=None, a=[], v=[".::r", "b:a/ac:r"]), None).vfs - fsp, r1, v1 = vfs.ls("", "*") + fsp, r1, v1 = self.ls(vfs, "", "*") self.assertEqual(fsp, td) self.assertEqual(r1, ["b", "c"]) self.assertEqual(v1, ["a"]) - fsp, r1, v1 = vfs.ls("a", "*") + fsp, r1, v1 = self.ls(vfs, "a", "*") self.assertEqual(fsp, td + "/a") self.assertEqual(r1, ["aa", "ab"]) self.assertEqual(v1, ["ac"]) - fsp1, r1, v1 = vfs.ls("a/ac", "*") - fsp2, r2, v2 = vfs.ls("b", "*") + fsp1, r1, v1 = self.ls(vfs, "a/ac", "*") + fsp2, r2, v2 = self.ls(vfs, "b", "*") self.assertEqual(fsp1, td + "/b") self.assertEqual(fsp2, td + "/b") self.assertEqual(r1, ["ba", "bb", "bc"])