mirror of
https://github.com/9001/copyparty.git
synced 2025-08-17 09:02:15 -06:00
show vfs nodes in browser
This commit is contained in:
parent
bf0aac2cbd
commit
eec3efd683
|
@ -97,24 +97,17 @@ class VFS(object):
|
||||||
|
|
||||||
def ls(self, rem, uname):
|
def ls(self, rem, uname):
|
||||||
"""return user-readable [fsdir,real,virt] items at vpath"""
|
"""return user-readable [fsdir,real,virt] items at vpath"""
|
||||||
|
virt_vis = {} # nodes readable by user
|
||||||
abspath = self.canonical(rem)
|
abspath = self.canonical(rem)
|
||||||
real = [fsdec(x) for x in os.listdir(fsenc(abspath))]
|
real = [fsdec(x) for x in os.listdir(fsenc(abspath))]
|
||||||
real.sort()
|
real.sort()
|
||||||
if rem:
|
if not rem:
|
||||||
virt_vis = []
|
|
||||||
else:
|
|
||||||
virt_all = [] # all nodes that exist
|
|
||||||
virt_vis = [] # nodes readable by user
|
|
||||||
for name, vn2 in sorted(self.nodes.items()):
|
for name, vn2 in sorted(self.nodes.items()):
|
||||||
virt_all.append(name)
|
|
||||||
if uname in vn2.uread:
|
if uname in vn2.uread:
|
||||||
virt_vis.append(name)
|
virt_vis[name] = vn2
|
||||||
|
|
||||||
for name in virt_all:
|
# no vfs nodes in the list of real inodes
|
||||||
try:
|
real = [x for x in real if x not in self.nodes]
|
||||||
real.remove(name)
|
|
||||||
except ValueError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
return [abspath, real, virt_vis]
|
return [abspath, real, virt_vis]
|
||||||
|
|
||||||
|
@ -274,6 +267,21 @@ class AuthSrv(object):
|
||||||
v.uread = mread[dst]
|
v.uread = mread[dst]
|
||||||
v.uwrite = mwrite[dst]
|
v.uwrite = mwrite[dst]
|
||||||
|
|
||||||
|
missing_users = {}
|
||||||
|
for d in [mread, mwrite]:
|
||||||
|
for _, ul in d.items():
|
||||||
|
for usr in ul:
|
||||||
|
if usr != "*" and usr not in user:
|
||||||
|
missing_users[usr] = 1
|
||||||
|
|
||||||
|
if missing_users:
|
||||||
|
self.log(
|
||||||
|
"\033[31myou must -a the following users: "
|
||||||
|
+ ", ".join(k for k in sorted(missing_users))
|
||||||
|
+ "\033[0m"
|
||||||
|
)
|
||||||
|
raise Exception("invalid config")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
v, _ = vfs.get("/", "*", False, True)
|
v, _ = vfs.get("/", "*", False, True)
|
||||||
if self.warn_anonwrite and os.getcwd() == v.realpath:
|
if self.warn_anonwrite and os.getcwd() == v.realpath:
|
||||||
|
|
|
@ -17,7 +17,7 @@ if not PY2:
|
||||||
unicode = str
|
unicode = str
|
||||||
from html import escape as html_escape
|
from html import escape as html_escape
|
||||||
else:
|
else:
|
||||||
from cgi import escape as html_escape
|
from cgi import escape as html_escape # pylint: disable=no-name-in-module
|
||||||
|
|
||||||
|
|
||||||
class HttpCli(object):
|
class HttpCli(object):
|
||||||
|
@ -682,7 +682,7 @@ class HttpCli(object):
|
||||||
return self.tx_file(abspath)
|
return self.tx_file(abspath)
|
||||||
|
|
||||||
fsroot, vfs_ls, vfs_virt = vn.ls(rem, self.uname)
|
fsroot, vfs_ls, vfs_virt = vn.ls(rem, self.uname)
|
||||||
vfs_ls.extend(vfs_virt)
|
vfs_ls.extend(vfs_virt.keys())
|
||||||
|
|
||||||
dirs = []
|
dirs = []
|
||||||
files = []
|
files = []
|
||||||
|
@ -691,10 +691,14 @@ class HttpCli(object):
|
||||||
if self.absolute_urls and vpath:
|
if self.absolute_urls and vpath:
|
||||||
href = "/" + vpath + "/" + fn
|
href = "/" + vpath + "/" + fn
|
||||||
|
|
||||||
fspath = fsroot + "/" + fn
|
if fn in vfs_virt:
|
||||||
|
fspath = vfs_virt[fn].realpath
|
||||||
|
else:
|
||||||
|
fspath = fsroot + "/" + fn
|
||||||
|
|
||||||
try:
|
try:
|
||||||
inf = os.stat(fsenc(fspath))
|
inf = os.stat(fsenc(fspath))
|
||||||
except FileNotFoundError as ex:
|
except FileNotFoundError:
|
||||||
self.log("broken symlink: {}".format(fspath))
|
self.log("broken symlink: {}".format(fspath))
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
|
|
@ -64,6 +64,9 @@ class TestVFS(unittest.TestCase):
|
||||||
|
|
||||||
raise Exception("TODO support windows")
|
raise Exception("TODO support windows")
|
||||||
|
|
||||||
|
def log(self, src, msg):
|
||||||
|
pass
|
||||||
|
|
||||||
def test(self):
|
def test(self):
|
||||||
td = self.get_ramdisk() + "/vfs"
|
td = self.get_ramdisk() + "/vfs"
|
||||||
try:
|
try:
|
||||||
|
@ -85,7 +88,7 @@ class TestVFS(unittest.TestCase):
|
||||||
f.write(fn)
|
f.write(fn)
|
||||||
|
|
||||||
# defaults
|
# defaults
|
||||||
vfs = AuthSrv(Namespace(c=None, a=[], v=[]), None).vfs
|
vfs = AuthSrv(Namespace(c=None, a=[], v=[]), self.log).vfs
|
||||||
self.assertEqual(vfs.nodes, {})
|
self.assertEqual(vfs.nodes, {})
|
||||||
self.assertEqual(vfs.vpath, "")
|
self.assertEqual(vfs.vpath, "")
|
||||||
self.assertEqual(vfs.realpath, td)
|
self.assertEqual(vfs.realpath, td)
|
||||||
|
@ -93,7 +96,7 @@ class TestVFS(unittest.TestCase):
|
||||||
self.assertEqual(vfs.uwrite, ["*"])
|
self.assertEqual(vfs.uwrite, ["*"])
|
||||||
|
|
||||||
# single read-only rootfs (relative path)
|
# single read-only rootfs (relative path)
|
||||||
vfs = AuthSrv(Namespace(c=None, a=[], v=["a/ab/::r"]), None).vfs
|
vfs = AuthSrv(Namespace(c=None, a=[], v=["a/ab/::r"]), self.log).vfs
|
||||||
self.assertEqual(vfs.nodes, {})
|
self.assertEqual(vfs.nodes, {})
|
||||||
self.assertEqual(vfs.vpath, "")
|
self.assertEqual(vfs.vpath, "")
|
||||||
self.assertEqual(vfs.realpath, td + "/a/ab")
|
self.assertEqual(vfs.realpath, td + "/a/ab")
|
||||||
|
@ -101,7 +104,9 @@ class TestVFS(unittest.TestCase):
|
||||||
self.assertEqual(vfs.uwrite, [])
|
self.assertEqual(vfs.uwrite, [])
|
||||||
|
|
||||||
# single read-only rootfs (absolute path)
|
# single read-only rootfs (absolute path)
|
||||||
vfs = AuthSrv(Namespace(c=None, a=[], v=[td + "//a/ac/../aa//::r"]), None).vfs
|
vfs = AuthSrv(
|
||||||
|
Namespace(c=None, a=[], v=[td + "//a/ac/../aa//::r"]), self.log
|
||||||
|
).vfs
|
||||||
self.assertEqual(vfs.nodes, {})
|
self.assertEqual(vfs.nodes, {})
|
||||||
self.assertEqual(vfs.vpath, "")
|
self.assertEqual(vfs.vpath, "")
|
||||||
self.assertEqual(vfs.realpath, td + "/a/aa")
|
self.assertEqual(vfs.realpath, td + "/a/aa")
|
||||||
|
@ -110,7 +115,8 @@ class TestVFS(unittest.TestCase):
|
||||||
|
|
||||||
# read-only rootfs with write-only subdirectory (read-write for k)
|
# read-only rootfs with write-only subdirectory (read-write for k)
|
||||||
vfs = AuthSrv(
|
vfs = AuthSrv(
|
||||||
Namespace(c=None, a=["k:k"], v=[".::r:ak", "a/ac/acb:a/ac/acb:w:ak"]), None
|
Namespace(c=None, a=["k:k"], v=[".::r:ak", "a/ac/acb:a/ac/acb:w:ak"]),
|
||||||
|
self.log,
|
||||||
).vfs
|
).vfs
|
||||||
self.assertEqual(len(vfs.nodes), 1)
|
self.assertEqual(len(vfs.nodes), 1)
|
||||||
self.assertEqual(vfs.vpath, "")
|
self.assertEqual(vfs.vpath, "")
|
||||||
|
@ -139,34 +145,34 @@ class TestVFS(unittest.TestCase):
|
||||||
fsdir, real, virt = self.ls(vfs, "/", "*")
|
fsdir, real, virt = self.ls(vfs, "/", "*")
|
||||||
self.assertEqual(fsdir, td)
|
self.assertEqual(fsdir, td)
|
||||||
self.assertEqual(real, ["b", "c"])
|
self.assertEqual(real, ["b", "c"])
|
||||||
self.assertEqual(virt, ["a"])
|
self.assertEqual(list(virt), ["a"])
|
||||||
|
|
||||||
fsdir, real, virt = self.ls(vfs, "a", "*")
|
fsdir, real, virt = self.ls(vfs, "a", "*")
|
||||||
self.assertEqual(fsdir, td + "/a")
|
self.assertEqual(fsdir, td + "/a")
|
||||||
self.assertEqual(real, ["aa", "ab"])
|
self.assertEqual(real, ["aa", "ab"])
|
||||||
self.assertEqual(virt, ["ac"])
|
self.assertEqual(list(virt), ["ac"])
|
||||||
|
|
||||||
fsdir, real, virt = self.ls(vfs, "a/ab", "*")
|
fsdir, real, virt = self.ls(vfs, "a/ab", "*")
|
||||||
self.assertEqual(fsdir, td + "/a/ab")
|
self.assertEqual(fsdir, td + "/a/ab")
|
||||||
self.assertEqual(real, ["aba", "abb", "abc"])
|
self.assertEqual(real, ["aba", "abb", "abc"])
|
||||||
self.assertEqual(virt, [])
|
self.assertEqual(list(virt), [])
|
||||||
|
|
||||||
fsdir, real, virt = self.ls(vfs, "a/ac", "*")
|
fsdir, real, virt = self.ls(vfs, "a/ac", "*")
|
||||||
self.assertEqual(fsdir, td + "/a/ac")
|
self.assertEqual(fsdir, td + "/a/ac")
|
||||||
self.assertEqual(real, ["aca", "acc"])
|
self.assertEqual(real, ["aca", "acc"])
|
||||||
self.assertEqual(virt, [])
|
self.assertEqual(list(virt), [])
|
||||||
|
|
||||||
fsdir, real, virt = self.ls(vfs, "a/ac", "k")
|
fsdir, real, virt = self.ls(vfs, "a/ac", "k")
|
||||||
self.assertEqual(fsdir, td + "/a/ac")
|
self.assertEqual(fsdir, td + "/a/ac")
|
||||||
self.assertEqual(real, ["aca", "acc"])
|
self.assertEqual(real, ["aca", "acc"])
|
||||||
self.assertEqual(virt, ["acb"])
|
self.assertEqual(list(virt), ["acb"])
|
||||||
|
|
||||||
self.assertRaises(util.Pebkac, vfs.get, "a/ac/acb", "*", True, False)
|
self.assertRaises(util.Pebkac, vfs.get, "a/ac/acb", "*", True, False)
|
||||||
|
|
||||||
fsdir, real, virt = self.ls(vfs, "a/ac/acb", "k")
|
fsdir, real, virt = self.ls(vfs, "a/ac/acb", "k")
|
||||||
self.assertEqual(fsdir, td + "/a/ac/acb")
|
self.assertEqual(fsdir, td + "/a/ac/acb")
|
||||||
self.assertEqual(real, ["acba", "acbb", "acbc"])
|
self.assertEqual(real, ["acba", "acbb", "acbc"])
|
||||||
self.assertEqual(virt, [])
|
self.assertEqual(list(virt), [])
|
||||||
|
|
||||||
# breadth-first construction
|
# breadth-first construction
|
||||||
vfs = AuthSrv(
|
vfs = AuthSrv(
|
||||||
|
@ -181,7 +187,7 @@ class TestVFS(unittest.TestCase):
|
||||||
"a/ac:a/ac:w",
|
"a/ac:a/ac:w",
|
||||||
],
|
],
|
||||||
),
|
),
|
||||||
None,
|
self.log,
|
||||||
).vfs
|
).vfs
|
||||||
|
|
||||||
# sanitizing relative paths
|
# sanitizing relative paths
|
||||||
|
@ -193,17 +199,17 @@ class TestVFS(unittest.TestCase):
|
||||||
self.undot(vfs, "./.././foo/..", "")
|
self.undot(vfs, "./.././foo/..", "")
|
||||||
|
|
||||||
# shadowing
|
# shadowing
|
||||||
vfs = AuthSrv(Namespace(c=None, a=[], v=[".::r", "b:a/ac:r"]), None).vfs
|
vfs = AuthSrv(Namespace(c=None, a=[], v=[".::r", "b:a/ac:r"]), self.log).vfs
|
||||||
|
|
||||||
fsp, r1, v1 = self.ls(vfs, "", "*")
|
fsp, r1, v1 = self.ls(vfs, "", "*")
|
||||||
self.assertEqual(fsp, td)
|
self.assertEqual(fsp, td)
|
||||||
self.assertEqual(r1, ["b", "c"])
|
self.assertEqual(r1, ["b", "c"])
|
||||||
self.assertEqual(v1, ["a"])
|
self.assertEqual(list(v1), ["a"])
|
||||||
|
|
||||||
fsp, r1, v1 = self.ls(vfs, "a", "*")
|
fsp, r1, v1 = self.ls(vfs, "a", "*")
|
||||||
self.assertEqual(fsp, td + "/a")
|
self.assertEqual(fsp, td + "/a")
|
||||||
self.assertEqual(r1, ["aa", "ab"])
|
self.assertEqual(r1, ["aa", "ab"])
|
||||||
self.assertEqual(v1, ["ac"])
|
self.assertEqual(list(v1), ["ac"])
|
||||||
|
|
||||||
fsp1, r1, v1 = self.ls(vfs, "a/ac", "*")
|
fsp1, r1, v1 = self.ls(vfs, "a/ac", "*")
|
||||||
fsp2, r2, v2 = self.ls(vfs, "b", "*")
|
fsp2, r2, v2 = self.ls(vfs, "b", "*")
|
||||||
|
@ -211,7 +217,7 @@ class TestVFS(unittest.TestCase):
|
||||||
self.assertEqual(fsp2, td + "/b")
|
self.assertEqual(fsp2, td + "/b")
|
||||||
self.assertEqual(r1, ["ba", "bb", "bc"])
|
self.assertEqual(r1, ["ba", "bb", "bc"])
|
||||||
self.assertEqual(r1, r2)
|
self.assertEqual(r1, r2)
|
||||||
self.assertEqual(v1, v2)
|
self.assertEqual(list(v1), list(v2))
|
||||||
|
|
||||||
# config file parser
|
# config file parser
|
||||||
cfg_path = self.get_ramdisk() + "/test.cfg"
|
cfg_path = self.get_ramdisk() + "/test.cfg"
|
||||||
|
@ -230,7 +236,7 @@ class TestVFS(unittest.TestCase):
|
||||||
).encode("utf-8")
|
).encode("utf-8")
|
||||||
)
|
)
|
||||||
|
|
||||||
au = AuthSrv(Namespace(c=[cfg_path], a=[], v=[]), None)
|
au = AuthSrv(Namespace(c=[cfg_path], a=[], v=[]), self.log)
|
||||||
self.assertEqual(au.user["a"], "123")
|
self.assertEqual(au.user["a"], "123")
|
||||||
self.assertEqual(au.user["asd"], "fgh:jkl")
|
self.assertEqual(au.user["asd"], "fgh:jkl")
|
||||||
n = au.vfs
|
n = au.vfs
|
||||||
|
|
Loading…
Reference in a new issue