From 746229846d0520d3a2d0a37b2d3665f192344381 Mon Sep 17 00:00:00 2001 From: ed Date: Tue, 30 Jul 2024 22:44:29 +0000 Subject: [PATCH] add test for zip-download --- tests/test_httpcli.py | 71 ++++++++++++++++++++++++++++--------------- 1 file changed, 47 insertions(+), 24 deletions(-) diff --git a/tests/test_httpcli.py b/tests/test_httpcli.py index 3de1b1d3..4b09743b 100644 --- a/tests/test_httpcli.py +++ b/tests/test_httpcli.py @@ -10,6 +10,7 @@ import tarfile import tempfile import time import unittest +import zipfile from copyparty.authsrv import AuthSrv from copyparty.httpcli import HttpCli @@ -31,6 +32,9 @@ class TestHttpCli(unittest.TestCase): shutil.rmtree(self.td) def test(self): + test_tar = True + test_zip = True + td = os.path.join(self.td, "vfs") os.mkdir(td) os.chdir(td) @@ -40,6 +44,7 @@ class TestHttpCli(unittest.TestCase): self.can_write = ["wa", "wo", "aa", "ao"] self.fn = "g{:x}g".format(int(time.time() * 3)) + tctr = 0 allfiles = [] allvols = [] for top in self.dtypes: @@ -83,6 +88,7 @@ class TestHttpCli(unittest.TestCase): self.asrv = AuthSrv(self.args, self.log) vfiles = [x for x in allfiles if x.startswith(top)] for fp in vfiles: + tctr += 1 rok, wok = self.can_rw(fp) furl = fp.split("/", 1)[1] durl = furl.rsplit("/", 1)[0] if "/" in furl else "" @@ -112,34 +118,51 @@ class TestHttpCli(unittest.TestCase): eprint("\033[33m{}\n# {}\033[0m".format(ret, url)) self.fail() - # tar - url = durl + "?tar" - h, b = self.curl(url, True) - # with open(os.path.join(td, "tar"), "wb") as f: - # f.write(b) - try: - tar = tarfile.open(fileobj=io.BytesIO(b), mode="r|").getnames() - except: - if "HTTP/1.1 403 Forbidden" not in h and b != b"\nJ2EOT": - eprint("bad tar?", url, h, b) - raise - tar = [] - tar = [x.split("/", 1)[1] for x in tar] - tar = ["/".join([y for y in [top, durl, x] if y]) for x in tar] - tar = [[x] + self.can_rw(x) for x in tar] - tar_ok = [x[0] for x in tar if x[1]] - tar_ng = [x[0] for x in tar if not x[1]] - self.assertEqual([], tar_ng) - - if durl.split("/")[-1] in self.can_read: + # expected files in archives + if rok: ref = [x for x in vfiles if self.in_dive(top + "/" + durl, x)] - for f in ref: - ok = f in tar_ok - pr = print if ok else eprint - pr("{}: {}".format("ok" if ok else "NG", f)) ref.sort() + else: + ref = [] + + if test_tar: + url = durl + "?tar" + h, b = self.curl(url, True) + try: + tar = tarfile.open(fileobj=io.BytesIO(b), mode="r|").getnames() + except: + if "HTTP/1.1 403 Forbidden" not in h and b != b"\nJ2EOT": + eprint("bad tar?", url, h, b) + raise + tar = [] + tar = [x.split("/", 1)[1] for x in tar] + tar = ["/".join([y for y in [top, durl, x] if y]) for x in tar] + tar = [[x] + self.can_rw(x) for x in tar] + tar_ok = [x[0] for x in tar if x[1]] + tar_ng = [x[0] for x in tar if not x[1]] tar_ok.sort() self.assertEqual(ref, tar_ok) + self.assertEqual([], tar_ng) + + if test_zip: + url = durl + "?zip" + h, b = self.curl(url, True) + try: + with zipfile.ZipFile(io.BytesIO(b), "r") as zf: + zfi = zf.infolist() + except: + if "HTTP/1.1 403 Forbidden" not in h and b != b"\nJ2EOT": + eprint("bad zip?", url, h, b) + raise + zfi = [] + zfn = [x.filename.split("/", 1)[1] for x in zfi] + zfn = ["/".join([y for y in [top, durl, x] if y]) for x in zfn] + zfn = [[x] + self.can_rw(x) for x in zfn] + zf_ok = [x[0] for x in zfn if x[1]] + zf_ng = [x[0] for x in zfn if not x[1]] + zf_ok.sort() + self.assertEqual(ref, zf_ok) + self.assertEqual([], zf_ng) # stash h, ret = self.put(url)