mirror of
https://github.com/9001/copyparty.git
synced 2025-10-23 16:52:43 -06:00
230 lines
7.6 KiB
Python
230 lines
7.6 KiB
Python
#!/usr/bin/env python3
|
|
# coding: utf-8
|
|
from __future__ import print_function, unicode_literals
|
|
|
|
import json
|
|
import os
|
|
import shutil
|
|
import sqlite3
|
|
import tempfile
|
|
import unittest
|
|
|
|
from copyparty.__init__ import ANYWIN
|
|
from copyparty.authsrv import AuthSrv
|
|
from copyparty.httpcli import HttpCli
|
|
from copyparty.util import absreal
|
|
from tests import util as tu
|
|
from tests.util import Cfg
|
|
|
|
|
|
class TestShr(unittest.TestCase):
|
|
def log(self, src, msg, c=0):
|
|
m = "%s" % (msg,)
|
|
if (
|
|
"warning: filesystem-path does not exist:" in m
|
|
or "you are sharing a system directory:" in m
|
|
or "symlink-based deduplication is enabled" in m
|
|
or m.startswith("hint: argument")
|
|
):
|
|
return
|
|
|
|
print(("[%s] %s" % (src, msg)).encode("ascii", "replace").decode("ascii"))
|
|
|
|
def assertLD(self, url, auth, els, edl):
|
|
ls = self.ls(url, auth)
|
|
self.assertEqual(ls[0], len(els) == 2)
|
|
if not ls[0]:
|
|
return
|
|
a = [list(sorted(els[0])), list(sorted(els[1]))]
|
|
b = [list(sorted(ls[1])), list(sorted(ls[2]))]
|
|
self.assertEqual(a, b)
|
|
|
|
if edl is None:
|
|
edl = els[1]
|
|
can_dl = []
|
|
for fn in b[1]:
|
|
if fn == "a.db":
|
|
continue
|
|
furl = url + "/" + fn
|
|
if auth:
|
|
furl += "?pw=p1"
|
|
h, zb = self.curl(furl, True)
|
|
if h.startswith("HTTP/1.1 200 "):
|
|
can_dl.append(fn)
|
|
self.assertEqual(edl, can_dl)
|
|
|
|
def setUp(self):
|
|
self.td = tu.get_ramdisk()
|
|
td = os.path.join(self.td, "vfs")
|
|
os.mkdir(td)
|
|
os.chdir(td)
|
|
os.mkdir("d1")
|
|
os.mkdir("d2")
|
|
os.mkdir("d2/d3")
|
|
for zs in ("d1/f1", "d2/f2", "d2/d3/f3"):
|
|
with open(zs, "wb") as f:
|
|
f.write(zs.encode("utf-8"))
|
|
for dst in ("d1", "d2", "d2/d3"):
|
|
src, fn = zs.rsplit("/", 1)
|
|
os.symlink(absreal(zs), dst + "/l" + fn[-1:])
|
|
|
|
db = sqlite3.connect("a.db")
|
|
with db:
|
|
zs = r"create table sh (k text, pw text, vp text, pr text, st int, un text, t0 int, t1 int)"
|
|
db.execute(zs)
|
|
db.close()
|
|
|
|
def tearDown(self):
|
|
os.chdir(tempfile.gettempdir())
|
|
shutil.rmtree(self.td)
|
|
|
|
def cinit(self):
|
|
self.asrv = AuthSrv(self.args, self.log)
|
|
self.conn = tu.VHttpConn(self.args, self.asrv, self.log, b"", True)
|
|
|
|
def test1(self):
|
|
self.args = Cfg(
|
|
a=["u1:p1"],
|
|
v=["::A,u1", "d1:v1:A,u1", "d2/d3:d2/d3:A,u1"],
|
|
shr="/shr/",
|
|
shr1="shr/",
|
|
shr_db="a.db",
|
|
shr_v=False,
|
|
)
|
|
self.cinit()
|
|
|
|
self.assertLD("", True, [["d1", "d2", "v1"], ["a.db"]], [])
|
|
self.assertLD("d1", True, [[], ["f1", "l1", "l2", "l3"]], None)
|
|
self.assertLD("v1", True, [[], ["f1", "l1", "l2", "l3"]], None)
|
|
self.assertLD("d2", True, [["d3"], ["f2", "l1", "l2", "l3"]], None)
|
|
self.assertLD("d2/d3", True, [[], ["f3", "l1", "l2", "l3"]], None)
|
|
self.assertLD("d3", True, [], [])
|
|
|
|
jt = {
|
|
"k": "r",
|
|
"vp": ["/"],
|
|
"pw": "",
|
|
"exp": "99",
|
|
"perms": ["read"],
|
|
}
|
|
print(self.post_json("?pw=p1&share", jt)[1])
|
|
jt = {
|
|
"k": "d2",
|
|
"vp": ["/d2/"],
|
|
"pw": "",
|
|
"exp": "99",
|
|
"perms": ["read"],
|
|
}
|
|
print(self.post_json("?pw=p1&share", jt)[1])
|
|
self.conn.shutdown()
|
|
self.cinit()
|
|
|
|
self.assertLD("", True, [["d1", "d2", "v1"], ["a.db"]], [])
|
|
self.assertLD("d1", True, [[], ["f1", "l1", "l2", "l3"]], None)
|
|
self.assertLD("v1", True, [[], ["f1", "l1", "l2", "l3"]], None)
|
|
self.assertLD("d2", True, [["d3"], ["f2", "l1", "l2", "l3"]], None)
|
|
self.assertLD("d2/d3", True, [[], ["f3", "l1", "l2", "l3"]], None)
|
|
self.assertLD("d3", True, [], [])
|
|
|
|
self.assertLD("shr/d2", False, [[], ["f2", "l1", "l2", "l3"]], None)
|
|
self.assertLD("shr/d2/d3", False, [], None)
|
|
|
|
self.assertLD("shr/r", False, [["d1"], ["a.db"]], [])
|
|
self.assertLD("shr/r/d1", False, [[], ["f1", "l1", "l2", "l3"]], None)
|
|
self.assertLD("shr/r/d2", False, [], None) # unfortunate
|
|
self.assertLD("shr/r/d2/d3", False, [], None)
|
|
|
|
self.conn.shutdown()
|
|
|
|
def test2(self):
|
|
self.args = Cfg(
|
|
a=["u1:p1"],
|
|
v=["::A,u1", "d1:v1:A,u1", "d2/d3:d2/d3:A,u1"],
|
|
shr="/shr/",
|
|
shr1="shr/",
|
|
shr_db="a.db",
|
|
shr_v=False,
|
|
xvol=True,
|
|
)
|
|
self.cinit()
|
|
|
|
self.assertLD("", True, [["d1", "d2", "v1"], ["a.db"]], [])
|
|
self.assertLD("d1", True, [[], ["f1", "l1", "l2", "l3"]], None)
|
|
self.assertLD("v1", True, [[], ["f1", "l1", "l2", "l3"]], None)
|
|
self.assertLD("d2", True, [["d3"], ["f2", "l1", "l2", "l3"]], None)
|
|
self.assertLD("d2/d3", True, [[], ["f3", "l1", "l2", "l3"]], None)
|
|
self.assertLD("d3", True, [], [])
|
|
|
|
jt = {
|
|
"k": "r",
|
|
"vp": ["/"],
|
|
"pw": "",
|
|
"exp": "99",
|
|
"perms": ["read"],
|
|
}
|
|
print(self.post_json("?pw=p1&share", jt)[1])
|
|
jt = {
|
|
"k": "d2",
|
|
"vp": ["/d2/"],
|
|
"pw": "",
|
|
"exp": "99",
|
|
"perms": ["read"],
|
|
}
|
|
print(self.post_json("?pw=p1&share", jt)[1])
|
|
self.conn.shutdown()
|
|
self.cinit()
|
|
|
|
self.assertLD("", True, [["d1", "d2", "v1"], ["a.db"]], [])
|
|
self.assertLD("d1", True, [[], ["f1", "l1", "l2", "l3"]], None)
|
|
self.assertLD("v1", True, [[], ["f1", "l1", "l2", "l3"]], None)
|
|
self.assertLD("d2", True, [["d3"], ["f2", "l1", "l2", "l3"]], None)
|
|
self.assertLD("d2/d3", True, [[], ["f3", "l1", "l2", "l3"]], None)
|
|
self.assertLD("d3", True, [], [])
|
|
|
|
self.assertLD("shr/d2", False, [[], ["f2", "l1", "l2", "l3"]], ["f2", "l2"])
|
|
self.assertLD("shr/d2/d3", False, [], [])
|
|
|
|
self.assertLD("shr/r", False, [["d1"], ["a.db"]], [])
|
|
self.assertLD(
|
|
"shr/r/d1", False, [[], ["f1", "l1", "l2", "l3"]], ["f1", "l1", "l2"]
|
|
)
|
|
self.assertLD("shr/r/d2", False, [], []) # unfortunate
|
|
self.assertLD("shr/r/d2/d3", False, [], [])
|
|
|
|
self.conn.shutdown()
|
|
|
|
def ls(self, url: str, auth: bool):
|
|
zs = url + "?ls" + ("&pw=p1" if auth else "")
|
|
h, b = self.curl(zs)
|
|
if not h.startswith("HTTP/1.1 200 "):
|
|
return (False, [], [])
|
|
jo = json.loads(b)
|
|
return (
|
|
True,
|
|
[x["href"].rstrip("/") for x in jo.get("dirs") or {}],
|
|
[x["href"] for x in jo.get("files") or {}],
|
|
)
|
|
|
|
def curl(self, url: str, binary=False):
|
|
h = "GET /%s HTTP/1.1\r\nConnection: close\r\n\r\n"
|
|
HttpCli(self.conn.setbuf((h % (url,)).encode("utf-8"))).run()
|
|
if binary:
|
|
h, b = self.conn.s._reply.split(b"\r\n\r\n", 1)
|
|
return [h.decode("utf-8"), b]
|
|
|
|
return self.conn.s._reply.decode("utf-8").split("\r\n\r\n", 1)
|
|
|
|
def post_json(self, url: str, data):
|
|
buf = json.dumps(data).encode("utf-8")
|
|
msg = [
|
|
"POST /%s HTTP/1.1" % (url,),
|
|
"Connection: close",
|
|
"Content-Type: application/json",
|
|
"Content-Length: %d" % (len(buf),),
|
|
"\r\n",
|
|
]
|
|
buf = "\r\n".join(msg).encode("utf-8") + buf
|
|
print("PUT -->", buf)
|
|
HttpCli(self.conn.setbuf(buf)).run()
|
|
return self.conn.s._reply.decode("utf-8").split("\r\n\r\n", 1)
|