mirror of
https://github.com/9001/copyparty.git
synced 2025-08-18 09:22:31 -06:00
add test for zip-download
This commit is contained in:
parent
ffd7cd3ca8
commit
746229846d
|
@ -10,6 +10,7 @@ import tarfile
|
||||||
import tempfile
|
import tempfile
|
||||||
import time
|
import time
|
||||||
import unittest
|
import unittest
|
||||||
|
import zipfile
|
||||||
|
|
||||||
from copyparty.authsrv import AuthSrv
|
from copyparty.authsrv import AuthSrv
|
||||||
from copyparty.httpcli import HttpCli
|
from copyparty.httpcli import HttpCli
|
||||||
|
@ -31,6 +32,9 @@ class TestHttpCli(unittest.TestCase):
|
||||||
shutil.rmtree(self.td)
|
shutil.rmtree(self.td)
|
||||||
|
|
||||||
def test(self):
|
def test(self):
|
||||||
|
test_tar = True
|
||||||
|
test_zip = True
|
||||||
|
|
||||||
td = os.path.join(self.td, "vfs")
|
td = os.path.join(self.td, "vfs")
|
||||||
os.mkdir(td)
|
os.mkdir(td)
|
||||||
os.chdir(td)
|
os.chdir(td)
|
||||||
|
@ -40,6 +44,7 @@ class TestHttpCli(unittest.TestCase):
|
||||||
self.can_write = ["wa", "wo", "aa", "ao"]
|
self.can_write = ["wa", "wo", "aa", "ao"]
|
||||||
self.fn = "g{:x}g".format(int(time.time() * 3))
|
self.fn = "g{:x}g".format(int(time.time() * 3))
|
||||||
|
|
||||||
|
tctr = 0
|
||||||
allfiles = []
|
allfiles = []
|
||||||
allvols = []
|
allvols = []
|
||||||
for top in self.dtypes:
|
for top in self.dtypes:
|
||||||
|
@ -83,6 +88,7 @@ class TestHttpCli(unittest.TestCase):
|
||||||
self.asrv = AuthSrv(self.args, self.log)
|
self.asrv = AuthSrv(self.args, self.log)
|
||||||
vfiles = [x for x in allfiles if x.startswith(top)]
|
vfiles = [x for x in allfiles if x.startswith(top)]
|
||||||
for fp in vfiles:
|
for fp in vfiles:
|
||||||
|
tctr += 1
|
||||||
rok, wok = self.can_rw(fp)
|
rok, wok = self.can_rw(fp)
|
||||||
furl = fp.split("/", 1)[1]
|
furl = fp.split("/", 1)[1]
|
||||||
durl = furl.rsplit("/", 1)[0] if "/" in furl else ""
|
durl = furl.rsplit("/", 1)[0] if "/" in furl else ""
|
||||||
|
@ -112,34 +118,51 @@ class TestHttpCli(unittest.TestCase):
|
||||||
eprint("\033[33m{}\n# {}\033[0m".format(ret, url))
|
eprint("\033[33m{}\n# {}\033[0m".format(ret, url))
|
||||||
self.fail()
|
self.fail()
|
||||||
|
|
||||||
# tar
|
# expected files in archives
|
||||||
url = durl + "?tar"
|
if rok:
|
||||||
h, b = self.curl(url, True)
|
|
||||||
# with open(os.path.join(td, "tar"), "wb") as f:
|
|
||||||
# f.write(b)
|
|
||||||
try:
|
|
||||||
tar = tarfile.open(fileobj=io.BytesIO(b), mode="r|").getnames()
|
|
||||||
except:
|
|
||||||
if "HTTP/1.1 403 Forbidden" not in h and b != b"\nJ2EOT":
|
|
||||||
eprint("bad tar?", url, h, b)
|
|
||||||
raise
|
|
||||||
tar = []
|
|
||||||
tar = [x.split("/", 1)[1] for x in tar]
|
|
||||||
tar = ["/".join([y for y in [top, durl, x] if y]) for x in tar]
|
|
||||||
tar = [[x] + self.can_rw(x) for x in tar]
|
|
||||||
tar_ok = [x[0] for x in tar if x[1]]
|
|
||||||
tar_ng = [x[0] for x in tar if not x[1]]
|
|
||||||
self.assertEqual([], tar_ng)
|
|
||||||
|
|
||||||
if durl.split("/")[-1] in self.can_read:
|
|
||||||
ref = [x for x in vfiles if self.in_dive(top + "/" + durl, x)]
|
ref = [x for x in vfiles if self.in_dive(top + "/" + durl, x)]
|
||||||
for f in ref:
|
|
||||||
ok = f in tar_ok
|
|
||||||
pr = print if ok else eprint
|
|
||||||
pr("{}: {}".format("ok" if ok else "NG", f))
|
|
||||||
ref.sort()
|
ref.sort()
|
||||||
|
else:
|
||||||
|
ref = []
|
||||||
|
|
||||||
|
if test_tar:
|
||||||
|
url = durl + "?tar"
|
||||||
|
h, b = self.curl(url, True)
|
||||||
|
try:
|
||||||
|
tar = tarfile.open(fileobj=io.BytesIO(b), mode="r|").getnames()
|
||||||
|
except:
|
||||||
|
if "HTTP/1.1 403 Forbidden" not in h and b != b"\nJ2EOT":
|
||||||
|
eprint("bad tar?", url, h, b)
|
||||||
|
raise
|
||||||
|
tar = []
|
||||||
|
tar = [x.split("/", 1)[1] for x in tar]
|
||||||
|
tar = ["/".join([y for y in [top, durl, x] if y]) for x in tar]
|
||||||
|
tar = [[x] + self.can_rw(x) for x in tar]
|
||||||
|
tar_ok = [x[0] for x in tar if x[1]]
|
||||||
|
tar_ng = [x[0] for x in tar if not x[1]]
|
||||||
tar_ok.sort()
|
tar_ok.sort()
|
||||||
self.assertEqual(ref, tar_ok)
|
self.assertEqual(ref, tar_ok)
|
||||||
|
self.assertEqual([], tar_ng)
|
||||||
|
|
||||||
|
if test_zip:
|
||||||
|
url = durl + "?zip"
|
||||||
|
h, b = self.curl(url, True)
|
||||||
|
try:
|
||||||
|
with zipfile.ZipFile(io.BytesIO(b), "r") as zf:
|
||||||
|
zfi = zf.infolist()
|
||||||
|
except:
|
||||||
|
if "HTTP/1.1 403 Forbidden" not in h and b != b"\nJ2EOT":
|
||||||
|
eprint("bad zip?", url, h, b)
|
||||||
|
raise
|
||||||
|
zfi = []
|
||||||
|
zfn = [x.filename.split("/", 1)[1] for x in zfi]
|
||||||
|
zfn = ["/".join([y for y in [top, durl, x] if y]) for x in zfn]
|
||||||
|
zfn = [[x] + self.can_rw(x) for x in zfn]
|
||||||
|
zf_ok = [x[0] for x in zfn if x[1]]
|
||||||
|
zf_ng = [x[0] for x in zfn if not x[1]]
|
||||||
|
zf_ok.sort()
|
||||||
|
self.assertEqual(ref, zf_ok)
|
||||||
|
self.assertEqual([], zf_ng)
|
||||||
|
|
||||||
# stash
|
# stash
|
||||||
h, ret = self.put(url)
|
h, ret = self.put(url)
|
||||||
|
|
Loading…
Reference in a new issue