#!/usr/bin/env python3 # coding: utf-8 from __future__ import print_function, unicode_literals import json import os import shutil import sqlite3 import tempfile import unittest from copyparty.__init__ import ANYWIN from copyparty.authsrv import AuthSrv from copyparty.httpcli import HttpCli from copyparty.util import absreal from tests import util as tu from tests.util import Cfg class TestShr(unittest.TestCase): def log(self, src, msg, c=0): m = "%s" % (msg,) if ( "warning: filesystem-path does not exist:" in m or "you are sharing a system directory:" in m or "symlink-based deduplication is enabled" in m or m.startswith("hint: argument") ): return print(("[%s] %s" % (src, msg)).encode("ascii", "replace").decode("ascii")) def assertLD(self, url, auth, els, edl): ls = self.ls(url, auth) self.assertEqual(ls[0], len(els) == 2) if not ls[0]: return a = [list(sorted(els[0])), list(sorted(els[1]))] b = [list(sorted(ls[1])), list(sorted(ls[2]))] self.assertEqual(a, b) if edl is None: edl = els[1] can_dl = [] for fn in b[1]: if fn == "a.db": continue furl = url + "/" + fn if auth: furl += "?pw=p1" h, zb = self.curl(furl, True) if h.startswith("HTTP/1.1 200 "): can_dl.append(fn) self.assertEqual(edl, can_dl) def setUp(self): self.td = tu.get_ramdisk() td = os.path.join(self.td, "vfs") os.mkdir(td) os.chdir(td) os.mkdir("d1") os.mkdir("d2") os.mkdir("d2/d3") for zs in ("d1/f1", "d2/f2", "d2/d3/f3"): with open(zs, "wb") as f: f.write(zs.encode("utf-8")) for dst in ("d1", "d2", "d2/d3"): src, fn = zs.rsplit("/", 1) os.symlink(absreal(zs), dst + "/l" + fn[-1:]) db = sqlite3.connect("a.db") with db: zs = r"create table sh (k text, pw text, vp text, pr text, st int, un text, t0 int, t1 int)" db.execute(zs) db.close() def tearDown(self): os.chdir(tempfile.gettempdir()) shutil.rmtree(self.td) def cinit(self): self.asrv = AuthSrv(self.args, self.log) self.conn = tu.VHttpConn(self.args, self.asrv, self.log, b"", True) def test1(self): self.args = Cfg( a=["u1:p1"], v=["::A,u1", "d1:v1:A,u1", "d2/d3:d2/d3:A,u1"], shr="/shr/", shr1="shr/", shr_db="a.db", shr_v=False, ) self.cinit() self.assertLD("", True, [["d1", "d2", "v1"], ["a.db"]], []) self.assertLD("d1", True, [[], ["f1", "l1", "l2", "l3"]], None) self.assertLD("v1", True, [[], ["f1", "l1", "l2", "l3"]], None) self.assertLD("d2", True, [["d3"], ["f2", "l1", "l2", "l3"]], None) self.assertLD("d2/d3", True, [[], ["f3", "l1", "l2", "l3"]], None) self.assertLD("d3", True, [], []) jt = { "k": "r", "vp": ["/"], "pw": "", "exp": "99", "perms": ["read"], } print(self.post_json("?pw=p1&share", jt)[1]) jt = { "k": "d2", "vp": ["/d2/"], "pw": "", "exp": "99", "perms": ["read"], } print(self.post_json("?pw=p1&share", jt)[1]) self.conn.shutdown() self.cinit() self.assertLD("", True, [["d1", "d2", "v1"], ["a.db"]], []) self.assertLD("d1", True, [[], ["f1", "l1", "l2", "l3"]], None) self.assertLD("v1", True, [[], ["f1", "l1", "l2", "l3"]], None) self.assertLD("d2", True, [["d3"], ["f2", "l1", "l2", "l3"]], None) self.assertLD("d2/d3", True, [[], ["f3", "l1", "l2", "l3"]], None) self.assertLD("d3", True, [], []) self.assertLD("shr/d2", False, [[], ["f2", "l1", "l2", "l3"]], None) self.assertLD("shr/d2/d3", False, [], None) self.assertLD("shr/r", False, [["d1"], ["a.db"]], []) self.assertLD("shr/r/d1", False, [[], ["f1", "l1", "l2", "l3"]], None) self.assertLD("shr/r/d2", False, [], None) # unfortunate self.assertLD("shr/r/d2/d3", False, [], None) self.conn.shutdown() def test2(self): self.args = Cfg( a=["u1:p1"], v=["::A,u1", "d1:v1:A,u1", "d2/d3:d2/d3:A,u1"], shr="/shr/", shr1="shr/", shr_db="a.db", shr_v=False, xvol=True, ) self.cinit() self.assertLD("", True, [["d1", "d2", "v1"], ["a.db"]], []) self.assertLD("d1", True, [[], ["f1", "l1", "l2", "l3"]], None) self.assertLD("v1", True, [[], ["f1", "l1", "l2", "l3"]], None) self.assertLD("d2", True, [["d3"], ["f2", "l1", "l2", "l3"]], None) self.assertLD("d2/d3", True, [[], ["f3", "l1", "l2", "l3"]], None) self.assertLD("d3", True, [], []) jt = { "k": "r", "vp": ["/"], "pw": "", "exp": "99", "perms": ["read"], } print(self.post_json("?pw=p1&share", jt)[1]) jt = { "k": "d2", "vp": ["/d2/"], "pw": "", "exp": "99", "perms": ["read"], } print(self.post_json("?pw=p1&share", jt)[1]) self.conn.shutdown() self.cinit() self.assertLD("", True, [["d1", "d2", "v1"], ["a.db"]], []) self.assertLD("d1", True, [[], ["f1", "l1", "l2", "l3"]], None) self.assertLD("v1", True, [[], ["f1", "l1", "l2", "l3"]], None) self.assertLD("d2", True, [["d3"], ["f2", "l1", "l2", "l3"]], None) self.assertLD("d2/d3", True, [[], ["f3", "l1", "l2", "l3"]], None) self.assertLD("d3", True, [], []) self.assertLD("shr/d2", False, [[], ["f2", "l1", "l2", "l3"]], ["f2", "l2"]) self.assertLD("shr/d2/d3", False, [], []) self.assertLD("shr/r", False, [["d1"], ["a.db"]], []) self.assertLD( "shr/r/d1", False, [[], ["f1", "l1", "l2", "l3"]], ["f1", "l1", "l2"] ) self.assertLD("shr/r/d2", False, [], []) # unfortunate self.assertLD("shr/r/d2/d3", False, [], []) self.conn.shutdown() def ls(self, url: str, auth: bool): zs = url + "?ls" + ("&pw=p1" if auth else "") h, b = self.curl(zs) if not h.startswith("HTTP/1.1 200 "): return (False, [], []) jo = json.loads(b) return ( True, [x["href"].rstrip("/") for x in jo.get("dirs") or {}], [x["href"] for x in jo.get("files") or {}], ) def curl(self, url: str, binary=False): h = "GET /%s HTTP/1.1\r\nConnection: close\r\n\r\n" HttpCli(self.conn.setbuf((h % (url,)).encode("utf-8"))).run() if binary: h, b = self.conn.s._reply.split(b"\r\n\r\n", 1) return [h.decode("utf-8"), b] return self.conn.s._reply.decode("utf-8").split("\r\n\r\n", 1) def post_json(self, url: str, data): buf = json.dumps(data).encode("utf-8") msg = [ "POST /%s HTTP/1.1" % (url,), "Connection: close", "Content-Type: application/json", "Content-Length: %d" % (len(buf),), "\r\n", ] buf = "\r\n".join(msg).encode("utf-8") + buf print("PUT -->", buf) HttpCli(self.conn.setbuf(buf)).run() return self.conn.s._reply.decode("utf-8").split("\r\n\r\n", 1)