#!/usr/bin/env python3 # coding: utf-8 from __future__ import print_function, unicode_literals import io import json import os import shutil import tarfile import tempfile import unittest from copyparty.authsrv import AuthSrv from copyparty.httpcli import HttpCli from copyparty.u2idx import U2idx from copyparty.up2k import Up2k from tests import util as tu from tests.util import Cfg try: from typing import Optional except: pass def hdr(query, uname): h = "GET /%s HTTP/1.1\r\nPW: %s\r\nConnection: close\r\n\r\n" return (h % (query, uname)).encode("utf-8") class TestDots(unittest.TestCase): def __init__(self, *a, **ka): super(TestDots, self).__init__(*a, **ka) self.is_dut = True def setUp(self): self.conn: Optional[tu.VHttpConn] = None self.td = tu.get_ramdisk() def tearDown(self): if self.conn: self.conn.shutdown() os.chdir(tempfile.gettempdir()) shutil.rmtree(self.td) def cinit(self): if self.conn: self.conn.shutdown() self.conn = None self.conn = tu.VHttpConn(self.args, self.asrv, self.log, b"") def test_dots(self): td = os.path.join(self.td, "vfs") os.mkdir(td) os.chdir(td) # topDir volA volA/*dirA .volB .volB/*dirB spaths = " t .t a a/da a/.da .b .b/db .b/.db" for n, dirpath in enumerate(spaths.split(" ")): if dirpath: os.makedirs(dirpath) for pfx in "f", ".f": filepath = pfx + str(n) if dirpath: filepath = os.path.join(dirpath, filepath) with open(filepath, "wb") as f: f.write(filepath.encode("utf-8")) vcfg = [".::r,u1:r.,u2", "a:a:r,u1:r,u2", ".b:.b:r.,u1:r,u2"] self.args = Cfg(v=vcfg, a=["u1:u1", "u2:u2"], e2dsa=True) self.asrv = AuthSrv(self.args, self.log) self.cinit() self.assertEqual(self.tardir("", "u1"), "f0 t/f1 a/f3 a/da/f4") self.assertEqual(self.tardir(".t", "u1"), "f2") self.assertEqual(self.tardir(".b", "u1"), ".f6 f6 .db/.f8 .db/f8 db/.f7 db/f7") zs = ".f0 f0 .t/.f2 .t/f2 t/.f1 t/f1 .b/f6 .b/db/f7 a/f3 a/da/f4" self.assertEqual(self.tardir("", "u2"), zs) self.assertEqual(self.curl("?tar", "x")[1][:17], "\nJ2EOT") ## ## search up2k = Up2k(self) u2idx = U2idx(self) allvols = list(self.asrv.vfs.all_vols.values()) x = u2idx.search("u1", allvols, "", 999) x = " ".join(sorted([x["rp"] for x in x[0]])) # u1 can see dotfiles in volB so they should be included xe = ".b/.db/.f8 .b/.db/f8 .b/.f6 .b/db/.f7 .b/db/f7 .b/f6 a/da/f4 a/f3 f0 t/f1" self.assertEqual(x, xe) x = u2idx.search("u2", allvols, "", 999) x = " ".join(sorted([x["rp"] for x in x[0]])) self.assertEqual(x, ".f0 .t/.f2 .t/f2 a/da/f4 a/f3 f0 t/.f1 t/f1") u2idx.shutdown() self.args = Cfg(v=vcfg, a=["u1:u1", "u2:u2"], dotsrch=False, e2d=True) self.asrv = AuthSrv(self.args, self.log) u2idx = U2idx(self) self.cinit() x = u2idx.search("u1", self.asrv.vfs.all_vols.values(), "", 999) x = " ".join(sorted([x["rp"] for x in x[0]])) # u1 can see dotfiles in volB so they should be included xe = "a/da/f4 a/f3 f0 t/f1" self.assertEqual(x, xe) u2idx.shutdown() up2k.shutdown() ## ## dirkeys os.mkdir("v") with open("v/f1.txt", "wb") as f: f.write(b"a") os.rename("a", "v/a") os.rename(".b", "v/.b") vcfg = [ ".::r.,u1:g,u2:c,dks", "v/a:v/a:r.,u1:g,u2:c,dks", "v/.b:v/.b:r.,u1:g,u2:c,dks", ] self.args = Cfg(v=vcfg, a=["u1:u1", "u2:u2"]) self.asrv = AuthSrv(self.args, self.log) self.cinit() zj = json.loads(self.curl("?ls", "u1")[1]) url = "?k=" + zj["dk"] # should descend into folders, but not other volumes: self.assertEqual(self.tardir(url, "u2"), "f0 t/f1 v/f1.txt") zj = json.loads(self.curl("v?ls", "u1")[1]) url = "v?k=" + zj["dk"] self.assertEqual(self.tarsel(url, "u2", ["f1.txt", "a", ".b"]), "f1.txt") shutil.rmtree("v") def test_dk_fk(self): # python3 -m unittest tests.test_dots.TestDots.test_dk_fk td = os.path.join(self.td, "vfs") os.mkdir(td) os.chdir(td) vcfg = [] for k in "dk dks dky fk fka dk,fk dks,fk".split(): vcfg += ["{0}:{0}:r.,u1:g,u2:c,{0}".format(k)] zs = "%s/s1/s2" % (k,) os.makedirs(zs) with open("%s/f.t1" % (k,), "wb") as f: f.write(b"f1") with open("%s/s1/f.t2" % (k,), "wb") as f: f.write(b"f2") with open("%s/s1/s2/f.t3" % (k,), "wb") as f: f.write(b"f3") self.args = Cfg(v=vcfg, a=["u1:u1", "u2:u2"]) self.asrv = AuthSrv(self.args, self.log) self.cinit() dk = {} for d in "dk dks dk,fk dks,fk".split(): zj = json.loads(self.curl("%s?ls" % (d,), "u1")[1]) dk[d] = zj["dk"] ## ## dk # should not be able to access dk with wrong dirkey, zs = self.curl("dk?ls&k=%s" % (dk["dks"]), "u2")[1] self.assertEqual(zs, "\nJ2EOT") # so use the right key zs = self.curl("dk?ls&k=%s" % (dk["dk"]), "u2")[1] zj = json.loads(zs) self.assertEqual(len(zj["dirs"]), 0) self.assertEqual(len(zj["files"]), 1) self.assertEqual(zj["files"][0]["href"], "f.t1") ## ## dk thumbs self.assertIn('">folder', self.curl("dk?th=x", "u1")[1]) self.assertIn('">e403', self.curl("dk?th=x", "u2")[1]) zs = "dk?th=x&k=%s" % (dk["dks"]) self.assertIn('">e403', self.curl(zs, "u2")[1]) zs = "dk?th=x&k=%s" % (dk["dk"]) self.assertIn('">folder', self.curl(zs, "u2")[1]) # fk not enabled, so this should work self.assertIn('">t1', self.curl("dk/f.t1?th=x", "u2")[1]) self.assertIn('">t2', self.curl("dk/s1/f.t2?th=x", "u2")[1]) ## ## dks # should not be able to access dks with wrong dirkey, zs = self.curl("dks?ls&k=%s" % (dk["dk"]), "u2")[1] self.assertEqual(zs, "\nJ2EOT") # so use the right key zs = self.curl("dks?ls&k=%s" % (dk["dks"]), "u2")[1] zj = json.loads(zs) self.assertEqual(len(zj["dirs"]), 1) self.assertEqual(len(zj["files"]), 1) self.assertEqual(zj["files"][0]["href"], "f.t1") # dks should return correct dirkey of subfolders; s1 = zj["dirs"][0]["href"] self.assertEqual(s1.split("/")[0], "s1") zs = self.curl("dks/%s&ls" % (s1), "u2")[1] self.assertIn('"s2/?k=', zs) ## ## dks thumbs self.assertIn('">folder', self.curl("dks?th=x", "u1")[1]) self.assertIn('">e403', self.curl("dks?th=x", "u2")[1]) zs = "dks?th=x&k=%s" % (dk["dk"]) self.assertIn('">e403', self.curl(zs, "u2")[1]) zs = "dks?th=x&k=%s" % (dk["dks"]) self.assertIn('">folder', self.curl(zs, "u2")[1]) # fk not enabled, so this should work self.assertIn('">t1', self.curl("dks/f.t1?th=x", "u2")[1]) self.assertIn('">t2', self.curl("dks/s1/f.t2?th=x", "u2")[1]) ## ## dky # doesn't care about keys zs = self.curl("dky?ls&k=ok", "u2")[1] self.assertEqual(zs, self.curl("dky?ls", "u2")[1]) zj = json.loads(zs) self.assertEqual(len(zj["dirs"]), 0) self.assertEqual(len(zj["files"]), 1) self.assertEqual(zj["files"][0]["href"], "f.t1") ## ## dky thumbs self.assertIn('">folder', self.curl("dky?th=x", "u1")[1]) self.assertIn('">folder', self.curl("dky?th=x", "u2")[1]) zs = "dky?th=x&k=%s" % (dk["dk"]) self.assertIn('">folder', self.curl(zs, "u2")[1]) # fk not enabled, so this should work self.assertIn('">t1', self.curl("dky/f.t1?th=x", "u2")[1]) self.assertIn('">t2', self.curl("dky/s1/f.t2?th=x", "u2")[1]) ## ## dk+fk # should not be able to access dk with wrong dirkey, zs = self.curl("dk,fk?ls&k=%s" % (dk["dk"]), "u2")[1] self.assertEqual(zs, "\nJ2EOT") # so use the right key zs = self.curl("dk,fk?ls&k=%s" % (dk["dk,fk"]), "u2")[1] zj = json.loads(zs) self.assertEqual(len(zj["dirs"]), 0) self.assertEqual(len(zj["files"]), 1) self.assertEqual(zj["files"][0]["href"][:7], "f.t1?k=") ## ## dk+fk thumbs self.assertIn('">folder', self.curl("dk,fk?th=x", "u1")[1]) self.assertIn('">e403', self.curl("dk,fk?th=x", "u2")[1]) zs = "dk,fk?th=x&k=%s" % (dk["dk"]) self.assertIn('">e403', self.curl(zs, "u2")[1]) zs = "dk,fk?th=x&k=%s" % (dk["dk,fk"]) self.assertIn('">folder', self.curl(zs, "u2")[1]) # fk enabled, so this should fail self.assertIn('">e403', self.curl("dk,fk/f.t1?th=x", "u2")[1]) self.assertIn('">e403', self.curl("dk,fk/s1/f.t2?th=x", "u2")[1]) # but dk should return correct filekeys, so try that zs = "dk,fk/%s&th=x" % (zj["files"][0]["href"]) self.assertIn('">t1', self.curl(zs, "u2")[1]) ## ## dks+fk # should not be able to access dk with wrong dirkey, zs = self.curl("dks,fk?ls&k=%s" % (dk["dk"]), "u2")[1] self.assertEqual(zs, "\nJ2EOT") # so use the right key zs = self.curl("dks,fk?ls&k=%s" % (dk["dks,fk"]), "u2")[1] zj = json.loads(zs) self.assertEqual(len(zj["dirs"]), 1) self.assertEqual(len(zj["files"]), 1) self.assertEqual(zj["dirs"][0]["href"][:6], "s1/?k=") self.assertEqual(zj["files"][0]["href"][:7], "f.t1?k=") ## ## dks+fk thumbs self.assertIn('">folder', self.curl("dks,fk?th=x", "u1")[1]) self.assertIn('">e403', self.curl("dks,fk?th=x", "u2")[1]) zs = "dks,fk?th=x&k=%s" % (dk["dk"]) self.assertIn('">e403', self.curl(zs, "u2")[1]) zs = "dks,fk?th=x&k=%s" % (dk["dks,fk"]) self.assertIn('">folder', self.curl(zs, "u2")[1]) # subdir s1 without key zs = "dks,fk/s1/?th=x" self.assertIn('">e403', self.curl(zs, "u2")[1]) # subdir s1 with bad key zs = "dks,fk/s1/?th=x&k=no" self.assertIn('">e403', self.curl(zs, "u2")[1]) # subdir s1 with correct key zs = "dks,fk/%s&th=x" % (zj["dirs"][0]["href"]) self.assertIn('">folder', self.curl(zs, "u2")[1]) # fk enabled, so this should fail self.assertIn('">e403', self.curl("dks,fk/f.t1?th=x", "u2")[1]) self.assertIn('">e403', self.curl("dks,fk/s1/f.t2?th=x", "u2")[1]) # but dk should return correct filekeys, so try that zs = "dks,fk/%s&th=x" % (zj["files"][0]["href"]) self.assertIn('">t1', self.curl(zs, "u2")[1]) # subdir self.assertIn('">e403', self.curl("dks,fk/s1/?th=x", "u2")[1]) self.assertEqual("\nJ2EOT", self.curl("dks,fk/s1/?ls", "u2")[1]) zs = "dks,fk/s1%s&th=x" % (zj["files"][0]["href"]) zs = self.curl("dks,fk?ls&k=%s" % (dk["dks,fk"]), "u2")[1] zj = json.loads(zs) url = "dks,fk/%s" % zj["dirs"][0]["href"] self.assertIn('"files"', self.curl(url + "&ls", "u2")[1]) self.assertEqual("\nJ2EOT", self.curl(url + "x&ls", "u2")[1]) def tardir(self, url, uname): top = url.split("?")[0] top = ("top" if not top else top.lstrip(".").split("/")[0]) + "/" url += ("&" if "?" in url else "?") + "tar" h, b = self.curl(url, uname, True) tar = tarfile.open(fileobj=io.BytesIO(b), mode="r|").getnames() if len(tar) != len([x for x in tar if x.startswith(top)]): raise Exception("bad-prefix:", tar) return " ".join([x[len(top) :] for x in tar]) def tarsel(self, url, uname, sel): url += ("&" if "?" in url else "?") + "tar" zs = '--XD\r\nContent-Disposition: form-data; name="act"\r\n\r\nzip\r\n--XD\r\nContent-Disposition: form-data; name="files"\r\n\r\n' zs += "\r\n".join(sel) + "\r\n--XD--\r\n" zb = zs.encode("utf-8") hdr = "POST /%s HTTP/1.1\r\nPW: %s\r\nConnection: close\r\nContent-Type: multipart/form-data; boundary=XD\r\nContent-Length: %d\r\n\r\n" req = (hdr % (url, uname, len(zb))).encode("utf-8") + zb h, b = self.curl("/" + url, uname, True, req) tar = tarfile.open(fileobj=io.BytesIO(b), mode="r|").getnames() return " ".join(tar) def curl(self, url, uname, binary=False, req=b""): req = req or hdr(url, uname) conn = self.conn.setbuf(req) HttpCli(conn).run() if binary: h, b = conn.s._reply.split(b"\r\n\r\n", 1) return [h.decode("utf-8"), b] return conn.s._reply.decode("utf-8").split("\r\n\r\n", 1) def log(self, src, msg, c=0): print(msg, "\033[0m")