copyparty/tests/test_vfs.py
2019-06-12 16:39:43 +00:00

252 lines
8.4 KiB
Python

#!/usr/bin/env python
# coding: utf-8
from __future__ import print_function, unicode_literals
import os
import json
import shutil
import unittest
import subprocess as sp # nosec
from textwrap import dedent
from argparse import Namespace
from copyparty.authsrv import AuthSrv
from copyparty import util
class TestVFS(unittest.TestCase):
def dump(self, vfs):
print(json.dumps(vfs, indent=4, sort_keys=True, default=lambda o: o.__dict__))
def unfoo(self, foo):
for k, v in {"foo": "a", "bar": "b", "baz": "c", "qux": "d"}.items():
foo = foo.replace(k, v)
return foo
def undot(self, vfs, query, response):
self.assertEqual(util.undot(query), response)
query = self.unfoo(query)
response = self.unfoo(response)
self.assertEqual(util.undot(query), response)
def absify(self, root, names):
return ["{}/{}".format(root, x).replace("//", "/") for x in names]
def ls(self, vfs, vpath, uname):
"""helper for resolving and listing a folder"""
vn, rem = vfs.get(vpath, uname, True, False)
return vn.ls(rem, uname)
def runcmd(self, *argv):
p = sp.Popen(argv, stdout=sp.PIPE, stderr=sp.PIPE)
stdout, stderr = p.communicate()
stdout = stdout.decode("utf-8")
stderr = stderr.decode("utf-8")
return [p.returncode, stdout, stderr]
def chkcmd(self, *argv):
ok, sout, serr = self.runcmd(*argv)
if ok != 0:
raise Exception(serr)
return sout, serr
def get_ramdisk(self):
for vol in ["/dev/shm", "/Volumes/cptd"]: # nosec (singleton test)
if os.path.exists(vol):
return vol
if os.path.exists("/Volumes"):
devname, _ = self.chkcmd("hdiutil", "attach", "-nomount", "ram://8192")
_, _ = self.chkcmd("diskutil", "eraseVolume", "HFS+", "cptd", devname)
return "/Volumes/cptd"
raise Exception("TODO support windows")
def test(self):
td = self.get_ramdisk() + "/vfs"
try:
shutil.rmtree(td)
except OSError:
pass
os.mkdir(td)
os.chdir(td)
for a in "abc":
for b in "abc":
for c in "abc":
folder = "{0}/{0}{1}/{0}{1}{2}".format(a, b, c)
os.makedirs(folder)
for d in "abc":
fn = "{}/{}{}{}{}".format(folder, a, b, c, d)
with open(fn, "w") as f:
f.write(fn)
# defaults
vfs = AuthSrv(Namespace(c=None, a=[], v=[]), None).vfs
self.assertEqual(vfs.nodes, {})
self.assertEqual(vfs.vpath, "")
self.assertEqual(vfs.realpath, td)
self.assertEqual(vfs.uread, ["*"])
self.assertEqual(vfs.uwrite, ["*"])
# single read-only rootfs (relative path)
vfs = AuthSrv(Namespace(c=None, a=[], v=["a/ab/::r"]), None).vfs
self.assertEqual(vfs.nodes, {})
self.assertEqual(vfs.vpath, "")
self.assertEqual(vfs.realpath, td + "/a/ab")
self.assertEqual(vfs.uread, ["*"])
self.assertEqual(vfs.uwrite, [])
# single read-only rootfs (absolute path)
vfs = AuthSrv(Namespace(c=None, a=[], v=[td + "//a/ac/../aa//::r"]), None).vfs
self.assertEqual(vfs.nodes, {})
self.assertEqual(vfs.vpath, "")
self.assertEqual(vfs.realpath, td + "/a/aa")
self.assertEqual(vfs.uread, ["*"])
self.assertEqual(vfs.uwrite, [])
# read-only rootfs with write-only subdirectory (read-write for k)
vfs = AuthSrv(
Namespace(c=None, a=["k:k"], v=[".::r:ak", "a/ac/acb:a/ac/acb:w:ak"]), None
).vfs
self.assertEqual(len(vfs.nodes), 1)
self.assertEqual(vfs.vpath, "")
self.assertEqual(vfs.realpath, td)
self.assertEqual(vfs.uread, ["*", "k"])
self.assertEqual(vfs.uwrite, ["k"])
n = vfs.nodes["a"]
self.assertEqual(len(vfs.nodes), 1)
self.assertEqual(n.vpath, "a")
self.assertEqual(n.realpath, td + "/a")
self.assertEqual(n.uread, ["*", "k"])
self.assertEqual(n.uwrite, ["k"])
n = n.nodes["ac"]
self.assertEqual(len(vfs.nodes), 1)
self.assertEqual(n.vpath, "a/ac")
self.assertEqual(n.realpath, td + "/a/ac")
self.assertEqual(n.uread, ["*", "k"])
self.assertEqual(n.uwrite, ["k"])
n = n.nodes["acb"]
self.assertEqual(n.nodes, {})
self.assertEqual(n.vpath, "a/ac/acb")
self.assertEqual(n.realpath, td + "/a/ac/acb")
self.assertEqual(n.uread, ["k"])
self.assertEqual(n.uwrite, ["*", "k"])
fsdir, real, virt = self.ls(vfs, "/", "*")
self.assertEqual(fsdir, td)
self.assertEqual(real, ["b", "c"])
self.assertEqual(virt, ["a"])
fsdir, real, virt = self.ls(vfs, "a", "*")
self.assertEqual(fsdir, td + "/a")
self.assertEqual(real, ["aa", "ab"])
self.assertEqual(virt, ["ac"])
fsdir, real, virt = self.ls(vfs, "a/ab", "*")
self.assertEqual(fsdir, td + "/a/ab")
self.assertEqual(real, ["aba", "abb", "abc"])
self.assertEqual(virt, [])
fsdir, real, virt = self.ls(vfs, "a/ac", "*")
self.assertEqual(fsdir, td + "/a/ac")
self.assertEqual(real, ["aca", "acc"])
self.assertEqual(virt, [])
fsdir, real, virt = self.ls(vfs, "a/ac", "k")
self.assertEqual(fsdir, td + "/a/ac")
self.assertEqual(real, ["aca", "acc"])
self.assertEqual(virt, ["acb"])
self.assertRaises(util.Pebkac, vfs.get, "a/ac/acb", "*", True, False)
fsdir, real, virt = self.ls(vfs, "a/ac/acb", "k")
self.assertEqual(fsdir, td + "/a/ac/acb")
self.assertEqual(real, ["acba", "acbb", "acbc"])
self.assertEqual(virt, [])
# breadth-first construction
vfs = AuthSrv(
Namespace(
c=None,
a=[],
v=[
"a/ac/acb:a/ac/acb:w",
"a:a:w",
".::r",
"abacdfasdq:abacdfasdq:w",
"a/ac:a/ac:w",
],
),
None,
).vfs
# sanitizing relative paths
self.undot(vfs, "foo/bar/../baz/qux", "foo/baz/qux")
self.undot(vfs, "foo/../bar", "bar")
self.undot(vfs, "foo/../../bar", "bar")
self.undot(vfs, "foo/../../", "")
self.undot(vfs, "./.././foo/", "foo")
self.undot(vfs, "./.././foo/..", "")
# shadowing
vfs = AuthSrv(Namespace(c=None, a=[], v=[".::r", "b:a/ac:r"]), None).vfs
fsp, r1, v1 = self.ls(vfs, "", "*")
self.assertEqual(fsp, td)
self.assertEqual(r1, ["b", "c"])
self.assertEqual(v1, ["a"])
fsp, r1, v1 = self.ls(vfs, "a", "*")
self.assertEqual(fsp, td + "/a")
self.assertEqual(r1, ["aa", "ab"])
self.assertEqual(v1, ["ac"])
fsp1, r1, v1 = self.ls(vfs, "a/ac", "*")
fsp2, r2, v2 = self.ls(vfs, "b", "*")
self.assertEqual(fsp1, td + "/b")
self.assertEqual(fsp2, td + "/b")
self.assertEqual(r1, ["ba", "bb", "bc"])
self.assertEqual(r1, r2)
self.assertEqual(v1, v2)
# config file parser
cfg_path = self.get_ramdisk() + "/test.cfg"
with open(cfg_path, "wb") as f:
f.write(
dedent(
"""
u a:123
u asd:fgh:jkl
./src
/dst
r a
a asd
"""
).encode("utf-8")
)
au = AuthSrv(Namespace(c=[cfg_path], a=[], v=[]), None)
self.assertEqual(au.user["a"], "123")
self.assertEqual(au.user["asd"], "fgh:jkl")
n = au.vfs
# root was not defined, so PWD with no access to anyone
self.assertEqual(n.vpath, "")
self.assertEqual(n.realpath, td)
self.assertEqual(n.uread, [])
self.assertEqual(n.uwrite, [])
self.assertEqual(len(n.nodes), 1)
n = n.nodes["dst"]
self.assertEqual(n.vpath, "dst")
self.assertEqual(n.realpath, td + "/src")
self.assertEqual(n.uread, ["a", "asd"])
self.assertEqual(n.uwrite, ["asd"])
self.assertEqual(len(n.nodes), 0)
shutil.rmtree(td)
os.unlink(cfg_path)