mirror of
https://github.com/9001/copyparty.git
synced 2025-08-17 09:02:15 -06:00
211 lines
5.3 KiB
Python
211 lines
5.3 KiB
Python
import os
|
|
import sys
|
|
import time
|
|
import shlex
|
|
import shutil
|
|
import signal
|
|
import tempfile
|
|
import requests
|
|
import threading
|
|
import subprocess as sp
|
|
|
|
|
|
CPP = []
|
|
|
|
|
|
class Cpp(object):
|
|
def __init__(self, args):
|
|
args = [sys.executable, "-m", "copyparty"] + args
|
|
print(" ".join([shlex.quote(x) for x in args]))
|
|
|
|
self.ls_pre = set(list(os.listdir()))
|
|
self.p = sp.Popen(args)
|
|
# , stdout=sp.PIPE, stderr=sp.PIPE)
|
|
|
|
self.t = threading.Thread(target=self._run)
|
|
self.t.daemon = True
|
|
self.t.start()
|
|
|
|
def _run(self):
|
|
self.so, self.se = self.p.communicate()
|
|
|
|
def stop(self, wait):
|
|
if wait:
|
|
os.kill(self.p.pid, signal.SIGINT)
|
|
self.t.join(timeout=2)
|
|
else:
|
|
self.p.kill() # macos py3.8
|
|
|
|
def clean(self):
|
|
t = os.listdir()
|
|
for f in t:
|
|
if f not in self.ls_pre and f.startswith("up."):
|
|
os.unlink(f)
|
|
|
|
def await_idle(self, ub, timeout):
|
|
req = ["scanning</td><td>False", "hash-q</td><td>0", "tag-q</td><td>0"]
|
|
lim = int(timeout * 10)
|
|
u = ub + "?h"
|
|
for n in range(lim):
|
|
try:
|
|
time.sleep(0.1)
|
|
r = requests.get(u, timeout=0.1)
|
|
for x in req:
|
|
if x not in r.text:
|
|
print("ST: {}/{} miss {}".format(n, lim, x))
|
|
raise Exception()
|
|
print("ST: idle")
|
|
return
|
|
except:
|
|
pass
|
|
|
|
|
|
def tc1():
|
|
ub = "http://127.0.0.1:4321/"
|
|
td = os.path.join("srv", "smoketest")
|
|
try:
|
|
shutil.rmtree(td)
|
|
except:
|
|
if os.path.exists(td):
|
|
raise
|
|
|
|
for _ in range(10):
|
|
try:
|
|
os.mkdir(td)
|
|
except:
|
|
time.sleep(0.1) # win10
|
|
|
|
assert os.path.exists(td)
|
|
|
|
vidp = os.path.join(tempfile.gettempdir(), "smoketest.h264")
|
|
if not os.path.exists(vidp):
|
|
cmd = "ffmpeg -f lavfi -i testsrc=48x32:3 -t 1 -c:v libx264 -tune animation -preset veryslow -crf 69"
|
|
sp.check_call(cmd.split(" ") + [vidp])
|
|
|
|
with open(vidp, "rb") as f:
|
|
ovid = f.read()
|
|
|
|
args = [
|
|
"-p4321",
|
|
"-e2dsa",
|
|
"-e2tsr",
|
|
"--no-mutagen",
|
|
"--th-ff-jpg",
|
|
"--hist",
|
|
os.path.join(td, "dbm"),
|
|
]
|
|
pdirs = []
|
|
hpaths = {}
|
|
|
|
for d1 in ["r", "w", "a"]:
|
|
pdirs.append("{}/{}".format(td, d1))
|
|
pdirs.append("{}/{}/j".format(td, d1))
|
|
for d2 in ["r", "w", "a"]:
|
|
d = os.path.join(td, d1, "j", d2)
|
|
pdirs.append(d)
|
|
os.makedirs(d)
|
|
|
|
pdirs = [x.replace("\\", "/") for x in pdirs]
|
|
udirs = [x.split("/", 2)[2] for x in pdirs]
|
|
perms = [x.rstrip("j/")[-1] for x in pdirs]
|
|
perms = ["rw" if x == "a" else x for x in perms]
|
|
for pd, ud, p in zip(pdirs, udirs, perms):
|
|
if ud[-1] == "j":
|
|
continue
|
|
|
|
hp = None
|
|
if pd.endswith("st/a"):
|
|
hp = hpaths[ud] = os.path.join(td, "db1")
|
|
elif pd[:-1].endswith("a/j/"):
|
|
hpaths[ud] = os.path.join(td, "dbm")
|
|
hp = None
|
|
else:
|
|
hp = "-"
|
|
hpaths[ud] = os.path.join(pd, ".hist")
|
|
|
|
arg = "{}:{}:{}".format(pd, ud, p, hp)
|
|
if hp:
|
|
arg += ":c,hist=" + hp
|
|
|
|
args += ["-v", arg]
|
|
|
|
# return
|
|
cpp = Cpp(args)
|
|
CPP.append(cpp)
|
|
cpp.await_idle(ub, 3)
|
|
|
|
for d in udirs:
|
|
vid = ovid + "\n{}".format(d).encode("utf-8")
|
|
try:
|
|
requests.post(ub + d, data={"act": "bput"}, files={"f": ("a.h264", vid)})
|
|
except:
|
|
pass
|
|
|
|
cpp.clean()
|
|
|
|
# GET permission
|
|
for d, p in zip(udirs, perms):
|
|
u = "{}{}/a.h264".format(ub, d)
|
|
r = requests.get(u)
|
|
ok = bool(r)
|
|
if ok != (p in ["rw"]):
|
|
raise Exception("get {} with perm {} at {}".format(ok, p, u))
|
|
|
|
# stat filesystem
|
|
for d, p in zip(pdirs, perms):
|
|
u = "{}/a.h264".format(d)
|
|
ok = os.path.exists(u)
|
|
if ok != (p in ["rw", "w"]):
|
|
raise Exception("stat {} with perm {} at {}".format(ok, p, u))
|
|
|
|
# GET thumbnail, vreify contents
|
|
for d, p in zip(udirs, perms):
|
|
u = "{}{}/a.h264?th=j".format(ub, d)
|
|
r = requests.get(u)
|
|
ok = bool(r and r.content[:3] == b"\xff\xd8\xff")
|
|
if ok != (p in ["rw"]):
|
|
raise Exception("thumb {} with perm {} at {}".format(ok, p, u))
|
|
|
|
# check tags
|
|
cpp.await_idle(ub, 5)
|
|
for d, p in zip(udirs, perms):
|
|
u = "{}{}?ls".format(ub, d)
|
|
r = requests.get(u)
|
|
j = r.json() if r else False
|
|
tag = None
|
|
if j:
|
|
for f in j["files"]:
|
|
tag = tag or f["tags"].get("res")
|
|
|
|
r_ok = bool(j)
|
|
w_ok = bool(r_ok and j.get("files"))
|
|
|
|
if not r_ok or w_ok != (p in ["rw"]):
|
|
raise Exception("ls {} with perm {} at {}".format(ok, p, u))
|
|
|
|
if (tag and p != "rw") or (not tag and p == "rw"):
|
|
raise Exception("tag {} with perm {} at {}".format(tag, p, u))
|
|
|
|
if tag is not None and tag != "48x32":
|
|
raise Exception("tag [{}] at {}".format(tag, u))
|
|
|
|
cpp.stop(True)
|
|
|
|
|
|
def run(tc):
|
|
try:
|
|
tc()
|
|
finally:
|
|
try:
|
|
CPP[0].stop(False)
|
|
except:
|
|
pass
|
|
|
|
|
|
def main():
|
|
run(tc1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|