#!/usr/bin/env python3 # coding: utf-8 from __future__ import print_function, unicode_literals import io import json import os import shutil import tarfile import tempfile import unittest from copyparty.authsrv import AuthSrv from copyparty.httpcli import HttpCli from copyparty.u2idx import U2idx from copyparty.up2k import Up2k from tests import util as tu from tests.util import J2_ENV, Cfg try: from typing import Optional except: pass def hdr(query, uname, extra=""): h = "GET /%s HTTP/1.1\r\nPW: %s\r\nConnection: close\r\n%s\r\n" return (h % (query, uname, extra)).encode("utf-8") class TestDots(unittest.TestCase): def __init__(self, *a, **ka): super(TestDots, self).__init__(*a, **ka) self.is_dut = True self.j2_brw = None def setUp(self): self.conn: Optional[tu.VHttpConn] = None self.td = tu.get_ramdisk() def tearDown(self): if self.conn: self.conn.shutdown() os.chdir(tempfile.gettempdir()) shutil.rmtree(self.td) def cinit(self): if self.conn: self.conn.shutdown() self.conn = None self.conn = tu.VHttpConn(self.args, self.asrv, self.log, b"") if not self.j2_brw: zs = os.path.dirname(__file__) with open("%s/../copyparty/web/browser.html" % (zs,), "rb") as f: zs = f.read().decode("utf-8") self.j2_brw = J2_ENV.from_string(zs) self.conn.hsrv.j2["browser"] = self.j2_brw def test_dots(self): td = os.path.join(self.td, "vfs") os.mkdir(td) os.chdir(td) # topDir volA volA/*dirA .volB .volB/*dirB spaths = " t .t a a/da a/.da .b .b/db .b/.db" for n, dirpath in enumerate(spaths.split(" ")): if dirpath: os.makedirs(dirpath) for pfx in "f", ".f": filepath = pfx + str(n) if dirpath: filepath = os.path.join(dirpath, filepath) with open(filepath, "wb") as f: f.write(filepath.encode("utf-8")) vcfg = [".::r,u1:r.,u2", "a:a:r,u1:r,u2", ".b:.b:r.,u1:r,u2"] self.args = Cfg(v=vcfg, a=["u1:u1", "u2:u2"], e2dsa=True) self.asrv = AuthSrv(self.args, self.log) self.cinit() self.assertEqual(self.tardir("", "u1"), "f0 t/f1 a/f3 a/da/f4") self.assertEqual(self.tardir(".t", "u1"), "f2") self.assertEqual(self.tardir(".b", "u1"), ".f6 f6 .db/.f8 .db/f8 db/.f7 db/f7") zs = ".f0 f0 .t/.f2 .t/f2 t/.f1 t/f1 .b/f6 .b/db/f7 a/f3 a/da/f4" self.assertEqual(self.tardir("", "u2"), zs) self.assertEqual(self.curl("?tar", "x")[1][:17], "\nJ2EOT") ## ## search up2k = Up2k(self) u2idx = U2idx(self) allvols = list(self.asrv.vfs.all_vols.values()) x = u2idx.search("u1", allvols, "", 999) x = " ".join(sorted([x["rp"] for x in x[0]])) # u1 can see dotfiles in volB so they should be included xe = ".b/.db/.f8 .b/.db/f8 .b/.f6 .b/db/.f7 .b/db/f7 .b/f6 a/da/f4 a/f3 f0 t/f1" self.assertEqual(x, xe) x = u2idx.search("u2", allvols, "", 999) x = " ".join(sorted([x["rp"] for x in x[0]])) self.assertEqual(x, ".f0 .t/.f2 .t/f2 a/da/f4 a/f3 f0 t/.f1 t/f1") u2idx.shutdown() self.args = Cfg(v=vcfg, a=["u1:u1", "u2:u2"], dotsrch=False, e2d=True) self.asrv = AuthSrv(self.args, self.log) u2idx = U2idx(self) self.cinit() x = u2idx.search("u1", self.asrv.vfs.all_vols.values(), "", 999) x = " ".join(sorted([x["rp"] for x in x[0]])) # u1 can see dotfiles in volB so they should be included xe = "a/da/f4 a/f3 f0 t/f1" self.assertEqual(x, xe) u2idx.shutdown() up2k.shutdown() ## ## dirkeys os.mkdir("v") with open("v/f1.txt", "wb") as f: f.write(b"a") os.rename("a", "v/a") os.rename(".b", "v/.b") vcfg = [ ".::r.,u1:g,u2:c,dks", "v/a:v/a:r.,u1:g,u2:c,dks", "v/.b:v/.b:r.,u1:g,u2:c,dks", ] self.args = Cfg(v=vcfg, a=["u1:u1", "u2:u2"]) self.asrv = AuthSrv(self.args, self.log) self.cinit() zj = json.loads(self.curl("?ls", "u1")[1]) url = "?k=" + zj["dk"] # should descend into folders, but not other volumes: self.assertEqual(self.tardir(url, "u2"), "f0 t/f1 v/f1.txt") zj = json.loads(self.curl("v?ls", "u1")[1]) url = "v?k=" + zj["dk"] self.assertEqual(self.tarsel(url, "u2", ["f1.txt", "a", ".b"]), "f1.txt") shutil.rmtree("v") def test_dk_fk(self): # python3 -m unittest tests.test_dots.TestDots.test_dk_fk td = os.path.join(self.td, "vfs") os.mkdir(td) os.chdir(td) vcfg = [] zs = "dk dks=12 dky fk fka dk,fk dks=12:c,fk" for k2 in zs.split(): k = k2.replace("=12", "").replace(":c,", ",") vcfg += ["{0}:{0}:r.,u1:g,u2:c,{1}".format(k, k2)] zs = "%s/s1/s2" % (k,) os.makedirs(zs) with open("%s/f.t1" % (k,), "wb") as f: f.write(b"f1") with open("%s/s1/f.t2" % (k,), "wb") as f: f.write(b"f2") with open("%s/s1/s2/f.t3" % (k,), "wb") as f: f.write(b"f3") self.args = Cfg(v=vcfg, a=["u1:u1", "u2:u2"]) self.asrv = AuthSrv(self.args, self.log) self.cinit() dk = {} for d in "dk dks dk,fk dks,fk".split(): zj = json.loads(self.curl("%s?ls" % (d,), "u1")[1]) dk[d] = zj["dk"] ## ## dk # should not be able to access dk with wrong dirkey, zs = self.curl("dk?ls&k=%s" % (dk["dks"]), "u2")[1] self.assertEqual(zs, "\nJ2EOT") # so use the right key zs = self.curl("dk?ls&k=%s" % (dk["dk"]), "u2")[1] zj = json.loads(zs) self.assertEqual(len(zj["dirs"]), 0) self.assertEqual(len(zj["files"]), 1) self.assertEqual(zj["files"][0]["href"], "f.t1") ## ## dk thumbs self.assertIn('">folder', self.curl("dk?th=x", "u1")[1]) self.assertIn('">e403', self.curl("dk?th=x", "u2")[1]) zs = "dk?th=x&k=%s" % (dk["dks"]) self.assertIn('">e403', self.curl(zs, "u2")[1]) zs = "dk?th=x&k=%s" % (dk["dk"]) self.assertIn('">folder', self.curl(zs, "u2")[1]) # fk not enabled, so this should work self.assertIn('">t1', self.curl("dk/f.t1?th=x", "u2")[1]) self.assertIn('">t2', self.curl("dk/s1/f.t2?th=x", "u2")[1]) ## ## dks # should not be able to access dks with wrong dirkey, zs = self.curl("dks?ls&k=%s" % (dk["dk"]), "u2")[1] self.assertEqual(zs, "\nJ2EOT") # so use the right key zs = self.curl("dks?ls&k=%s" % (dk["dks"]), "u2")[1] zj = json.loads(zs) self.assertEqual(len(zj["dirs"]), 1) self.assertEqual(len(zj["files"]), 1) self.assertEqual(zj["files"][0]["href"], "f.t1") # dks should return correct dirkey of subfolders; s1 = zj["dirs"][0]["href"] self.assertEqual(s1.split("/")[0], "s1") zs = self.curl("dks/%s&ls" % (s1), "u2")[1] self.assertIn('"s2/?k=', zs) # parent key should not exist in response self.assertNotIn(dk["dks"], zs) # and not in html either for ck in ("", "Cookie: js=y\r\n"): zb = hdr("dks/%s" % (s1), "u2", ck) zs = self.curl("", "", False, zb)[1] self.assertNotIn(dk["dks"], zs) self.assertIn('"s2/?k=', zs) ## ## dks thumbs self.assertIn('">folder', self.curl("dks?th=x", "u1")[1]) self.assertIn('">e403', self.curl("dks?th=x", "u2")[1]) zs = "dks?th=x&k=%s" % (dk["dk"]) self.assertIn('">e403', self.curl(zs, "u2")[1]) zs = "dks?th=x&k=%s" % (dk["dks"]) self.assertIn('">folder', self.curl(zs, "u2")[1]) # fk not enabled, so this should work self.assertIn('">t1', self.curl("dks/f.t1?th=x", "u2")[1]) self.assertIn('">t2', self.curl("dks/s1/f.t2?th=x", "u2")[1]) ## ## dky # doesn't care about keys zs = self.curl("dky?ls&k=ok", "u2")[1] self.assertEqual(zs, self.curl("dky?ls", "u2")[1]) zj = json.loads(zs) self.assertEqual(len(zj["dirs"]), 0) self.assertEqual(len(zj["files"]), 1) self.assertEqual(zj["files"][0]["href"], "f.t1") ## ## dky thumbs self.assertIn('">folder', self.curl("dky?th=x", "u1")[1]) self.assertIn('">folder', self.curl("dky?th=x", "u2")[1]) zs = "dky?th=x&k=%s" % (dk["dk"]) self.assertIn('">folder', self.curl(zs, "u2")[1]) # fk not enabled, so this should work self.assertIn('">t1', self.curl("dky/f.t1?th=x", "u2")[1]) self.assertIn('">t2', self.curl("dky/s1/f.t2?th=x", "u2")[1]) ## ## dk+fk # should not be able to access dk with wrong dirkey, zs = self.curl("dk,fk?ls&k=%s" % (dk["dk"]), "u2")[1] self.assertEqual(zs, "\nJ2EOT") # so use the right key zs = self.curl("dk,fk?ls&k=%s" % (dk["dk,fk"]), "u2")[1] zj = json.loads(zs) self.assertEqual(len(zj["dirs"]), 0) self.assertEqual(len(zj["files"]), 1) self.assertEqual(zj["files"][0]["href"][:7], "f.t1?k=") ## ## dk+fk thumbs self.assertIn('">folder', self.curl("dk,fk?th=x", "u1")[1]) self.assertIn('">e403', self.curl("dk,fk?th=x", "u2")[1]) zs = "dk,fk?th=x&k=%s" % (dk["dk"]) self.assertIn('">e403', self.curl(zs, "u2")[1]) zs = "dk,fk?th=x&k=%s" % (dk["dk,fk"]) self.assertIn('">folder', self.curl(zs, "u2")[1]) # fk enabled, so this should fail self.assertIn('">e403', self.curl("dk,fk/f.t1?th=x", "u2")[1]) self.assertIn('">e403', self.curl("dk,fk/s1/f.t2?th=x", "u2")[1]) # but dk should return correct filekeys, so try that zs = "dk,fk/%s&th=x" % (zj["files"][0]["href"]) self.assertIn('">t1', self.curl(zs, "u2")[1]) ## ## dks+fk # should not be able to access dk with wrong dirkey, zs = self.curl("dks,fk?ls&k=%s" % (dk["dk"]), "u2")[1] self.assertEqual(zs, "\nJ2EOT") # so use the right key zs = self.curl("dks,fk?ls&k=%s" % (dk["dks,fk"]), "u2")[1] zj = json.loads(zs) self.assertEqual(len(zj["dirs"]), 1) self.assertEqual(len(zj["files"]), 1) self.assertEqual(zj["dirs"][0]["href"][:6], "s1/?k=") self.assertEqual(zj["files"][0]["href"][:7], "f.t1?k=") ## ## dks+fk thumbs self.assertIn('">folder', self.curl("dks,fk?th=x", "u1")[1]) self.assertIn('">e403', self.curl("dks,fk?th=x", "u2")[1]) zs = "dks,fk?th=x&k=%s" % (dk["dk"]) self.assertIn('">e403', self.curl(zs, "u2")[1]) zs = "dks,fk?th=x&k=%s" % (dk["dks,fk"]) self.assertIn('">folder', self.curl(zs, "u2")[1]) # subdir s1 without key zs = "dks,fk/s1/?th=x" self.assertIn('">e403', self.curl(zs, "u2")[1]) # subdir s1 with bad key zs = "dks,fk/s1/?th=x&k=no" self.assertIn('">e403', self.curl(zs, "u2")[1]) # subdir s1 with correct key zs = "dks,fk/%s&th=x" % (zj["dirs"][0]["href"]) self.assertIn('">folder', self.curl(zs, "u2")[1]) # fk enabled, so this should fail self.assertIn('">e403', self.curl("dks,fk/f.t1?th=x", "u2")[1]) self.assertIn('">e403', self.curl("dks,fk/s1/f.t2?th=x", "u2")[1]) # but dk should return correct filekeys, so try that zs = "dks,fk/%s&th=x" % (zj["files"][0]["href"]) self.assertIn('">t1', self.curl(zs, "u2")[1]) # subdir self.assertIn('">e403', self.curl("dks,fk/s1/?th=x", "u2")[1]) self.assertEqual("\nJ2EOT", self.curl("dks,fk/s1/?ls", "u2")[1]) zs = "dks,fk/s1%s&th=x" % (zj["files"][0]["href"]) zs = self.curl("dks,fk?ls&k=%s" % (dk["dks,fk"]), "u2")[1] zj = json.loads(zs) url = "dks,fk/%s" % zj["dirs"][0]["href"] self.assertIn('"files"', self.curl(url + "&ls", "u2")[1]) self.assertEqual("\nJ2EOT", self.curl(url + "x&ls", "u2")[1]) def tardir(self, url, uname): top = url.split("?")[0] top = ("top" if not top else top.lstrip(".").split("/")[0]) + "/" url += ("&" if "?" in url else "?") + "tar" h, b = self.curl(url, uname, True) tar = tarfile.open(fileobj=io.BytesIO(b), mode="r|").getnames() if len(tar) != len([x for x in tar if x.startswith(top)]): raise Exception("bad-prefix:", tar) return " ".join([x[len(top) :] for x in tar]) def tarsel(self, url, uname, sel): url += ("&" if "?" in url else "?") + "tar" zs = '--XD\r\nContent-Disposition: form-data; name="act"\r\n\r\nzip\r\n--XD\r\nContent-Disposition: form-data; name="files"\r\n\r\n' zs += "\r\n".join(sel) + "\r\n--XD--\r\n" zb = zs.encode("utf-8") hdr = "POST /%s HTTP/1.1\r\nPW: %s\r\nConnection: close\r\nContent-Type: multipart/form-data; boundary=XD\r\nContent-Length: %d\r\n\r\n" req = (hdr % (url, uname, len(zb))).encode("utf-8") + zb h, b = self.curl("", "", True, req) tar = tarfile.open(fileobj=io.BytesIO(b), mode="r|").getnames() return " ".join(tar) def curl(self, url, uname, binary=False, req=b""): req = req or hdr(url.replace("th=x", "th=j"), uname) conn = self.conn.setbuf(req) HttpCli(conn).run() if binary: h, b = conn.s._reply.split(b"\r\n\r\n", 1) return [h.decode("utf-8"), b] return conn.s._reply.decode("utf-8").split("\r\n\r\n", 1) def log(self, src, msg, c=0): print(msg, "\033[0m")