#!/usr/bin/env python3
# coding: utf-8
from __future__ import print_function, unicode_literals

import io
import os
import time
import shutil
import pprint
import tarfile
import tempfile
import unittest
from argparse import Namespace

from tests import util as tu
from copyparty.authsrv import AuthSrv
from copyparty.httpcli import HttpCli


def hdr(query):
    h = "GET /{} HTTP/1.1\r\nCookie: cppwd=o\r\nConnection: close\r\n\r\n"
    return h.format(query).encode("utf-8")


class Cfg(Namespace):
    def __init__(self, a=None, v=None, c=None):
        ka = {}

        ex = "e2d e2ds e2dsa e2t e2ts e2tsr ed emp force_js ihead no_acode no_athumb no_del no_logues no_mv no_readme no_robots no_scandir no_thumb no_vthumb no_zip nw"
        ka.update(**{k: False for k in ex.split()})

        ex = "nih no_rescan no_sendfile no_voldump"
        ka.update(**{k: True for k in ex.split()})

        ex = "css_browser hist js_browser no_hash no_idx"
        ka.update(**{k: None for k in ex.split()})

        ex = "re_maxage rproxy rsp_slp s_wr_slp theme themes turbo"
        ka.update(**{k: 0 for k in ex.split()})

        ex = "doctitle favico html_head mth textfiles"
        ka.update(**{k: "" for k in ex.split()})

        super(Cfg, self).__init__(
            a=a or [],
            v=v or [],
            c=c,
            s_wr_sz=512 * 1024,
            unpost=600,
            mtp=[],
            mte="a",
            lang="eng",
            logout=573,
            **ka
        )


class TestHttpCli(unittest.TestCase):
    def setUp(self):
        self.td = tu.get_ramdisk()

    def tearDown(self):
        os.chdir(tempfile.gettempdir())
        shutil.rmtree(self.td)

    def test(self):
        td = os.path.join(self.td, "vfs")
        os.mkdir(td)
        os.chdir(td)

        self.dtypes = ["ra", "ro", "rx", "wa", "wo", "wx", "aa", "ao", "ax"]
        self.can_read = ["ra", "ro", "aa", "ao"]
        self.can_write = ["wa", "wo", "aa", "ao"]
        self.fn = "g{:x}g".format(int(time.time() * 3))

        allfiles = []
        allvols = []
        for top in self.dtypes:
            allvols.append(top)
            allfiles.append("/".join([top, self.fn]))
            for s1 in self.dtypes:
                p = "/".join([top, s1])
                allvols.append(p)
                allfiles.append(p + "/" + self.fn)
                allfiles.append(p + "/n/" + self.fn)
                for s2 in self.dtypes:
                    p = "/".join([top, s1, "n", s2])
                    os.makedirs(p)
                    allvols.append(p)
                    allfiles.append(p + "/" + self.fn)

        for fp in allfiles:
            with open(fp, "w") as f:
                f.write("ok {}\n".format(fp))

        for top in self.dtypes:
            vcfg = []
            for vol in allvols:
                if not vol.startswith(top):
                    continue

                mode = vol[-2].replace("a", "rw")
                usr = vol[-1]
                if usr == "a":
                    usr = ""

                if "/" not in vol:
                    vol += "/"

                top, sub = vol.split("/", 1)
                vcfg.append("{0}/{1}:{1}:{2},{3}".format(top, sub, mode, usr))

            pprint.pprint(vcfg)

            self.args = Cfg(v=vcfg, a=["o:o", "x:x"])
            self.asrv = AuthSrv(self.args, self.log)
            vfiles = [x for x in allfiles if x.startswith(top)]
            for fp in vfiles:
                rok, wok = self.can_rw(fp)
                furl = fp.split("/", 1)[1]
                durl = furl.rsplit("/", 1)[0] if "/" in furl else ""

                # file download
                h, ret = self.curl(furl)
                res = "ok " + fp in ret
                print("[{}] {} {} = {}".format(fp, rok, wok, res))
                if rok != res:
                    print("\033[33m{}\n# {}\033[0m".format(ret, furl))
                    self.fail()

                # file browser: html
                h, ret = self.curl(durl)
                res = "'{}'".format(self.fn) in ret
                print(res)
                if rok != res:
                    print("\033[33m{}\n# {}\033[0m".format(ret, durl))
                    self.fail()

                # file browser: json
                url = durl + "?ls"
                h, ret = self.curl(url)
                res = '"{}"'.format(self.fn) in ret
                print(res)
                if rok != res:
                    print("\033[33m{}\n# {}\033[0m".format(ret, url))
                    self.fail()

                # tar
                url = durl + "?tar"
                h, b = self.curl(url, True)
                # with open(os.path.join(td, "tar"), "wb") as f:
                #     f.write(b)
                try:
                    tar = tarfile.open(fileobj=io.BytesIO(b)).getnames()
                except:
                    tar = []

                tar = [x[4:] if x.startswith("top/") else x for x in tar]
                tar = ["/".join([y for y in [top, durl, x] if y]) for x in tar]
                tar = [[x] + self.can_rw(x) for x in tar]
                tar_ok = [x[0] for x in tar if x[1]]
                tar_ng = [x[0] for x in tar if not x[1]]
                self.assertEqual([], tar_ng)

                if durl.split("/")[-1] in self.can_read:
                    ref = [x for x in vfiles if self.in_dive(top + "/" + durl, x)]
                    for f in ref:
                        print("{}: {}".format("ok" if f in tar_ok else "NG", f))

                    ref.sort()
                    tar_ok.sort()
                    self.assertEqual(ref, tar_ok)

                # stash
                h, ret = self.put(url)
                res = h.startswith("HTTP/1.1 200 ")
                self.assertEqual(res, wok)

    def can_rw(self, fp):
        # lowest non-neutral folder declares permissions
        expect = fp.split("/")[:-1]
        for x in reversed(expect):
            if x != "n":
                expect = x
                break

        return [expect in self.can_read, expect in self.can_write]

    def in_dive(self, top, fp):
        # archiver bails at first inaccessible subvolume
        top = top.strip("/").split("/")
        fp = fp.split("/")
        for f1, f2 in zip(top, fp):
            if f1 != f2:
                return False

        for f in fp[len(top) :]:
            if f == self.fn:
                return True

            if f not in self.can_read and f != "n":
                return False

        return True

    def put(self, url):
        buf = "PUT /{0} HTTP/1.1\r\nCookie: cppwd=o\r\nConnection: close\r\nContent-Length: {1}\r\n\r\nok {0}\n"
        buf = buf.format(url, len(url) + 4).encode("utf-8")
        conn = tu.VHttpConn(self.args, self.asrv, self.log, buf)
        HttpCli(conn).run()
        return conn.s._reply.decode("utf-8").split("\r\n\r\n", 1)

    def curl(self, url, binary=False):
        conn = tu.VHttpConn(self.args, self.asrv, self.log, hdr(url))
        HttpCli(conn).run()
        if binary:
            h, b = conn.s._reply.split(b"\r\n\r\n", 1)
            return [h.decode("utf-8"), b]

        return conn.s._reply.decode("utf-8").split("\r\n\r\n", 1)

    def log(self, src, msg, c=0):
        # print(repr(msg))
        pass
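

# minimal runner sketch, assuming this module lives inside the "tests"
# package of a copyparty checkout (the exact module name is an assumption);
# it can then be invoked via unittest discovery, e.g.
#   python3 -m unittest tests.test_httpcli
# or executed directly through this standard guard:
if __name__ == "__main__":
    unittest.main()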