make thumbnails compatible with dirkeys/filekeys

was intentionally skipped to avoid complexity but enough people have
asked why it doesn't work that it's time to do something about it

turns out it wasn't that bad
This commit is contained in:
ed 2024-06-16 21:35:43 +02:00
parent 452592519d
commit 52e06226a2
4 changed files with 264 additions and 24 deletions

View file

@ -1918,7 +1918,7 @@ volflag `dk` generates dirkeys (per-directory accesskeys) for all folders, grant
volflag `dky` disables the actual key-check, meaning anyone can see the contents of a folder where they have `g` access, but not its subdirectories volflag `dky` disables the actual key-check, meaning anyone can see the contents of a folder where they have `g` access, but not its subdirectories
* `dk` + `dky` gives the same behavior as if all users with `g` access have full read-access, but subfolders are hidden files (their names start with a dot), so `dky` is an alternative to renaming all the folders for that purpose, maybe just for some users * `dk` + `dky` gives the same behavior as if all users with `g` access have full read-access, but subfolders are hidden files (as if their names start with a dot), so `dky` is an alternative to renaming all the folders for that purpose, maybe just for some users
volflag `dks` lets people enter subfolders as well, and also enables download-as-zip/tar volflag `dks` lets people enter subfolders as well, and also enables download-as-zip/tar

View file

@ -2032,7 +2032,7 @@ class HttpCli(object):
v = self.uparam[k] v = self.uparam[k]
if self._use_dirkey(): if self._use_dirkey(self.vn, ""):
vn = self.vn vn = self.vn
rem = self.rem rem = self.rem
else: else:
@ -2953,22 +2953,24 @@ class HttpCli(object):
return file_lastmod, True return file_lastmod, True
def _use_dirkey(self, ap: str = "") -> bool: def _use_dirkey(self, vn: VFS, ap: str) -> bool:
if self.can_read or not self.can_get: if self.can_read or not self.can_get:
return False return False
if self.vn.flags.get("dky"): if vn.flags.get("dky"):
return True return True
req = self.uparam.get("k") or "" req = self.uparam.get("k") or ""
if not req: if not req:
return False return False
dk_len = self.vn.flags.get("dk") dk_len = vn.flags.get("dk")
if not dk_len: if not dk_len:
return False return False
ap = ap or self.vn.canonical(self.rem) if not ap:
ap = vn.canonical(self.rem)
zs = self.gen_fk(2, self.args.dk_salt, ap, 0, 0)[:dk_len] zs = self.gen_fk(2, self.args.dk_salt, ap, 0, 0)[:dk_len]
if req == zs: if req == zs:
return True return True
@ -2977,6 +2979,34 @@ class HttpCli(object):
self.log(t % (zs, req, self.req, ap), 6) self.log(t % (zs, req, self.req, ap), 6)
return False return False
def _use_filekey(self, vn: VFS, ap: str, st: os.stat_result) -> bool:
if self.can_read or not self.can_get:
return False
req = self.uparam.get("k") or ""
if not req:
return False
fk_len = vn.flags.get("fk")
if not fk_len:
return False
if not ap:
ap = self.vn.canonical(self.rem)
alg = 2 if "fka" in vn.flags else 1
zs = self.gen_fk(
alg, self.args.fk_salt, ap, st.st_size, 0 if ANYWIN else st.st_ino
)[:fk_len]
if req == zs:
return True
t = "wrong filekey, want %s, got %s\n vp: %s\n ap: %s"
self.log(t % (zs, req, self.req, ap), 6)
return False
def _expand(self, txt: str, phs: list[str]) -> str: def _expand(self, txt: str, phs: list[str]) -> str:
for ph in phs: for ph in phs:
if ph.startswith("hdr."): if ph.startswith("hdr."):
@ -3864,7 +3894,7 @@ class HttpCli(object):
dk_sz = False dk_sz = False
if dk: if dk:
vn, rem = vfs.get(top, self.uname, False, False) vn, rem = vfs.get(top, self.uname, False, False)
if vn.flags.get("dks") and self._use_dirkey(vn.canonical(rem)): if vn.flags.get("dks") and self._use_dirkey(vn, vn.canonical(rem)):
dk_sz = vn.flags.get("dk") dk_sz = vn.flags.get("dk")
dots = False dots = False
@ -4188,9 +4218,20 @@ class HttpCli(object):
if idx and hasattr(idx, "p_end"): if idx and hasattr(idx, "p_end"):
icur = idx.get_cur(dbv) icur = idx.get_cur(dbv)
if "k" in self.uparam or "dky" in vn.flags:
if is_dir:
use_dirkey = self._use_dirkey(vn, abspath)
use_filekey = False
else:
use_filekey = self._use_filekey(vn, abspath, st)
use_dirkey = False
else:
use_dirkey = use_filekey = False
th_fmt = self.uparam.get("th") th_fmt = self.uparam.get("th")
if self.can_read or ( if self.can_read or (
self.can_get and (vn.flags.get("dk") or "fk" not in vn.flags) self.can_get
and (use_filekey or use_dirkey or (not is_dir and "fk" not in vn.flags))
): ):
if th_fmt is not None: if th_fmt is not None:
nothumb = "dthumb" in dbv.flags nothumb = "dthumb" in dbv.flags
@ -4270,18 +4311,7 @@ class HttpCli(object):
if not is_dir and (self.can_read or self.can_get): if not is_dir and (self.can_read or self.can_get):
if not self.can_read and not fk_pass and "fk" in vn.flags: if not self.can_read and not fk_pass and "fk" in vn.flags:
alg = 2 if "fka" in vn.flags else 1 if not use_filekey:
correct = self.gen_fk(
alg,
self.args.fk_salt,
abspath,
st.st_size,
0 if ANYWIN else st.st_ino,
)[: vn.flags["fk"]]
got = self.uparam.get("k")
if got != correct:
t = "wrong filekey, want %s, got %s\n vp: %s\n ap: %s"
self.log(t % (correct, got, self.req, abspath), 6)
return self.tx_404() return self.tx_404()
if add_og: if add_og:
@ -4312,7 +4342,7 @@ class HttpCli(object):
) )
elif is_dir and not self.can_read: elif is_dir and not self.can_read:
if self._use_dirkey(abspath): if use_dirkey:
is_dk = True is_dk = True
elif not self.can_write: elif not self.can_write:
return self.tx_404(True) return self.tx_404(True)

View file

@ -23,7 +23,7 @@ def hdr(query, uname):
return (h % (query, uname)).encode("utf-8") return (h % (query, uname)).encode("utf-8")
class TestHttpCli(unittest.TestCase): class TestDots(unittest.TestCase):
def setUp(self): def setUp(self):
self.td = tu.get_ramdisk() self.td = tu.get_ramdisk()
@ -31,7 +31,7 @@ class TestHttpCli(unittest.TestCase):
os.chdir(tempfile.gettempdir()) os.chdir(tempfile.gettempdir())
shutil.rmtree(self.td) shutil.rmtree(self.td)
def test(self): def test_dots(self):
td = os.path.join(self.td, "vfs") td = os.path.join(self.td, "vfs")
os.mkdir(td) os.mkdir(td)
os.chdir(td) os.chdir(td)
@ -118,6 +118,214 @@ class TestHttpCli(unittest.TestCase):
url = "v?k=" + zj["dk"] url = "v?k=" + zj["dk"]
self.assertEqual(self.tarsel(url, "u2", ["f1.txt", "a", ".b"]), "f1.txt") self.assertEqual(self.tarsel(url, "u2", ["f1.txt", "a", ".b"]), "f1.txt")
shutil.rmtree("v")
def test_dk_fk(self):
# python3 -m unittest tests.test_dots.TestDots.test_dk_fk
td = os.path.join(self.td, "vfs")
os.mkdir(td)
os.chdir(td)
vcfg = []
for k in "dk dks dky fk fka dk,fk dks,fk".split():
vcfg += ["{0}:{0}:r.,u1:g,u2:c,{0}".format(k)]
zs = "%s/s1/s2" % (k,)
os.makedirs(zs)
with open("%s/f.t1" % (k,), "wb") as f:
f.write(b"f1")
with open("%s/s1/f.t2" % (k,), "wb") as f:
f.write(b"f2")
with open("%s/s1/s2/f.t3" % (k,), "wb") as f:
f.write(b"f3")
self.args = Cfg(v=vcfg, a=["u1:u1", "u2:u2"])
self.asrv = AuthSrv(self.args, self.log)
dk = {}
for d in "dk dks dk,fk dks,fk".split():
zj = json.loads(self.curl("%s?ls" % (d,), "u1")[1])
dk[d] = zj["dk"]
##
## dk
# should not be able to access dk with wrong dirkey,
zs = self.curl("dk?ls&k=%s" % (dk["dks"]), "u2")[1]
self.assertEqual(zs, "\nJ2EOT")
# so use the right key
zs = self.curl("dk?ls&k=%s" % (dk["dk"]), "u2")[1]
zj = json.loads(zs)
self.assertEqual(len(zj["dirs"]), 0)
self.assertEqual(len(zj["files"]), 1)
self.assertEqual(zj["files"][0]["href"], "f.t1")
##
## dk thumbs
self.assertIn('">folder</text>', self.curl("dk?th=x", "u1")[1])
self.assertIn('">e403</text>', self.curl("dk?th=x", "u2")[1])
zs = "dk?th=x&k=%s" % (dk["dks"])
self.assertIn('">e403</text>', self.curl(zs, "u2")[1])
zs = "dk?th=x&k=%s" % (dk["dk"])
self.assertIn('">folder</text>', self.curl(zs, "u2")[1])
# fk not enabled, so this should work
self.assertIn('">t1</text>', self.curl("dk/f.t1?th=x", "u2")[1])
self.assertIn('">t2</text>', self.curl("dk/s1/f.t2?th=x", "u2")[1])
##
## dks
# should not be able to access dks with wrong dirkey,
zs = self.curl("dks?ls&k=%s" % (dk["dk"]), "u2")[1]
self.assertEqual(zs, "\nJ2EOT")
# so use the right key
zs = self.curl("dks?ls&k=%s" % (dk["dks"]), "u2")[1]
zj = json.loads(zs)
self.assertEqual(len(zj["dirs"]), 1)
self.assertEqual(len(zj["files"]), 1)
self.assertEqual(zj["files"][0]["href"], "f.t1")
# dks should return correct dirkey of subfolders;
s1 = zj["dirs"][0]["href"]
self.assertEqual(s1.split("/")[0], "s1")
zs = self.curl("dks/%s&ls" % (s1), "u2")[1]
self.assertIn('"s2/?k=', zs)
##
## dks thumbs
self.assertIn('">folder</text>', self.curl("dks?th=x", "u1")[1])
self.assertIn('">e403</text>', self.curl("dks?th=x", "u2")[1])
zs = "dks?th=x&k=%s" % (dk["dk"])
self.assertIn('">e403</text>', self.curl(zs, "u2")[1])
zs = "dks?th=x&k=%s" % (dk["dks"])
self.assertIn('">folder</text>', self.curl(zs, "u2")[1])
# fk not enabled, so this should work
self.assertIn('">t1</text>', self.curl("dks/f.t1?th=x", "u2")[1])
self.assertIn('">t2</text>', self.curl("dks/s1/f.t2?th=x", "u2")[1])
##
## dky
# doesn't care about keys
zs = self.curl("dky?ls&k=ok", "u2")[1]
self.assertEqual(zs, self.curl("dky?ls", "u2")[1])
zj = json.loads(zs)
self.assertEqual(len(zj["dirs"]), 0)
self.assertEqual(len(zj["files"]), 1)
self.assertEqual(zj["files"][0]["href"], "f.t1")
##
## dky thumbs
self.assertIn('">folder</text>', self.curl("dky?th=x", "u1")[1])
self.assertIn('">folder</text>', self.curl("dky?th=x", "u2")[1])
zs = "dky?th=x&k=%s" % (dk["dk"])
self.assertIn('">folder</text>', self.curl(zs, "u2")[1])
# fk not enabled, so this should work
self.assertIn('">t1</text>', self.curl("dky/f.t1?th=x", "u2")[1])
self.assertIn('">t2</text>', self.curl("dky/s1/f.t2?th=x", "u2")[1])
##
## dk+fk
# should not be able to access dk with wrong dirkey,
zs = self.curl("dk,fk?ls&k=%s" % (dk["dk"]), "u2")[1]
self.assertEqual(zs, "\nJ2EOT")
# so use the right key
zs = self.curl("dk,fk?ls&k=%s" % (dk["dk,fk"]), "u2")[1]
zj = json.loads(zs)
self.assertEqual(len(zj["dirs"]), 0)
self.assertEqual(len(zj["files"]), 1)
self.assertEqual(zj["files"][0]["href"][:7], "f.t1?k=")
##
## dk+fk thumbs
self.assertIn('">folder</text>', self.curl("dk,fk?th=x", "u1")[1])
self.assertIn('">e403</text>', self.curl("dk,fk?th=x", "u2")[1])
zs = "dk,fk?th=x&k=%s" % (dk["dk"])
self.assertIn('">e403</text>', self.curl(zs, "u2")[1])
zs = "dk,fk?th=x&k=%s" % (dk["dk,fk"])
self.assertIn('">folder</text>', self.curl(zs, "u2")[1])
# fk enabled, so this should fail
self.assertIn('">e404</text>', self.curl("dk,fk/f.t1?th=x", "u2")[1])
self.assertIn('">e404</text>', self.curl("dk,fk/s1/f.t2?th=x", "u2")[1])
# but dk should return correct filekeys, so try that
zs = "dk,fk/%s&th=x" % (zj["files"][0]["href"])
self.assertIn('">t1</text>', self.curl(zs, "u2")[1])
##
## dks+fk
# should not be able to access dk with wrong dirkey,
zs = self.curl("dks,fk?ls&k=%s" % (dk["dk"]), "u2")[1]
self.assertEqual(zs, "\nJ2EOT")
# so use the right key
zs = self.curl("dks,fk?ls&k=%s" % (dk["dks,fk"]), "u2")[1]
zj = json.loads(zs)
self.assertEqual(len(zj["dirs"]), 1)
self.assertEqual(len(zj["files"]), 1)
self.assertEqual(zj["dirs"][0]["href"][:6], "s1/?k=")
self.assertEqual(zj["files"][0]["href"][:7], "f.t1?k=")
##
## dks+fk thumbs
self.assertIn('">folder</text>', self.curl("dks,fk?th=x", "u1")[1])
self.assertIn('">e403</text>', self.curl("dks,fk?th=x", "u2")[1])
zs = "dks,fk?th=x&k=%s" % (dk["dk"])
self.assertIn('">e403</text>', self.curl(zs, "u2")[1])
zs = "dks,fk?th=x&k=%s" % (dk["dks,fk"])
self.assertIn('">folder</text>', self.curl(zs, "u2")[1])
# subdir s1 without key
zs = "dks,fk/s1/?th=x"
self.assertIn('">e403</text>', self.curl(zs, "u2")[1])
# subdir s1 with bad key
zs = "dks,fk/s1/?th=x&k=no"
self.assertIn('">e403</text>', self.curl(zs, "u2")[1])
# subdir s1 with correct key
zs = "dks,fk/%s&th=x" % (zj["dirs"][0]["href"])
self.assertIn('">folder</text>', self.curl(zs, "u2")[1])
# fk enabled, so this should fail
self.assertIn('">e404</text>', self.curl("dks,fk/f.t1?th=x", "u2")[1])
self.assertIn('">e404</text>', self.curl("dks,fk/s1/f.t2?th=x", "u2")[1])
# but dk should return correct filekeys, so try that
zs = "dks,fk/%s&th=x" % (zj["files"][0]["href"])
self.assertIn('">t1</text>', self.curl(zs, "u2")[1])
# subdir
self.assertIn('">e403</text>', self.curl("dks,fk/s1/?th=x", "u2")[1])
self.assertEqual("\nJ2EOT", self.curl("dks,fk/s1/?ls", "u2")[1])
zs = "dks,fk/s1%s&th=x" % (zj["files"][0]["href"])
zs = self.curl("dks,fk?ls&k=%s" % (dk["dks,fk"]), "u2")[1]
zj = json.loads(zs)
url = "dks,fk/%s" % zj["dirs"][0]["href"]
self.assertIn('"files"', self.curl(url + "&ls", "u2")[1])
self.assertEqual("\nJ2EOT", self.curl(url + "x&ls", "u2")[1])
def tardir(self, url, uname): def tardir(self, url, uname):
top = url.split("?")[0] top = url.split("?")[0]
top = ("top" if not top else top.lstrip(".").split("/")[0]) + "/" top = ("top" if not top else top.lstrip(".").split("/")[0]) + "/"

View file

@ -43,6 +43,7 @@ if MACOS:
from copyparty.__init__ import E from copyparty.__init__ import E
from copyparty.__main__ import init_E from copyparty.__main__ import init_E
from copyparty.ico import Ico
from copyparty.u2idx import U2idx from copyparty.u2idx import U2idx
from copyparty.util import FHC, CachedDict, Garda, Unrecv from copyparty.util import FHC, CachedDict, Garda, Unrecv
@ -161,6 +162,7 @@ class Cfg(Namespace):
s_wr_sz=256 * 1024, s_wr_sz=256 * 1024,
sort="href", sort="href",
srch_hits=99999, srch_hits=99999,
th_covers=["folder.png"],
th_crop="y", th_crop="y",
th_size="320x256", th_size="320x256",
th_x3="n", th_x3="n",
@ -245,7 +247,7 @@ class VHttpConn(object):
self.bans = {} self.bans = {}
self.freshen_pwd = 0.0 self.freshen_pwd = 0.0
self.hsrv = VHttpSrv(args, asrv, log) self.hsrv = VHttpSrv(args, asrv, log)
self.ico = None self.ico = Ico(args)
self.ipa_nm = None self.ipa_nm = None
self.lf_url = None self.lf_url = None
self.log_func = log self.log_func = log