copyparty/tests/test_vfs.py
ed fff7291dcf show h vols in ls and tree,
and compensate with some optimizations
2025-10-23 21:44:28 +00:00

279 lines
9.6 KiB
Python

#!/usr/bin/env python3
# coding: utf-8
from __future__ import print_function, unicode_literals
import json
import os
import shutil
import tempfile
import unittest
from copyparty import util
from copyparty.authsrv import VFS, AuthSrv
from tests import util as tu
from tests.util import Cfg
class TestVFS(unittest.TestCase):
def setUp(self):
self.td = tu.get_ramdisk()
def tearDown(self):
os.chdir(tempfile.gettempdir())
shutil.rmtree(self.td)
def dump(self, vfs):
print(json.dumps(vfs, indent=4, sort_keys=True, default=lambda o: o.__dict__))
def unfoo(self, foo):
for k, v in {"foo": "a", "bar": "b", "baz": "c", "qux": "d"}.items():
foo = foo.replace(k, v)
return foo
def undot(self, vfs, query, response):
self.assertEqual(util.undot(query), response)
query = self.unfoo(query)
response = self.unfoo(response)
self.assertEqual(util.undot(query), response)
def ls(self, vfs, vpath, uname):
# type: (VFS, str, str) -> tuple[str, str, str]
"""helper for resolving and listing a folder"""
vn, rem = vfs.get(vpath, uname, True, False)
r1 = vn.ls(rem, uname, False, [[True]])
r2 = vn.ls(rem, uname, False, [[True]])
self.assertEqual(r1, r2)
fsdir, real, virt = r1
real = [x[0] for x in real]
return fsdir, real, virt
def log(self, src, msg, c=0):
pass
def assertAxs(self, dct, lst):
t1 = list(sorted(dct))
t2 = list(sorted(lst))
self.assertEqual(t1, t2)
def wipe_vfs(self, td):
os.chdir("..")
if os.path.exists(td):
shutil.rmtree(td)
os.mkdir(td)
os.chdir(td)
for a in "abc":
for b in "abc":
for c in "abc":
folder = "{0}/{0}{1}/{0}{1}{2}".format(a, b, c)
os.makedirs(folder)
for d in "abc":
fn = "{}/{}{}{}{}".format(folder, a, b, c, d)
with open(fn, "w") as f:
f.write(fn)
def test(self):
td = os.path.join(self.td, "vfs")
self.wipe_vfs(td)
# defaults
self.wipe_vfs(td)
vfs = AuthSrv(Cfg(), self.log).vfs
self.assertEqual(vfs.nodes, {})
self.assertEqual(vfs.vpath, "")
self.assertEqual(vfs.realpath, td)
self.assertAxs(vfs.axs.uread, ["*"])
self.assertAxs(vfs.axs.uwrite, ["*"])
# single read-only rootfs (relative path)
self.wipe_vfs(td)
vfs = AuthSrv(Cfg(v=["a/ab/::r"]), self.log).vfs
self.assertEqual(vfs.nodes, {})
self.assertEqual(vfs.vpath, "")
self.assertEqual(vfs.realpath, os.path.join(td, "a", "ab"))
self.assertAxs(vfs.axs.uread, ["*"])
self.assertAxs(vfs.axs.uwrite, [])
# single read-only rootfs (absolute path)
self.wipe_vfs(td)
vfs = AuthSrv(Cfg(v=[td + "//a/ac/../aa//::r"]), self.log).vfs
self.assertEqual(vfs.nodes, {})
self.assertEqual(vfs.vpath, "")
self.assertEqual(vfs.realpath, os.path.join(td, "a", "aa"))
self.assertAxs(vfs.axs.uread, ["*"])
self.assertAxs(vfs.axs.uwrite, [])
# read-only rootfs with write-only subdirectory (read-write for k)
self.wipe_vfs(td)
vfs = AuthSrv(
Cfg(a=["k:k"], v=[".::r:rw,k", "a/ac/acb:a/ac/acb:w:rw,k"]),
self.log,
).vfs
self.assertEqual(len(vfs.nodes), 1)
self.assertEqual(vfs.vpath, "")
self.assertEqual(vfs.realpath, td)
self.assertAxs(vfs.axs.uread, ["*", "k"])
self.assertAxs(vfs.axs.uwrite, ["k"])
n = vfs.nodes["a"]
self.assertEqual(len(vfs.nodes), 1)
self.assertEqual(n.vpath, "a")
self.assertEqual(n.realpath, os.path.join(td, "a"))
self.assertAxs(n.axs.uread, ["*", "k"])
self.assertAxs(n.axs.uwrite, ["k"])
n = n.nodes["ac"]
self.assertEqual(len(vfs.nodes), 1)
self.assertEqual(n.vpath, "a/ac")
self.assertEqual(n.realpath, os.path.join(td, "a", "ac"))
self.assertAxs(n.axs.uread, ["*", "k"])
self.assertAxs(n.axs.uwrite, ["k"])
n = n.nodes["acb"]
self.assertEqual(n.nodes, {})
self.assertEqual(n.vpath, "a/ac/acb")
self.assertEqual(n.realpath, os.path.join(td, "a", "ac", "acb"))
self.assertAxs(n.axs.uread, ["k"])
self.assertAxs(n.axs.uwrite, ["*", "k"])
# something funky about the windows path normalization,
# doesn't really matter but makes the test messy, TODO?
fsdir, real, virt = self.ls(vfs, "/", "*")
self.assertEqual(fsdir, td)
self.assertEqual(real, ["b", "c"])
self.assertEqual(list(virt), ["a"])
fsdir, real, virt = self.ls(vfs, "a", "*")
self.assertEqual(fsdir, os.path.join(td, "a"))
self.assertEqual(real, ["aa", "ab"])
self.assertEqual(list(virt), ["ac"])
fsdir, real, virt = self.ls(vfs, "a/ab", "*")
self.assertEqual(fsdir, os.path.join(td, "a", "ab"))
self.assertEqual(real, ["aba", "abb", "abc"])
self.assertEqual(list(virt), [])
fsdir, real, virt = self.ls(vfs, "a/ac", "*")
self.assertEqual(fsdir, os.path.join(td, "a", "ac"))
self.assertEqual(real, ["aca", "acc"])
self.assertEqual(list(virt), [])
fsdir, real, virt = self.ls(vfs, "a/ac", "k")
self.assertEqual(fsdir, os.path.join(td, "a", "ac"))
self.assertEqual(real, ["aca", "acc"])
self.assertEqual(list(virt), ["acb"])
self.assertRaises(util.Pebkac, vfs.get, "a/ac/acb", "*", True, False)
fsdir, real, virt = self.ls(vfs, "a/ac/acb", "k")
self.assertEqual(fsdir, os.path.join(td, "a", "ac", "acb"))
self.assertEqual(real, ["acba", "acbb", "acbc"])
self.assertEqual(list(virt), [])
# admin-only rootfs with all-read-only subfolder
self.wipe_vfs(td)
vfs = AuthSrv(
Cfg(a=["k:k"], v=[".::rw,k", "a:a:r"]),
self.log,
).vfs
self.assertEqual(len(vfs.nodes), 1)
self.assertEqual(vfs.vpath, "")
self.assertEqual(vfs.realpath, td)
self.assertAxs(vfs.axs.uread, ["k"])
self.assertAxs(vfs.axs.uwrite, ["k"])
n = vfs.nodes["a"]
self.assertEqual(len(vfs.nodes), 1)
self.assertEqual(n.vpath, "a")
self.assertEqual(n.realpath, os.path.join(td, "a"))
self.assertAxs(n.axs.uread, ["*", "k"])
self.assertAxs(n.axs.uwrite, [])
perm_na = (False, False, False, False, False, False, False, False, False)
perm_rw = (True, True, False, False, False, False, False, False, False)
perm_ro = (True, False, False, False, False, False, False, False, False)
self.assertEqual(vfs.can_access("/", "*"), perm_na)
self.assertEqual(vfs.can_access("/", "k"), perm_rw)
self.assertEqual(vfs.can_access("/a", "*"), perm_ro)
self.assertEqual(vfs.can_access("/a", "k"), perm_ro)
# breadth-first construction
self.wipe_vfs(td)
vfs = AuthSrv(
Cfg(
v=[
"a/ac/acb:a/ac/acb:w",
"a:a:w",
".::r",
"abacdfasdq:abacdfasdq:w",
"a/ac:a/ac:w",
],
),
self.log,
).vfs
# sanitizing relative paths
self.undot(vfs, "foo/bar/../baz/qux", "foo/baz/qux")
self.undot(vfs, "foo/../bar", "bar")
self.undot(vfs, "foo/../../bar", "bar")
self.undot(vfs, "foo/../../", "")
self.undot(vfs, "./.././foo/", "foo")
self.undot(vfs, "./.././foo/..", "")
# shadowing
self.wipe_vfs(td)
vfs = AuthSrv(Cfg(v=[".::r", "b:a/ac:r"]), self.log).vfs
fsp, r1, v1 = self.ls(vfs, "", "*")
self.assertEqual(fsp, td)
self.assertEqual(r1, ["b", "c"])
self.assertEqual(list(v1), ["a"])
fsp, r1, v1 = self.ls(vfs, "a", "*")
self.assertEqual(fsp, os.path.join(td, "a"))
self.assertEqual(r1, ["aa", "ab"])
self.assertEqual(list(v1), ["ac"])
fsp1, r1, v1 = self.ls(vfs, "a/ac", "*")
fsp2, r2, v2 = self.ls(vfs, "b", "*")
self.assertEqual(fsp1, os.path.join(td, "b"))
self.assertEqual(fsp2, os.path.join(td, "b"))
self.assertEqual(r1, ["ba", "bb", "bc"])
self.assertEqual(r1, r2)
self.assertEqual(list(v1), list(v2))
# config file parser
cfg_path = os.path.join(self.td, "test.cfg")
with open(cfg_path, "wb") as f:
f.write(
util.dedent(
"""
u a:123
u asd:fgh:jkl
./src
/dst
r a
rw asd
"""
).encode("utf-8")
)
self.wipe_vfs(td)
au = AuthSrv(Cfg(c=[cfg_path]), self.log)
self.assertEqual(au.acct["a"], "123")
self.assertEqual(au.acct["asd"], "fgh:jkl")
n = au.vfs
# root was not defined, so PWD with no access to anyone
self.assertEqual(n.vpath, "")
self.assertEqual(n.realpath, "")
self.assertAxs(n.axs.uread, [])
self.assertAxs(n.axs.uwrite, [])
self.assertEqual(len(n.nodes), 1)
n = n.nodes["dst"]
self.assertEqual(n.vpath, "dst")
self.assertEqual(n.realpath, os.path.join(td, "src"))
self.assertAxs(n.axs.uread, ["a", "asd"])
self.assertAxs(n.axs.uwrite, ["asd"])
self.assertEqual(len(n.nodes), 0)
os.unlink(cfg_path)