mirror of
https://github.com/9001/copyparty.git
synced 2025-08-17 09:02:15 -06:00
385 lines
13 KiB
Python
385 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
# coding: utf-8
|
|
from __future__ import print_function, unicode_literals
|
|
|
|
import io
|
|
import json
|
|
import os
|
|
import shutil
|
|
import tarfile
|
|
import tempfile
|
|
import unittest
|
|
|
|
from copyparty.authsrv import AuthSrv
|
|
from copyparty.httpcli import HttpCli
|
|
from copyparty.u2idx import U2idx
|
|
from copyparty.up2k import Up2k
|
|
from tests import util as tu
|
|
from tests.util import Cfg
|
|
|
|
try:
|
|
from typing import Optional
|
|
except:
|
|
pass
|
|
|
|
|
|
def hdr(query, uname):
|
|
h = "GET /%s HTTP/1.1\r\nPW: %s\r\nConnection: close\r\n\r\n"
|
|
return (h % (query, uname)).encode("utf-8")
|
|
|
|
|
|
class TestDots(unittest.TestCase):
|
|
def __init__(self, *a, **ka):
|
|
super(TestDots, self).__init__(*a, **ka)
|
|
self.is_dut = True
|
|
|
|
def setUp(self):
|
|
self.conn: Optional[tu.VHttpConn] = None
|
|
self.td = tu.get_ramdisk()
|
|
|
|
def tearDown(self):
|
|
if self.conn:
|
|
self.conn.shutdown()
|
|
os.chdir(tempfile.gettempdir())
|
|
shutil.rmtree(self.td)
|
|
|
|
def cinit(self):
|
|
if self.conn:
|
|
self.conn.shutdown()
|
|
self.conn = None
|
|
self.conn = tu.VHttpConn(self.args, self.asrv, self.log, b"")
|
|
|
|
def test_dots(self):
|
|
td = os.path.join(self.td, "vfs")
|
|
os.mkdir(td)
|
|
os.chdir(td)
|
|
|
|
# topDir volA volA/*dirA .volB .volB/*dirB
|
|
spaths = " t .t a a/da a/.da .b .b/db .b/.db"
|
|
for n, dirpath in enumerate(spaths.split(" ")):
|
|
if dirpath:
|
|
os.makedirs(dirpath)
|
|
|
|
for pfx in "f", ".f":
|
|
filepath = pfx + str(n)
|
|
if dirpath:
|
|
filepath = os.path.join(dirpath, filepath)
|
|
|
|
with open(filepath, "wb") as f:
|
|
f.write(filepath.encode("utf-8"))
|
|
|
|
vcfg = [".::r,u1:r.,u2", "a:a:r,u1:r,u2", ".b:.b:r.,u1:r,u2"]
|
|
self.args = Cfg(v=vcfg, a=["u1:u1", "u2:u2"], e2dsa=True)
|
|
self.asrv = AuthSrv(self.args, self.log)
|
|
self.cinit()
|
|
|
|
self.assertEqual(self.tardir("", "u1"), "f0 t/f1 a/f3 a/da/f4")
|
|
self.assertEqual(self.tardir(".t", "u1"), "f2")
|
|
self.assertEqual(self.tardir(".b", "u1"), ".f6 f6 .db/.f8 .db/f8 db/.f7 db/f7")
|
|
|
|
zs = ".f0 f0 .t/.f2 .t/f2 t/.f1 t/f1 .b/f6 .b/db/f7 a/f3 a/da/f4"
|
|
self.assertEqual(self.tardir("", "u2"), zs)
|
|
|
|
self.assertEqual(self.curl("?tar", "x")[1][:17], "\nJ2EOT")
|
|
|
|
##
|
|
## search
|
|
|
|
up2k = Up2k(self)
|
|
u2idx = U2idx(self)
|
|
allvols = list(self.asrv.vfs.all_vols.values())
|
|
|
|
x = u2idx.search("u1", allvols, "", 999)
|
|
x = " ".join(sorted([x["rp"] for x in x[0]]))
|
|
# u1 can see dotfiles in volB so they should be included
|
|
xe = ".b/.db/.f8 .b/.db/f8 .b/.f6 .b/db/.f7 .b/db/f7 .b/f6 a/da/f4 a/f3 f0 t/f1"
|
|
self.assertEqual(x, xe)
|
|
|
|
x = u2idx.search("u2", allvols, "", 999)
|
|
x = " ".join(sorted([x["rp"] for x in x[0]]))
|
|
self.assertEqual(x, ".f0 .t/.f2 .t/f2 a/da/f4 a/f3 f0 t/.f1 t/f1")
|
|
|
|
u2idx.shutdown()
|
|
self.args = Cfg(v=vcfg, a=["u1:u1", "u2:u2"], dotsrch=False, e2d=True)
|
|
self.asrv = AuthSrv(self.args, self.log)
|
|
u2idx = U2idx(self)
|
|
self.cinit()
|
|
|
|
x = u2idx.search("u1", self.asrv.vfs.all_vols.values(), "", 999)
|
|
x = " ".join(sorted([x["rp"] for x in x[0]]))
|
|
# u1 can see dotfiles in volB so they should be included
|
|
xe = "a/da/f4 a/f3 f0 t/f1"
|
|
self.assertEqual(x, xe)
|
|
u2idx.shutdown()
|
|
up2k.shutdown()
|
|
|
|
##
|
|
## dirkeys
|
|
|
|
os.mkdir("v")
|
|
with open("v/f1.txt", "wb") as f:
|
|
f.write(b"a")
|
|
os.rename("a", "v/a")
|
|
os.rename(".b", "v/.b")
|
|
|
|
vcfg = [
|
|
".::r.,u1:g,u2:c,dks",
|
|
"v/a:v/a:r.,u1:g,u2:c,dks",
|
|
"v/.b:v/.b:r.,u1:g,u2:c,dks",
|
|
]
|
|
self.args = Cfg(v=vcfg, a=["u1:u1", "u2:u2"])
|
|
self.asrv = AuthSrv(self.args, self.log)
|
|
self.cinit()
|
|
|
|
zj = json.loads(self.curl("?ls", "u1")[1])
|
|
url = "?k=" + zj["dk"]
|
|
# should descend into folders, but not other volumes:
|
|
self.assertEqual(self.tardir(url, "u2"), "f0 t/f1 v/f1.txt")
|
|
|
|
zj = json.loads(self.curl("v?ls", "u1")[1])
|
|
url = "v?k=" + zj["dk"]
|
|
self.assertEqual(self.tarsel(url, "u2", ["f1.txt", "a", ".b"]), "f1.txt")
|
|
|
|
shutil.rmtree("v")
|
|
|
|
def test_dk_fk(self):
|
|
# python3 -m unittest tests.test_dots.TestDots.test_dk_fk
|
|
|
|
td = os.path.join(self.td, "vfs")
|
|
os.mkdir(td)
|
|
os.chdir(td)
|
|
|
|
vcfg = []
|
|
for k in "dk dks dky fk fka dk,fk dks,fk".split():
|
|
vcfg += ["{0}:{0}:r.,u1:g,u2:c,{0}".format(k)]
|
|
zs = "%s/s1/s2" % (k,)
|
|
os.makedirs(zs)
|
|
|
|
with open("%s/f.t1" % (k,), "wb") as f:
|
|
f.write(b"f1")
|
|
|
|
with open("%s/s1/f.t2" % (k,), "wb") as f:
|
|
f.write(b"f2")
|
|
|
|
with open("%s/s1/s2/f.t3" % (k,), "wb") as f:
|
|
f.write(b"f3")
|
|
|
|
self.args = Cfg(v=vcfg, a=["u1:u1", "u2:u2"])
|
|
self.asrv = AuthSrv(self.args, self.log)
|
|
self.cinit()
|
|
|
|
dk = {}
|
|
for d in "dk dks dk,fk dks,fk".split():
|
|
zj = json.loads(self.curl("%s?ls" % (d,), "u1")[1])
|
|
dk[d] = zj["dk"]
|
|
|
|
##
|
|
## dk
|
|
|
|
# should not be able to access dk with wrong dirkey,
|
|
zs = self.curl("dk?ls&k=%s" % (dk["dks"]), "u2")[1]
|
|
self.assertEqual(zs, "\nJ2EOT")
|
|
# so use the right key
|
|
zs = self.curl("dk?ls&k=%s" % (dk["dk"]), "u2")[1]
|
|
zj = json.loads(zs)
|
|
self.assertEqual(len(zj["dirs"]), 0)
|
|
self.assertEqual(len(zj["files"]), 1)
|
|
self.assertEqual(zj["files"][0]["href"], "f.t1")
|
|
|
|
##
|
|
## dk thumbs
|
|
|
|
self.assertIn('">folder</text>', self.curl("dk?th=x", "u1")[1])
|
|
self.assertIn('">e403</text>', self.curl("dk?th=x", "u2")[1])
|
|
|
|
zs = "dk?th=x&k=%s" % (dk["dks"])
|
|
self.assertIn('">e403</text>', self.curl(zs, "u2")[1])
|
|
|
|
zs = "dk?th=x&k=%s" % (dk["dk"])
|
|
self.assertIn('">folder</text>', self.curl(zs, "u2")[1])
|
|
|
|
# fk not enabled, so this should work
|
|
self.assertIn('">t1</text>', self.curl("dk/f.t1?th=x", "u2")[1])
|
|
self.assertIn('">t2</text>', self.curl("dk/s1/f.t2?th=x", "u2")[1])
|
|
|
|
##
|
|
## dks
|
|
|
|
# should not be able to access dks with wrong dirkey,
|
|
zs = self.curl("dks?ls&k=%s" % (dk["dk"]), "u2")[1]
|
|
self.assertEqual(zs, "\nJ2EOT")
|
|
# so use the right key
|
|
zs = self.curl("dks?ls&k=%s" % (dk["dks"]), "u2")[1]
|
|
zj = json.loads(zs)
|
|
self.assertEqual(len(zj["dirs"]), 1)
|
|
self.assertEqual(len(zj["files"]), 1)
|
|
self.assertEqual(zj["files"][0]["href"], "f.t1")
|
|
# dks should return correct dirkey of subfolders;
|
|
s1 = zj["dirs"][0]["href"]
|
|
self.assertEqual(s1.split("/")[0], "s1")
|
|
zs = self.curl("dks/%s&ls" % (s1), "u2")[1]
|
|
self.assertIn('"s2/?k=', zs)
|
|
|
|
##
|
|
## dks thumbs
|
|
|
|
self.assertIn('">folder</text>', self.curl("dks?th=x", "u1")[1])
|
|
self.assertIn('">e403</text>', self.curl("dks?th=x", "u2")[1])
|
|
|
|
zs = "dks?th=x&k=%s" % (dk["dk"])
|
|
self.assertIn('">e403</text>', self.curl(zs, "u2")[1])
|
|
|
|
zs = "dks?th=x&k=%s" % (dk["dks"])
|
|
self.assertIn('">folder</text>', self.curl(zs, "u2")[1])
|
|
|
|
# fk not enabled, so this should work
|
|
self.assertIn('">t1</text>', self.curl("dks/f.t1?th=x", "u2")[1])
|
|
self.assertIn('">t2</text>', self.curl("dks/s1/f.t2?th=x", "u2")[1])
|
|
|
|
##
|
|
## dky
|
|
|
|
# doesn't care about keys
|
|
zs = self.curl("dky?ls&k=ok", "u2")[1]
|
|
self.assertEqual(zs, self.curl("dky?ls", "u2")[1])
|
|
zj = json.loads(zs)
|
|
self.assertEqual(len(zj["dirs"]), 0)
|
|
self.assertEqual(len(zj["files"]), 1)
|
|
self.assertEqual(zj["files"][0]["href"], "f.t1")
|
|
|
|
##
|
|
## dky thumbs
|
|
|
|
self.assertIn('">folder</text>', self.curl("dky?th=x", "u1")[1])
|
|
self.assertIn('">folder</text>', self.curl("dky?th=x", "u2")[1])
|
|
|
|
zs = "dky?th=x&k=%s" % (dk["dk"])
|
|
self.assertIn('">folder</text>', self.curl(zs, "u2")[1])
|
|
|
|
# fk not enabled, so this should work
|
|
self.assertIn('">t1</text>', self.curl("dky/f.t1?th=x", "u2")[1])
|
|
self.assertIn('">t2</text>', self.curl("dky/s1/f.t2?th=x", "u2")[1])
|
|
|
|
##
|
|
## dk+fk
|
|
|
|
# should not be able to access dk with wrong dirkey,
|
|
zs = self.curl("dk,fk?ls&k=%s" % (dk["dk"]), "u2")[1]
|
|
self.assertEqual(zs, "\nJ2EOT")
|
|
# so use the right key
|
|
zs = self.curl("dk,fk?ls&k=%s" % (dk["dk,fk"]), "u2")[1]
|
|
zj = json.loads(zs)
|
|
self.assertEqual(len(zj["dirs"]), 0)
|
|
self.assertEqual(len(zj["files"]), 1)
|
|
self.assertEqual(zj["files"][0]["href"][:7], "f.t1?k=")
|
|
|
|
##
|
|
## dk+fk thumbs
|
|
|
|
self.assertIn('">folder</text>', self.curl("dk,fk?th=x", "u1")[1])
|
|
self.assertIn('">e403</text>', self.curl("dk,fk?th=x", "u2")[1])
|
|
|
|
zs = "dk,fk?th=x&k=%s" % (dk["dk"])
|
|
self.assertIn('">e403</text>', self.curl(zs, "u2")[1])
|
|
|
|
zs = "dk,fk?th=x&k=%s" % (dk["dk,fk"])
|
|
self.assertIn('">folder</text>', self.curl(zs, "u2")[1])
|
|
|
|
# fk enabled, so this should fail
|
|
self.assertIn('">e403</text>', self.curl("dk,fk/f.t1?th=x", "u2")[1])
|
|
self.assertIn('">e403</text>', self.curl("dk,fk/s1/f.t2?th=x", "u2")[1])
|
|
|
|
# but dk should return correct filekeys, so try that
|
|
zs = "dk,fk/%s&th=x" % (zj["files"][0]["href"])
|
|
self.assertIn('">t1</text>', self.curl(zs, "u2")[1])
|
|
|
|
##
|
|
## dks+fk
|
|
|
|
# should not be able to access dk with wrong dirkey,
|
|
zs = self.curl("dks,fk?ls&k=%s" % (dk["dk"]), "u2")[1]
|
|
self.assertEqual(zs, "\nJ2EOT")
|
|
# so use the right key
|
|
zs = self.curl("dks,fk?ls&k=%s" % (dk["dks,fk"]), "u2")[1]
|
|
zj = json.loads(zs)
|
|
self.assertEqual(len(zj["dirs"]), 1)
|
|
self.assertEqual(len(zj["files"]), 1)
|
|
self.assertEqual(zj["dirs"][0]["href"][:6], "s1/?k=")
|
|
self.assertEqual(zj["files"][0]["href"][:7], "f.t1?k=")
|
|
|
|
##
|
|
## dks+fk thumbs
|
|
|
|
self.assertIn('">folder</text>', self.curl("dks,fk?th=x", "u1")[1])
|
|
self.assertIn('">e403</text>', self.curl("dks,fk?th=x", "u2")[1])
|
|
|
|
zs = "dks,fk?th=x&k=%s" % (dk["dk"])
|
|
self.assertIn('">e403</text>', self.curl(zs, "u2")[1])
|
|
|
|
zs = "dks,fk?th=x&k=%s" % (dk["dks,fk"])
|
|
self.assertIn('">folder</text>', self.curl(zs, "u2")[1])
|
|
|
|
# subdir s1 without key
|
|
zs = "dks,fk/s1/?th=x"
|
|
self.assertIn('">e403</text>', self.curl(zs, "u2")[1])
|
|
|
|
# subdir s1 with bad key
|
|
zs = "dks,fk/s1/?th=x&k=no"
|
|
self.assertIn('">e403</text>', self.curl(zs, "u2")[1])
|
|
|
|
# subdir s1 with correct key
|
|
zs = "dks,fk/%s&th=x" % (zj["dirs"][0]["href"])
|
|
self.assertIn('">folder</text>', self.curl(zs, "u2")[1])
|
|
|
|
# fk enabled, so this should fail
|
|
self.assertIn('">e403</text>', self.curl("dks,fk/f.t1?th=x", "u2")[1])
|
|
self.assertIn('">e403</text>', self.curl("dks,fk/s1/f.t2?th=x", "u2")[1])
|
|
|
|
# but dk should return correct filekeys, so try that
|
|
zs = "dks,fk/%s&th=x" % (zj["files"][0]["href"])
|
|
self.assertIn('">t1</text>', self.curl(zs, "u2")[1])
|
|
|
|
# subdir
|
|
self.assertIn('">e403</text>', self.curl("dks,fk/s1/?th=x", "u2")[1])
|
|
self.assertEqual("\nJ2EOT", self.curl("dks,fk/s1/?ls", "u2")[1])
|
|
zs = "dks,fk/s1%s&th=x" % (zj["files"][0]["href"])
|
|
zs = self.curl("dks,fk?ls&k=%s" % (dk["dks,fk"]), "u2")[1]
|
|
zj = json.loads(zs)
|
|
url = "dks,fk/%s" % zj["dirs"][0]["href"]
|
|
self.assertIn('"files"', self.curl(url + "&ls", "u2")[1])
|
|
self.assertEqual("\nJ2EOT", self.curl(url + "x&ls", "u2")[1])
|
|
|
|
def tardir(self, url, uname):
|
|
top = url.split("?")[0]
|
|
top = ("top" if not top else top.lstrip(".").split("/")[0]) + "/"
|
|
url += ("&" if "?" in url else "?") + "tar"
|
|
h, b = self.curl(url, uname, True)
|
|
tar = tarfile.open(fileobj=io.BytesIO(b), mode="r|").getnames()
|
|
if len(tar) != len([x for x in tar if x.startswith(top)]):
|
|
raise Exception("bad-prefix:", tar)
|
|
return " ".join([x[len(top) :] for x in tar])
|
|
|
|
def tarsel(self, url, uname, sel):
|
|
url += ("&" if "?" in url else "?") + "tar"
|
|
zs = '--XD\r\nContent-Disposition: form-data; name="act"\r\n\r\nzip\r\n--XD\r\nContent-Disposition: form-data; name="files"\r\n\r\n'
|
|
zs += "\r\n".join(sel) + "\r\n--XD--\r\n"
|
|
zb = zs.encode("utf-8")
|
|
hdr = "POST /%s HTTP/1.1\r\nPW: %s\r\nConnection: close\r\nContent-Type: multipart/form-data; boundary=XD\r\nContent-Length: %d\r\n\r\n"
|
|
req = (hdr % (url, uname, len(zb))).encode("utf-8") + zb
|
|
h, b = self.curl("/" + url, uname, True, req)
|
|
tar = tarfile.open(fileobj=io.BytesIO(b), mode="r|").getnames()
|
|
return " ".join(tar)
|
|
|
|
def curl(self, url, uname, binary=False, req=b""):
|
|
req = req or hdr(url.replace("th=x", "th=j"), uname)
|
|
conn = self.conn.setbuf(req)
|
|
HttpCli(conn).run()
|
|
if binary:
|
|
h, b = conn.s._reply.split(b"\r\n\r\n", 1)
|
|
return [h.decode("utf-8"), b]
|
|
|
|
return conn.s._reply.decode("utf-8").split("\r\n\r\n", 1)
|
|
|
|
def log(self, src, msg, c=0):
|
|
print(msg, "\033[0m")
|