#!/usr/bin/env python3 # coding: utf-8 from __future__ import print_function, unicode_literals import os import json import shutil import tempfile import unittest from textwrap import dedent from argparse import Namespace from tests import util as tu from copyparty.authsrv import AuthSrv, VFS from copyparty import util class Cfg(Namespace): def __init__(self, a=None, v=None, c=None): ex = "nw e2d e2ds e2dsa e2t e2ts e2tsr no_logues no_readme no_acode force_js no_robots no_thumb no_athumb no_vthumb" ex = {k: False for k in ex.split()} ex2 = { "mtp": [], "mte": "a", "mth": "", "doctitle": "", "html_head": "", "hist": None, "no_idx": None, "no_hash": None, "js_browser": None, "css_browser": None, "no_voldump": True, "re_maxage": 0, "rproxy": 0, "rsp_slp": 0, "s_wr_slp": 0, "s_wr_sz": 512 * 1024, "lang": "eng", "theme": 0, "themes": 0, "turbo": 0, "logout": 573, } ex.update(ex2) super(Cfg, self).__init__(a=a or [], v=v or [], c=c, **ex) class TestVFS(unittest.TestCase): def setUp(self): self.td = tu.get_ramdisk() def tearDown(self): os.chdir(tempfile.gettempdir()) shutil.rmtree(self.td) def dump(self, vfs): print(json.dumps(vfs, indent=4, sort_keys=True, default=lambda o: o.__dict__)) def unfoo(self, foo): for k, v in {"foo": "a", "bar": "b", "baz": "c", "qux": "d"}.items(): foo = foo.replace(k, v) return foo def undot(self, vfs, query, response): self.assertEqual(util.undot(query), response) query = self.unfoo(query) response = self.unfoo(response) self.assertEqual(util.undot(query), response) def ls(self, vfs, vpath, uname): # type: (VFS, str, str) -> tuple[str, str, str] """helper for resolving and listing a folder""" vn, rem = vfs.get(vpath, uname, True, False) r1 = vn.ls(rem, uname, False, [[True]]) r2 = vn.ls(rem, uname, False, [[True]]) self.assertEqual(r1, r2) fsdir, real, virt = r1 real = [x[0] for x in real] return fsdir, real, virt def log(self, src, msg, c=0): pass def assertAxs(self, dct, lst): t1 = list(sorted(dct)) t2 = list(sorted(lst)) self.assertEqual(t1, t2) def test(self): td = os.path.join(self.td, "vfs") os.mkdir(td) os.chdir(td) for a in "abc": for b in "abc": for c in "abc": folder = "{0}/{0}{1}/{0}{1}{2}".format(a, b, c) os.makedirs(folder) for d in "abc": fn = "{}/{}{}{}{}".format(folder, a, b, c, d) with open(fn, "w") as f: f.write(fn) # defaults vfs = AuthSrv(Cfg(), self.log).vfs self.assertEqual(vfs.nodes, {}) self.assertEqual(vfs.vpath, "") self.assertEqual(vfs.realpath, td) self.assertAxs(vfs.axs.uread, ["*"]) self.assertAxs(vfs.axs.uwrite, ["*"]) # single read-only rootfs (relative path) vfs = AuthSrv(Cfg(v=["a/ab/::r"]), self.log).vfs self.assertEqual(vfs.nodes, {}) self.assertEqual(vfs.vpath, "") self.assertEqual(vfs.realpath, os.path.join(td, "a", "ab")) self.assertAxs(vfs.axs.uread, ["*"]) self.assertAxs(vfs.axs.uwrite, []) # single read-only rootfs (absolute path) vfs = AuthSrv(Cfg(v=[td + "//a/ac/../aa//::r"]), self.log).vfs self.assertEqual(vfs.nodes, {}) self.assertEqual(vfs.vpath, "") self.assertEqual(vfs.realpath, os.path.join(td, "a", "aa")) self.assertAxs(vfs.axs.uread, ["*"]) self.assertAxs(vfs.axs.uwrite, []) # read-only rootfs with write-only subdirectory (read-write for k) vfs = AuthSrv( Cfg(a=["k:k"], v=[".::r:rw,k", "a/ac/acb:a/ac/acb:w:rw,k"]), self.log, ).vfs self.assertEqual(len(vfs.nodes), 1) self.assertEqual(vfs.vpath, "") self.assertEqual(vfs.realpath, td) self.assertAxs(vfs.axs.uread, ["*", "k"]) self.assertAxs(vfs.axs.uwrite, ["k"]) n = vfs.nodes["a"] self.assertEqual(len(vfs.nodes), 1) self.assertEqual(n.vpath, "a") self.assertEqual(n.realpath, os.path.join(td, "a")) self.assertAxs(n.axs.uread, ["*", "k"]) self.assertAxs(n.axs.uwrite, ["k"]) n = n.nodes["ac"] self.assertEqual(len(vfs.nodes), 1) self.assertEqual(n.vpath, "a/ac") self.assertEqual(n.realpath, os.path.join(td, "a", "ac")) self.assertAxs(n.axs.uread, ["*", "k"]) self.assertAxs(n.axs.uwrite, ["k"]) n = n.nodes["acb"] self.assertEqual(n.nodes, {}) self.assertEqual(n.vpath, "a/ac/acb") self.assertEqual(n.realpath, os.path.join(td, "a", "ac", "acb")) self.assertAxs(n.axs.uread, ["k"]) self.assertAxs(n.axs.uwrite, ["*", "k"]) # something funky about the windows path normalization, # doesn't really matter but makes the test messy, TODO? fsdir, real, virt = self.ls(vfs, "/", "*") self.assertEqual(fsdir, td) self.assertEqual(real, ["b", "c"]) self.assertEqual(list(virt), ["a"]) fsdir, real, virt = self.ls(vfs, "a", "*") self.assertEqual(fsdir, os.path.join(td, "a")) self.assertEqual(real, ["aa", "ab"]) self.assertEqual(list(virt), ["ac"]) fsdir, real, virt = self.ls(vfs, "a/ab", "*") self.assertEqual(fsdir, os.path.join(td, "a", "ab")) self.assertEqual(real, ["aba", "abb", "abc"]) self.assertEqual(list(virt), []) fsdir, real, virt = self.ls(vfs, "a/ac", "*") self.assertEqual(fsdir, os.path.join(td, "a", "ac")) self.assertEqual(real, ["aca", "acc"]) self.assertEqual(list(virt), []) fsdir, real, virt = self.ls(vfs, "a/ac", "k") self.assertEqual(fsdir, os.path.join(td, "a", "ac")) self.assertEqual(real, ["aca", "acc"]) self.assertEqual(list(virt), ["acb"]) self.assertRaises(util.Pebkac, vfs.get, "a/ac/acb", "*", True, False) fsdir, real, virt = self.ls(vfs, "a/ac/acb", "k") self.assertEqual(fsdir, os.path.join(td, "a", "ac", "acb")) self.assertEqual(real, ["acba", "acbb", "acbc"]) self.assertEqual(list(virt), []) # admin-only rootfs with all-read-only subfolder vfs = AuthSrv( Cfg(a=["k:k"], v=[".::rw,k", "a:a:r"]), self.log, ).vfs self.assertEqual(len(vfs.nodes), 1) self.assertEqual(vfs.vpath, "") self.assertEqual(vfs.realpath, td) self.assertAxs(vfs.axs.uread, ["k"]) self.assertAxs(vfs.axs.uwrite, ["k"]) n = vfs.nodes["a"] self.assertEqual(len(vfs.nodes), 1) self.assertEqual(n.vpath, "a") self.assertEqual(n.realpath, os.path.join(td, "a")) self.assertAxs(n.axs.uread, ["*"]) self.assertAxs(n.axs.uwrite, []) self.assertEqual(vfs.can_access("/", "*"), (False, False, False, False, False)) self.assertEqual(vfs.can_access("/", "k"), (True, True, False, False, False)) self.assertEqual(vfs.can_access("/a", "*"), (True, False, False, False, False)) self.assertEqual(vfs.can_access("/a", "k"), (True, False, False, False, False)) # breadth-first construction vfs = AuthSrv( Cfg( v=[ "a/ac/acb:a/ac/acb:w", "a:a:w", ".::r", "abacdfasdq:abacdfasdq:w", "a/ac:a/ac:w", ], ), self.log, ).vfs # sanitizing relative paths self.undot(vfs, "foo/bar/../baz/qux", "foo/baz/qux") self.undot(vfs, "foo/../bar", "bar") self.undot(vfs, "foo/../../bar", "bar") self.undot(vfs, "foo/../../", "") self.undot(vfs, "./.././foo/", "foo") self.undot(vfs, "./.././foo/..", "") # shadowing vfs = AuthSrv(Cfg(v=[".::r", "b:a/ac:r"]), self.log).vfs fsp, r1, v1 = self.ls(vfs, "", "*") self.assertEqual(fsp, td) self.assertEqual(r1, ["b", "c"]) self.assertEqual(list(v1), ["a"]) fsp, r1, v1 = self.ls(vfs, "a", "*") self.assertEqual(fsp, os.path.join(td, "a")) self.assertEqual(r1, ["aa", "ab"]) self.assertEqual(list(v1), ["ac"]) fsp1, r1, v1 = self.ls(vfs, "a/ac", "*") fsp2, r2, v2 = self.ls(vfs, "b", "*") self.assertEqual(fsp1, os.path.join(td, "b")) self.assertEqual(fsp2, os.path.join(td, "b")) self.assertEqual(r1, ["ba", "bb", "bc"]) self.assertEqual(r1, r2) self.assertEqual(list(v1), list(v2)) # config file parser cfg_path = os.path.join(self.td, "test.cfg") with open(cfg_path, "wb") as f: f.write( dedent( """ u a:123 u asd:fgh:jkl ./src /dst r a rw asd """ ).encode("utf-8") ) au = AuthSrv(Cfg(c=[cfg_path]), self.log) self.assertEqual(au.acct["a"], "123") self.assertEqual(au.acct["asd"], "fgh:jkl") n = au.vfs # root was not defined, so PWD with no access to anyone self.assertEqual(n.vpath, "") self.assertEqual(n.realpath, "") self.assertAxs(n.axs.uread, []) self.assertAxs(n.axs.uwrite, []) self.assertEqual(len(n.nodes), 1) n = n.nodes["dst"] self.assertEqual(n.vpath, "dst") self.assertEqual(n.realpath, os.path.join(td, "src")) self.assertAxs(n.axs.uread, ["a", "asd"]) self.assertAxs(n.axs.uwrite, ["asd"]) self.assertEqual(len(n.nodes), 0) os.unlink(cfg_path)