From 52e06226a2592d9eec78eb60f78f966bedab2979 Mon Sep 17 00:00:00 2001 From: ed Date: Sun, 16 Jun 2024 21:35:43 +0200 Subject: [PATCH] make thumbnails compatible with dirkeys/filekeys was intentionally skipped to avoid complexity but enough people have asked why it doesn't work that it's time to do something about it turns out it wasn't that bad --- README.md | 2 +- copyparty/httpcli.py | 70 ++++++++++---- tests/test_dots.py | 212 ++++++++++++++++++++++++++++++++++++++++++- tests/util.py | 4 +- 4 files changed, 264 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index 41e26719..408a5e0e 100644 --- a/README.md +++ b/README.md @@ -1918,7 +1918,7 @@ volflag `dk` generates dirkeys (per-directory accesskeys) for all folders, grant volflag `dky` disables the actual key-check, meaning anyone can see the contents of a folder where they have `g` access, but not its subdirectories -* `dk` + `dky` gives the same behavior as if all users with `g` access have full read-access, but subfolders are hidden files (their names start with a dot), so `dky` is an alternative to renaming all the folders for that purpose, maybe just for some users +* `dk` + `dky` gives the same behavior as if all users with `g` access have full read-access, but subfolders are hidden files (as if their names start with a dot), so `dky` is an alternative to renaming all the folders for that purpose, maybe just for some users volflag `dks` lets people enter subfolders as well, and also enables download-as-zip/tar diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index 4a72beb1..045b89c5 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -2032,7 +2032,7 @@ class HttpCli(object): v = self.uparam[k] - if self._use_dirkey(): + if self._use_dirkey(self.vn, ""): vn = self.vn rem = self.rem else: @@ -2953,22 +2953,24 @@ class HttpCli(object): return file_lastmod, True - def _use_dirkey(self, ap: str = "") -> bool: + def _use_dirkey(self, vn: VFS, ap: str) -> bool: if self.can_read or not self.can_get: return False - if self.vn.flags.get("dky"): + if vn.flags.get("dky"): return True req = self.uparam.get("k") or "" if not req: return False - dk_len = self.vn.flags.get("dk") + dk_len = vn.flags.get("dk") if not dk_len: return False - ap = ap or self.vn.canonical(self.rem) + if not ap: + ap = vn.canonical(self.rem) + zs = self.gen_fk(2, self.args.dk_salt, ap, 0, 0)[:dk_len] if req == zs: return True @@ -2977,6 +2979,34 @@ class HttpCli(object): self.log(t % (zs, req, self.req, ap), 6) return False + def _use_filekey(self, vn: VFS, ap: str, st: os.stat_result) -> bool: + if self.can_read or not self.can_get: + return False + + req = self.uparam.get("k") or "" + if not req: + return False + + fk_len = vn.flags.get("fk") + if not fk_len: + return False + + if not ap: + ap = self.vn.canonical(self.rem) + + alg = 2 if "fka" in vn.flags else 1 + + zs = self.gen_fk( + alg, self.args.fk_salt, ap, st.st_size, 0 if ANYWIN else st.st_ino + )[:fk_len] + + if req == zs: + return True + + t = "wrong filekey, want %s, got %s\n vp: %s\n ap: %s" + self.log(t % (zs, req, self.req, ap), 6) + return False + def _expand(self, txt: str, phs: list[str]) -> str: for ph in phs: if ph.startswith("hdr."): @@ -3864,7 +3894,7 @@ class HttpCli(object): dk_sz = False if dk: vn, rem = vfs.get(top, self.uname, False, False) - if vn.flags.get("dks") and self._use_dirkey(vn.canonical(rem)): + if vn.flags.get("dks") and self._use_dirkey(vn, vn.canonical(rem)): dk_sz = vn.flags.get("dk") dots = False @@ -4188,9 +4218,20 @@ class HttpCli(object): if idx and hasattr(idx, "p_end"): icur = idx.get_cur(dbv) + if "k" in self.uparam or "dky" in vn.flags: + if is_dir: + use_dirkey = self._use_dirkey(vn, abspath) + use_filekey = False + else: + use_filekey = self._use_filekey(vn, abspath, st) + use_dirkey = False + else: + use_dirkey = use_filekey = False + th_fmt = self.uparam.get("th") if self.can_read or ( - self.can_get and (vn.flags.get("dk") or "fk" not in vn.flags) + self.can_get + and (use_filekey or use_dirkey or (not is_dir and "fk" not in vn.flags)) ): if th_fmt is not None: nothumb = "dthumb" in dbv.flags @@ -4270,18 +4311,7 @@ class HttpCli(object): if not is_dir and (self.can_read or self.can_get): if not self.can_read and not fk_pass and "fk" in vn.flags: - alg = 2 if "fka" in vn.flags else 1 - correct = self.gen_fk( - alg, - self.args.fk_salt, - abspath, - st.st_size, - 0 if ANYWIN else st.st_ino, - )[: vn.flags["fk"]] - got = self.uparam.get("k") - if got != correct: - t = "wrong filekey, want %s, got %s\n vp: %s\n ap: %s" - self.log(t % (correct, got, self.req, abspath), 6) + if not use_filekey: return self.tx_404() if add_og: @@ -4312,7 +4342,7 @@ class HttpCli(object): ) elif is_dir and not self.can_read: - if self._use_dirkey(abspath): + if use_dirkey: is_dk = True elif not self.can_write: return self.tx_404(True) diff --git a/tests/test_dots.py b/tests/test_dots.py index dd568867..62a38f4f 100644 --- a/tests/test_dots.py +++ b/tests/test_dots.py @@ -23,7 +23,7 @@ def hdr(query, uname): return (h % (query, uname)).encode("utf-8") -class TestHttpCli(unittest.TestCase): +class TestDots(unittest.TestCase): def setUp(self): self.td = tu.get_ramdisk() @@ -31,7 +31,7 @@ class TestHttpCli(unittest.TestCase): os.chdir(tempfile.gettempdir()) shutil.rmtree(self.td) - def test(self): + def test_dots(self): td = os.path.join(self.td, "vfs") os.mkdir(td) os.chdir(td) @@ -118,6 +118,214 @@ class TestHttpCli(unittest.TestCase): url = "v?k=" + zj["dk"] self.assertEqual(self.tarsel(url, "u2", ["f1.txt", "a", ".b"]), "f1.txt") + shutil.rmtree("v") + + def test_dk_fk(self): + # python3 -m unittest tests.test_dots.TestDots.test_dk_fk + + td = os.path.join(self.td, "vfs") + os.mkdir(td) + os.chdir(td) + + vcfg = [] + for k in "dk dks dky fk fka dk,fk dks,fk".split(): + vcfg += ["{0}:{0}:r.,u1:g,u2:c,{0}".format(k)] + zs = "%s/s1/s2" % (k,) + os.makedirs(zs) + + with open("%s/f.t1" % (k,), "wb") as f: + f.write(b"f1") + + with open("%s/s1/f.t2" % (k,), "wb") as f: + f.write(b"f2") + + with open("%s/s1/s2/f.t3" % (k,), "wb") as f: + f.write(b"f3") + + self.args = Cfg(v=vcfg, a=["u1:u1", "u2:u2"]) + self.asrv = AuthSrv(self.args, self.log) + + dk = {} + for d in "dk dks dk,fk dks,fk".split(): + zj = json.loads(self.curl("%s?ls" % (d,), "u1")[1]) + dk[d] = zj["dk"] + + ## + ## dk + + # should not be able to access dk with wrong dirkey, + zs = self.curl("dk?ls&k=%s" % (dk["dks"]), "u2")[1] + self.assertEqual(zs, "\nJ2EOT") + # so use the right key + zs = self.curl("dk?ls&k=%s" % (dk["dk"]), "u2")[1] + zj = json.loads(zs) + self.assertEqual(len(zj["dirs"]), 0) + self.assertEqual(len(zj["files"]), 1) + self.assertEqual(zj["files"][0]["href"], "f.t1") + + ## + ## dk thumbs + + self.assertIn('">folder', self.curl("dk?th=x", "u1")[1]) + self.assertIn('">e403', self.curl("dk?th=x", "u2")[1]) + + zs = "dk?th=x&k=%s" % (dk["dks"]) + self.assertIn('">e403', self.curl(zs, "u2")[1]) + + zs = "dk?th=x&k=%s" % (dk["dk"]) + self.assertIn('">folder', self.curl(zs, "u2")[1]) + + # fk not enabled, so this should work + self.assertIn('">t1', self.curl("dk/f.t1?th=x", "u2")[1]) + self.assertIn('">t2', self.curl("dk/s1/f.t2?th=x", "u2")[1]) + + ## + ## dks + + # should not be able to access dks with wrong dirkey, + zs = self.curl("dks?ls&k=%s" % (dk["dk"]), "u2")[1] + self.assertEqual(zs, "\nJ2EOT") + # so use the right key + zs = self.curl("dks?ls&k=%s" % (dk["dks"]), "u2")[1] + zj = json.loads(zs) + self.assertEqual(len(zj["dirs"]), 1) + self.assertEqual(len(zj["files"]), 1) + self.assertEqual(zj["files"][0]["href"], "f.t1") + # dks should return correct dirkey of subfolders; + s1 = zj["dirs"][0]["href"] + self.assertEqual(s1.split("/")[0], "s1") + zs = self.curl("dks/%s&ls" % (s1), "u2")[1] + self.assertIn('"s2/?k=', zs) + + ## + ## dks thumbs + + self.assertIn('">folder', self.curl("dks?th=x", "u1")[1]) + self.assertIn('">e403', self.curl("dks?th=x", "u2")[1]) + + zs = "dks?th=x&k=%s" % (dk["dk"]) + self.assertIn('">e403', self.curl(zs, "u2")[1]) + + zs = "dks?th=x&k=%s" % (dk["dks"]) + self.assertIn('">folder', self.curl(zs, "u2")[1]) + + # fk not enabled, so this should work + self.assertIn('">t1', self.curl("dks/f.t1?th=x", "u2")[1]) + self.assertIn('">t2', self.curl("dks/s1/f.t2?th=x", "u2")[1]) + + ## + ## dky + + # doesn't care about keys + zs = self.curl("dky?ls&k=ok", "u2")[1] + self.assertEqual(zs, self.curl("dky?ls", "u2")[1]) + zj = json.loads(zs) + self.assertEqual(len(zj["dirs"]), 0) + self.assertEqual(len(zj["files"]), 1) + self.assertEqual(zj["files"][0]["href"], "f.t1") + + ## + ## dky thumbs + + self.assertIn('">folder', self.curl("dky?th=x", "u1")[1]) + self.assertIn('">folder', self.curl("dky?th=x", "u2")[1]) + + zs = "dky?th=x&k=%s" % (dk["dk"]) + self.assertIn('">folder', self.curl(zs, "u2")[1]) + + # fk not enabled, so this should work + self.assertIn('">t1', self.curl("dky/f.t1?th=x", "u2")[1]) + self.assertIn('">t2', self.curl("dky/s1/f.t2?th=x", "u2")[1]) + + ## + ## dk+fk + + # should not be able to access dk with wrong dirkey, + zs = self.curl("dk,fk?ls&k=%s" % (dk["dk"]), "u2")[1] + self.assertEqual(zs, "\nJ2EOT") + # so use the right key + zs = self.curl("dk,fk?ls&k=%s" % (dk["dk,fk"]), "u2")[1] + zj = json.loads(zs) + self.assertEqual(len(zj["dirs"]), 0) + self.assertEqual(len(zj["files"]), 1) + self.assertEqual(zj["files"][0]["href"][:7], "f.t1?k=") + + ## + ## dk+fk thumbs + + self.assertIn('">folder', self.curl("dk,fk?th=x", "u1")[1]) + self.assertIn('">e403', self.curl("dk,fk?th=x", "u2")[1]) + + zs = "dk,fk?th=x&k=%s" % (dk["dk"]) + self.assertIn('">e403', self.curl(zs, "u2")[1]) + + zs = "dk,fk?th=x&k=%s" % (dk["dk,fk"]) + self.assertIn('">folder', self.curl(zs, "u2")[1]) + + # fk enabled, so this should fail + self.assertIn('">e404', self.curl("dk,fk/f.t1?th=x", "u2")[1]) + self.assertIn('">e404', self.curl("dk,fk/s1/f.t2?th=x", "u2")[1]) + + # but dk should return correct filekeys, so try that + zs = "dk,fk/%s&th=x" % (zj["files"][0]["href"]) + self.assertIn('">t1', self.curl(zs, "u2")[1]) + + ## + ## dks+fk + + # should not be able to access dk with wrong dirkey, + zs = self.curl("dks,fk?ls&k=%s" % (dk["dk"]), "u2")[1] + self.assertEqual(zs, "\nJ2EOT") + # so use the right key + zs = self.curl("dks,fk?ls&k=%s" % (dk["dks,fk"]), "u2")[1] + zj = json.loads(zs) + self.assertEqual(len(zj["dirs"]), 1) + self.assertEqual(len(zj["files"]), 1) + self.assertEqual(zj["dirs"][0]["href"][:6], "s1/?k=") + self.assertEqual(zj["files"][0]["href"][:7], "f.t1?k=") + + ## + ## dks+fk thumbs + + self.assertIn('">folder', self.curl("dks,fk?th=x", "u1")[1]) + self.assertIn('">e403', self.curl("dks,fk?th=x", "u2")[1]) + + zs = "dks,fk?th=x&k=%s" % (dk["dk"]) + self.assertIn('">e403', self.curl(zs, "u2")[1]) + + zs = "dks,fk?th=x&k=%s" % (dk["dks,fk"]) + self.assertIn('">folder', self.curl(zs, "u2")[1]) + + # subdir s1 without key + zs = "dks,fk/s1/?th=x" + self.assertIn('">e403', self.curl(zs, "u2")[1]) + + # subdir s1 with bad key + zs = "dks,fk/s1/?th=x&k=no" + self.assertIn('">e403', self.curl(zs, "u2")[1]) + + # subdir s1 with correct key + zs = "dks,fk/%s&th=x" % (zj["dirs"][0]["href"]) + self.assertIn('">folder', self.curl(zs, "u2")[1]) + + # fk enabled, so this should fail + self.assertIn('">e404', self.curl("dks,fk/f.t1?th=x", "u2")[1]) + self.assertIn('">e404', self.curl("dks,fk/s1/f.t2?th=x", "u2")[1]) + + # but dk should return correct filekeys, so try that + zs = "dks,fk/%s&th=x" % (zj["files"][0]["href"]) + self.assertIn('">t1', self.curl(zs, "u2")[1]) + + # subdir + self.assertIn('">e403', self.curl("dks,fk/s1/?th=x", "u2")[1]) + self.assertEqual("\nJ2EOT", self.curl("dks,fk/s1/?ls", "u2")[1]) + zs = "dks,fk/s1%s&th=x" % (zj["files"][0]["href"]) + zs = self.curl("dks,fk?ls&k=%s" % (dk["dks,fk"]), "u2")[1] + zj = json.loads(zs) + url = "dks,fk/%s" % zj["dirs"][0]["href"] + self.assertIn('"files"', self.curl(url + "&ls", "u2")[1]) + self.assertEqual("\nJ2EOT", self.curl(url + "x&ls", "u2")[1]) + def tardir(self, url, uname): top = url.split("?")[0] top = ("top" if not top else top.lstrip(".").split("/")[0]) + "/" diff --git a/tests/util.py b/tests/util.py index 56c78cae..aed6ca53 100644 --- a/tests/util.py +++ b/tests/util.py @@ -43,6 +43,7 @@ if MACOS: from copyparty.__init__ import E from copyparty.__main__ import init_E +from copyparty.ico import Ico from copyparty.u2idx import U2idx from copyparty.util import FHC, CachedDict, Garda, Unrecv @@ -161,6 +162,7 @@ class Cfg(Namespace): s_wr_sz=256 * 1024, sort="href", srch_hits=99999, + th_covers=["folder.png"], th_crop="y", th_size="320x256", th_x3="n", @@ -245,7 +247,7 @@ class VHttpConn(object): self.bans = {} self.freshen_pwd = 0.0 self.hsrv = VHttpSrv(args, asrv, log) - self.ico = None + self.ico = Ico(args) self.ipa_nm = None self.lf_url = None self.log_func = log