copyparty/tests/test_dots.py
ed 2ce8233921 webdav: auth-challenge clients correctly:
* return 403 instead of 404 in the following situations:
  * viewing an RSS feed without necessary auth
  * accessing a file with the wrong filekey
  * accessing a file/folder without necessary auth
     (would previously 404 for intentional ambiguity)

* only allow PROPFIND if user has either read or write;
   previously a blank response was returned if user has
   get-access, but this could confuse webdav clients into
   skipping authentication (for example AuthPass)

* return 401 basic-challenge instead of 403 if the client
   appears to be non-graphical, because many webdav clients
   do not provide the credentials until they're challenged.
   There is a heavy bias towards assuming the client is a
   browser, because browsers must NEVER EVER get a 401
   (tricky state that is near-impossible to deal with)

* return 401 basic-challenge instead of 403 if a PUT
   is attempted without any credentials included; this
   should be safe, as graphical browsers never do that

this fixes the interoperability issues mentioned in
https://github.com/authpass/authpass/issues/379
where AuthPass would GET files without providing the
password because it expected a 401 instead of a 403;
AuthPass is behaving correctly, this is not a bug
2024-11-27 22:07:53 +00:00

385 lines
13 KiB
Python

#!/usr/bin/env python3
# coding: utf-8
from __future__ import print_function, unicode_literals
import io
import json
import os
import shutil
import tarfile
import tempfile
import unittest
from copyparty.authsrv import AuthSrv
from copyparty.httpcli import HttpCli
from copyparty.u2idx import U2idx
from copyparty.up2k import Up2k
from tests import util as tu
from tests.util import Cfg
try:
from typing import Optional
except:
pass
def hdr(query, uname):
    """Return the raw bytes of a minimal HTTP/1.1 GET request.

    ``query`` is placed directly after the leading ``/`` of the request
    target and ``uname`` is sent as the value of the ``PW`` header.  The
    request asks for the connection to be closed and carries no body.
    """
    request_lines = [
        "GET /%s HTTP/1.1" % (query,),
        "PW: %s" % (uname,),
        "Connection: close",
        # two empty items produce the blank line that ends the header block
        "",
        "",
    ]
    return "\r\n".join(request_lines).encode("utf-8")
class TestDots(unittest.TestCase):
def __init__(self, *args, **kwargs):
    """Pass every argument through to the base class, then set ``is_dut``."""
    super(TestDots, self).__init__(*args, **kwargs)
    # NOTE(review): nothing in this file reads is_dut; presumably the
    # project classes that receive this test case as their parent object
    # check it -- confirm against those classes
    self.is_dut = True
def setUp(self):
    """Give each test the folder from ``tu.get_ramdisk()`` and no connection.

    ``self.td`` is removed again in ``tearDown``; ``self.conn`` stays
    ``None`` until ``cinit`` creates a connection.
    """
    self.td = tu.get_ramdisk()
    self.conn: Optional[tu.VHttpConn] = None
def tearDown(self):
    """Shut down the connection (if any) and delete the test folder.

    The working directory is moved to the system temp folder first,
    because the tests chdir into a folder below ``self.td`` which is
    about to be removed.
    """
    conn = self.conn
    if conn:
        conn.shutdown()

    fallback_cwd = tempfile.gettempdir()
    os.chdir(fallback_cwd)
    shutil.rmtree(self.td)
def cinit(self):
    """Replace ``self.conn`` with a fresh ``tu.VHttpConn``.

    Any existing connection is shut down first.  The new connection is
    built from the current ``self.args`` / ``self.asrv``, so this must be
    called again whenever those are reassigned.
    """
    previous = self.conn
    if previous:
        previous.shutdown()
        self.conn = None

    self.conn = tu.VHttpConn(self.args, self.asrv, self.log, b"")
def test_dots(self):
    # Checks which dotfiles / dotfolders each user gets back from tar
    # downloads and from search, then checks tar downloads through a
    # dirkey.  Everything is created below <self.td>/vfs, and the test
    # chdirs there so the relative paths below resolve against it.
    td = os.path.join(self.td, "vfs")
    os.mkdir(td)
    os.chdir(td)

    # topDir volA volA/*dirA .volB .volB/*dirB
    spaths = " t .t a a/da a/.da .b .b/db .b/.db"
    # the leading space makes the first split item an empty string, which
    # stands for the top folder itself (nothing to create for it)
    for n, dirpath in enumerate(spaths.split(" ")):
        if dirpath:
            os.makedirs(dirpath)

        # one regular file and one dotfile per folder, numbered by the
        # folder's index; each file contains its own relative path
        for pfx in "f", ".f":
            filepath = pfx + str(n)
            if dirpath:
                filepath = os.path.join(dirpath, filepath)

            with open(filepath, "wb") as f:
                f.write(filepath.encode("utf-8"))

    # three volumes: the top folder, "a", and ".b"
    # (presumably "r." is read-access including dotfiles and "r" is
    # read-access without them; the expectations below match that reading)
    vcfg = [".::r,u1:r.,u2", "a:a:r,u1:r,u2", ".b:.b:r.,u1:r,u2"]
    self.args = Cfg(v=vcfg, a=["u1:u1", "u2:u2"], e2dsa=True)
    self.asrv = AuthSrv(self.args, self.log)
    self.cinit()

    # u1: no dot-entries from the top volume or from volume a
    self.assertEqual(self.tardir("", "u1"), "f0 t/f1 a/f3 a/da/f4")
    # u1 may request the dotfolder .t directly, but still gets no dotfiles
    self.assertEqual(self.tardir(".t", "u1"), "f2")
    # u1 gets dotfiles and dotfolders inside volume .b
    self.assertEqual(self.tardir(".b", "u1"), ".f6 f6 .db/.f8 .db/f8 db/.f7 db/f7")
    # u2: dot-entries from the top volume, but none from inside .b or a
    zs = ".f0 f0 .t/.f2 .t/f2 t/.f1 t/f1 .b/f6 .b/db/f7 a/f3 a/da/f4"
    self.assertEqual(self.tardir("", "u2"), zs)

    # "x" is not one of the configured passwords; the body is expected to
    # be "\nJ2EOT" instead of a tar archive
    self.assertEqual(self.curl("?tar", "x")[1][:17], "\nJ2EOT")

    ##
    ## search

    # the test case itself is handed to Up2k / U2idx as their parent object
    up2k = Up2k(self)
    u2idx = U2idx(self)
    allvols = list(self.asrv.vfs.all_vols.values())

    # x[0] holds the hits; each hit's "rp" is collected, sorted and joined
    x = u2idx.search("u1", allvols, "", 999)
    x = " ".join(sorted([x["rp"] for x in x[0]]))
    # u1 can see dotfiles in volB so they should be included
    xe = ".b/.db/.f8 .b/.db/f8 .b/.f6 .b/db/.f7 .b/db/f7 .b/f6 a/da/f4 a/f3 f0 t/f1"
    self.assertEqual(x, xe)

    x = u2idx.search("u2", allvols, "", 999)
    x = " ".join(sorted([x["rp"] for x in x[0]]))
    self.assertEqual(x, ".f0 .t/.f2 .t/f2 a/da/f4 a/f3 f0 t/.f1 t/f1")

    u2idx.shutdown()

    # same volumes again, now with dotsrch disabled
    self.args = Cfg(v=vcfg, a=["u1:u1", "u2:u2"], dotsrch=False, e2d=True)
    self.asrv = AuthSrv(self.args, self.log)
    u2idx = U2idx(self)

    self.cinit()
    x = u2idx.search("u1", self.asrv.vfs.all_vols.values(), "", 999)
    x = " ".join(sorted([x["rp"] for x in x[0]]))
    # dotsrch is off, so nothing with a dot-name (and nothing below .b)
    # should be returned, even though u1 can see dotfiles in volB
    xe = "a/da/f4 a/f3 f0 t/f1"
    self.assertEqual(x, xe)
    u2idx.shutdown()
    up2k.shutdown()

    ##
    ## dirkeys

    # move both subvolumes below a new folder "v" and reconfigure
    os.mkdir("v")
    with open("v/f1.txt", "wb") as f:
        f.write(b"a")

    os.rename("a", "v/a")
    os.rename(".b", "v/.b")

    vcfg = [
        ".::r.,u1:g,u2:c,dks",
        "v/a:v/a:r.,u1:g,u2:c,dks",
        "v/.b:v/.b:r.,u1:g,u2:c,dks",
    ]
    self.args = Cfg(v=vcfg, a=["u1:u1", "u2:u2"])
    self.asrv = AuthSrv(self.args, self.log)
    self.cinit()

    # u1 lists the top folder to learn its dirkey ("dk" in the reply);
    # u2 then downloads using that key
    zj = json.loads(self.curl("?ls", "u1")[1])
    url = "?k=" + zj["dk"]
    # should descend into folders, but not other volumes:
    self.assertEqual(self.tardir(url, "u2"), "f0 t/f1 v/f1.txt")

    # same for folder v; the selection names both subvolumes, but only
    # the plain file is expected in the archive
    zj = json.loads(self.curl("v?ls", "u1")[1])
    url = "v?k=" + zj["dk"]
    self.assertEqual(self.tarsel(url, "u2", ["f1.txt", "a", ".b"]), "f1.txt")

    shutil.rmtree("v")
def test_dk_fk(self):
    # python3 -m unittest tests.test_dots.TestDots.test_dk_fk

    # Exercises folder listings (?ls) and thumbnails (?th=x) on volumes
    # configured with the dk / dks / dky / fk / fka flags and two
    # combinations of them.  u1 is used to obtain keys; u2 is the user
    # whose access depends on supplying them.
    td = os.path.join(self.td, "vfs")
    os.mkdir(td)
    os.chdir(td)

    # one volume per flag (or flag combination); the volume is named
    # after its flag and contains f.t1, s1/f.t2 and s1/s2/f.t3
    vcfg = []
    for k in "dk dks dky fk fka dk,fk dks,fk".split():
        vcfg += ["{0}:{0}:r.,u1:g,u2:c,{0}".format(k)]
        zs = "%s/s1/s2" % (k,)
        os.makedirs(zs)

        with open("%s/f.t1" % (k,), "wb") as f:
            f.write(b"f1")

        with open("%s/s1/f.t2" % (k,), "wb") as f:
            f.write(b"f2")

        with open("%s/s1/s2/f.t3" % (k,), "wb") as f:
            f.write(b"f3")

    self.args = Cfg(v=vcfg, a=["u1:u1", "u2:u2"])
    self.asrv = AuthSrv(self.args, self.log)
    self.cinit()

    # collect the top-level dirkey of each dirkey-enabled volume, as u1
    dk = {}
    for d in "dk dks dk,fk dks,fk".split():
        zj = json.loads(self.curl("%s?ls" % (d,), "u1")[1])
        dk[d] = zj["dk"]

    ##
    ## dk

    # should not be able to access dk with wrong dirkey,
    zs = self.curl("dk?ls&k=%s" % (dk["dks"]), "u2")[1]
    self.assertEqual(zs, "\nJ2EOT")
    # so use the right key
    zs = self.curl("dk?ls&k=%s" % (dk["dk"]), "u2")[1]
    zj = json.loads(zs)
    # with plain dk the listing has the file but no subfolders
    self.assertEqual(len(zj["dirs"]), 0)
    self.assertEqual(len(zj["files"]), 1)
    self.assertEqual(zj["files"][0]["href"], "f.t1")

    ##
    ## dk thumbs

    # the thumbnail reply is searched for its text label: "folder" or a
    # file extension on success, "e403" on denial
    self.assertIn('">folder</text>', self.curl("dk?th=x", "u1")[1])
    self.assertIn('">e403</text>', self.curl("dk?th=x", "u2")[1])

    zs = "dk?th=x&k=%s" % (dk["dks"])
    self.assertIn('">e403</text>', self.curl(zs, "u2")[1])

    zs = "dk?th=x&k=%s" % (dk["dk"])
    self.assertIn('">folder</text>', self.curl(zs, "u2")[1])

    # fk not enabled, so this should work
    self.assertIn('">t1</text>', self.curl("dk/f.t1?th=x", "u2")[1])
    self.assertIn('">t2</text>', self.curl("dk/s1/f.t2?th=x", "u2")[1])

    ##
    ## dks

    # should not be able to access dks with wrong dirkey,
    zs = self.curl("dks?ls&k=%s" % (dk["dk"]), "u2")[1]
    self.assertEqual(zs, "\nJ2EOT")
    # so use the right key
    zs = self.curl("dks?ls&k=%s" % (dk["dks"]), "u2")[1]
    zj = json.loads(zs)
    # unlike plain dk, the subfolder is listed too
    self.assertEqual(len(zj["dirs"]), 1)
    self.assertEqual(len(zj["files"]), 1)
    self.assertEqual(zj["files"][0]["href"], "f.t1")
    # dks should return correct dirkey of subfolders;
    s1 = zj["dirs"][0]["href"]
    self.assertEqual(s1.split("/")[0], "s1")
    # s1 already carries its own "?k=...", hence "&ls" rather than "?ls"
    zs = self.curl("dks/%s&ls" % (s1), "u2")[1]
    self.assertIn('"s2/?k=', zs)

    ##
    ## dks thumbs

    self.assertIn('">folder</text>', self.curl("dks?th=x", "u1")[1])
    self.assertIn('">e403</text>', self.curl("dks?th=x", "u2")[1])

    zs = "dks?th=x&k=%s" % (dk["dk"])
    self.assertIn('">e403</text>', self.curl(zs, "u2")[1])

    zs = "dks?th=x&k=%s" % (dk["dks"])
    self.assertIn('">folder</text>', self.curl(zs, "u2")[1])

    # fk not enabled, so this should work
    self.assertIn('">t1</text>', self.curl("dks/f.t1?th=x", "u2")[1])
    self.assertIn('">t2</text>', self.curl("dks/s1/f.t2?th=x", "u2")[1])

    ##
    ## dky

    # doesn't care about keys
    zs = self.curl("dky?ls&k=ok", "u2")[1]
    self.assertEqual(zs, self.curl("dky?ls", "u2")[1])
    zj = json.loads(zs)
    self.assertEqual(len(zj["dirs"]), 0)
    self.assertEqual(len(zj["files"]), 1)
    self.assertEqual(zj["files"][0]["href"], "f.t1")

    ##
    ## dky thumbs

    self.assertIn('">folder</text>', self.curl("dky?th=x", "u1")[1])
    self.assertIn('">folder</text>', self.curl("dky?th=x", "u2")[1])

    zs = "dky?th=x&k=%s" % (dk["dk"])
    self.assertIn('">folder</text>', self.curl(zs, "u2")[1])

    # fk not enabled, so this should work
    self.assertIn('">t1</text>', self.curl("dky/f.t1?th=x", "u2")[1])
    self.assertIn('">t2</text>', self.curl("dky/s1/f.t2?th=x", "u2")[1])

    ##
    ## dk+fk

    # should not be able to access dk with wrong dirkey,
    zs = self.curl("dk,fk?ls&k=%s" % (dk["dk"]), "u2")[1]
    self.assertEqual(zs, "\nJ2EOT")
    # so use the right key
    zs = self.curl("dk,fk?ls&k=%s" % (dk["dk,fk"]), "u2")[1]
    zj = json.loads(zs)
    self.assertEqual(len(zj["dirs"]), 0)
    self.assertEqual(len(zj["files"]), 1)
    # with fk enabled the file link carries a key of its own
    self.assertEqual(zj["files"][0]["href"][:7], "f.t1?k=")

    ##
    ## dk+fk thumbs

    self.assertIn('">folder</text>', self.curl("dk,fk?th=x", "u1")[1])
    self.assertIn('">e403</text>', self.curl("dk,fk?th=x", "u2")[1])

    zs = "dk,fk?th=x&k=%s" % (dk["dk"])
    self.assertIn('">e403</text>', self.curl(zs, "u2")[1])

    zs = "dk,fk?th=x&k=%s" % (dk["dk,fk"])
    self.assertIn('">folder</text>', self.curl(zs, "u2")[1])

    # fk enabled, so this should fail
    self.assertIn('">e403</text>', self.curl("dk,fk/f.t1?th=x", "u2")[1])
    self.assertIn('">e403</text>', self.curl("dk,fk/s1/f.t2?th=x", "u2")[1])

    # but dk should return correct filekeys, so try that
    zs = "dk,fk/%s&th=x" % (zj["files"][0]["href"])
    self.assertIn('">t1</text>', self.curl(zs, "u2")[1])

    ##
    ## dks+fk

    # should not be able to access dk with wrong dirkey,
    zs = self.curl("dks,fk?ls&k=%s" % (dk["dk"]), "u2")[1]
    self.assertEqual(zs, "\nJ2EOT")
    # so use the right key
    zs = self.curl("dks,fk?ls&k=%s" % (dk["dks,fk"]), "u2")[1]
    zj = json.loads(zs)
    self.assertEqual(len(zj["dirs"]), 1)
    self.assertEqual(len(zj["files"]), 1)
    # both the subfolder link and the file link carry keys
    self.assertEqual(zj["dirs"][0]["href"][:6], "s1/?k=")
    self.assertEqual(zj["files"][0]["href"][:7], "f.t1?k=")

    ##
    ## dks+fk thumbs

    self.assertIn('">folder</text>', self.curl("dks,fk?th=x", "u1")[1])
    self.assertIn('">e403</text>', self.curl("dks,fk?th=x", "u2")[1])

    zs = "dks,fk?th=x&k=%s" % (dk["dk"])
    self.assertIn('">e403</text>', self.curl(zs, "u2")[1])

    zs = "dks,fk?th=x&k=%s" % (dk["dks,fk"])
    self.assertIn('">folder</text>', self.curl(zs, "u2")[1])

    # subdir s1 without key
    zs = "dks,fk/s1/?th=x"
    self.assertIn('">e403</text>', self.curl(zs, "u2")[1])

    # subdir s1 with bad key
    zs = "dks,fk/s1/?th=x&k=no"
    self.assertIn('">e403</text>', self.curl(zs, "u2")[1])

    # subdir s1 with correct key
    zs = "dks,fk/%s&th=x" % (zj["dirs"][0]["href"])
    self.assertIn('">folder</text>', self.curl(zs, "u2")[1])

    # fk enabled, so this should fail
    self.assertIn('">e403</text>', self.curl("dks,fk/f.t1?th=x", "u2")[1])
    self.assertIn('">e403</text>', self.curl("dks,fk/s1/f.t2?th=x", "u2")[1])

    # but dk should return correct filekeys, so try that
    zs = "dks,fk/%s&th=x" % (zj["files"][0]["href"])
    self.assertIn('">t1</text>', self.curl(zs, "u2")[1])

    # subdir
    self.assertIn('">e403</text>', self.curl("dks,fk/s1/?th=x", "u2")[1])
    self.assertEqual("\nJ2EOT", self.curl("dks,fk/s1/?ls", "u2")[1])
    # NOTE(review): the next value is never requested or asserted on; zs
    # is overwritten by the statement right after it.  Looks like a check
    # was intended here -- confirm and either add it or drop the line.
    zs = "dks,fk/s1%s&th=x" % (zj["files"][0]["href"])
    zs = self.curl("dks,fk?ls&k=%s" % (dk["dks,fk"]), "u2")[1]
    zj = json.loads(zs)
    url = "dks,fk/%s" % zj["dirs"][0]["href"]
    # the subfolder link works as given, but not with a character
    # appended to its key
    self.assertIn('"files"', self.curl(url + "&ls", "u2")[1])
    self.assertEqual("\nJ2EOT", self.curl(url + "x&ls", "u2")[1])
def tardir(self, url, uname):
    """Download ``url`` as a tar archive and return its member names.

    ``tar`` is appended to the query string of ``url`` and the request is
    sent as ``uname`` through ``self.curl``.  Every archive member must
    sit below one top-level folder: ``top/`` when ``url`` has no path
    component, otherwise the first path segment of ``url`` with its
    leading dots removed.

    Returns the member names in archive order, with that folder prefix
    removed, joined by single spaces.

    Raises ``Exception("bad-prefix:", names)`` if any member is outside
    the expected top-level folder.
    """
    top = url.split("?")[0]
    top = ("top" if not top else top.lstrip(".").split("/")[0]) + "/"

    url += ("&" if "?" in url else "?") + "tar"
    _, body = self.curl(url, uname, True)

    # "r|" reads the archive as a forward-only stream; the with-block
    # makes sure the TarFile is closed once the names have been read
    with tarfile.open(fileobj=io.BytesIO(body), mode="r|") as tf:
        names = tf.getnames()

    if not all(name.startswith(top) for name in names):
        raise Exception("bad-prefix:", names)

    return " ".join([name[len(top) :] for name in names])
def tarsel(self, url, uname, sel):
    """POST a file selection to ``url`` and return the resulting tar's names.

    ``tar`` is appended to the query string of ``url``.  The request body
    is a multipart form (boundary ``XD``) with an ``act`` field set to
    ``zip`` and a ``files`` field holding the entries of ``sel``, one per
    line.  The request is sent as ``uname``.

    Returns the archive member names in archive order, joined by single
    spaces; unlike ``tardir`` no folder prefix is checked or removed.
    """
    url += ("&" if "?" in url else "?") + "tar"

    zs = '--XD\r\nContent-Disposition: form-data; name="act"\r\n\r\nzip\r\n--XD\r\nContent-Disposition: form-data; name="files"\r\n\r\n'
    zs += "\r\n".join(sel) + "\r\n--XD--\r\n"
    zb = zs.encode("utf-8")

    # named head_fmt so it does not hide the module-level hdr() helper
    head_fmt = "POST /%s HTTP/1.1\r\nPW: %s\r\nConnection: close\r\nContent-Type: multipart/form-data; boundary=XD\r\nContent-Length: %d\r\n\r\n"
    req = (head_fmt % (url, uname, len(zb))).encode("utf-8") + zb

    # curl sends req as-is when one is given, so the url argument is
    # not used to build the request here
    _, body = self.curl("/" + url, uname, True, req)

    # the with-block makes sure the TarFile is closed after reading
    with tarfile.open(fileobj=io.BytesIO(body), mode="r|") as tf:
        names = tf.getnames()

    return " ".join(names)
def curl(self, url, uname, binary=False, req=b""):
    """Run one request through ``HttpCli`` and return ``[headers, body]``.

    When ``req`` is empty a GET request for ``url`` is built with
    ``hdr(url, uname)``; otherwise ``req`` is sent unchanged.  The reply
    is split at the first blank line.  With ``binary`` set the body is
    returned as bytes and only the headers are decoded; otherwise the
    whole reply is decoded as UTF-8 before splitting.
    """
    payload = req if req else hdr(url, uname)
    conn = self.conn.setbuf(payload)
    HttpCli(conn).run()

    reply = conn.s._reply
    if not binary:
        return reply.decode("utf-8").split("\r\n\r\n", 1)

    head, body = reply.split(b"\r\n\r\n", 1)
    return [head.decode("utf-8"), body]
def log(self, src, msg, c=0):
    """Print ``msg`` followed by an ANSI reset code; ``src`` and ``c`` are ignored."""
    ansi_reset = "\033[0m"
    print(msg, ansi_reset)