From 2f7f9de3f548cf69f186ab78b0057e92650227ea Mon Sep 17 00:00:00 2001 From: ed Date: Sat, 20 Apr 2024 20:13:31 +0000 Subject: [PATCH] pipe: optimize (1 GiB/s @ ryzen5-4500U) --- copyparty/httpcli.py | 18 ++++++++++++++---- copyparty/httpconn.py | 1 + copyparty/httpsrv.py | 2 ++ copyparty/up2k.py | 10 +++++----- copyparty/util.py | 31 +++++++++++++++++++++++++++++++ tests/test_dots.py | 4 +--- tests/util.py | 5 +++-- 7 files changed, 57 insertions(+), 14 deletions(-) diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index b3616b05..8b260891 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -128,6 +128,7 @@ class HttpCli(object): self.ico = conn.ico # mypy404 self.thumbcli = conn.thumbcli # mypy404 self.u2fh = conn.u2fh # mypy404 + self.pipes = conn.pipes # mypy404 self.log_func = conn.log_func # mypy404 self.log_src = conn.log_src # mypy404 self.gen_fk = self._gen_fk if self.args.log_fk else gen_filekey @@ -2949,6 +2950,7 @@ class HttpCli(object): job = json.loads(x.get()) if not job: raise Exception("not found in registry") + self.pipes.set(req_path, job) except Exception as ex: self.log("will not pipe [%s]; %s" % (ap_data, ex), 6) ptop = None @@ -3172,8 +3174,14 @@ class HttpCli(object): tiers = ["uncapped", "reduced speed", "one byte per sec"] while lower < upper and not broken: - x = self.conn.hsrv.broker.ask("up2k.find_job_by_ap", ptop, req_path) - job = json.loads(x.get()) + with self.pipes.lk: + job = self.pipes.get(req_path) + if not job: + x = self.conn.hsrv.broker.ask("up2k.find_job_by_ap", ptop, req_path) + job = json.loads(x.get()) + if job: + self.pipes.set(req_path, job) + if not job: t = "pipe: upload has finished; yeeting remainder" data_end = file_size @@ -3223,13 +3231,15 @@ class HttpCli(object): self.log("moved to tier %d (%s)" % (tier, tiers[tier])) try: - with open(ap_data, "rb") as f: + with open(ap_data, "rb", self.args.iobuf) as f: f.seek(lower) page = f.read(min(winsz, data_end - lower, upper - lower)) if not page: raise Exception("got 0 bytes (EOF?)") except Exception as ex: self.log("pipe: read failed at %.2f MiB: %s" % (lower / M, ex), 3) + with self.pipes.lk: + self.pipes.c.pop(req_path, None) spins += 1 if spins > 3: raise Pebkac(500, "file became unreadable") @@ -3900,7 +3910,7 @@ class HttpCli(object): if not allvols: ret = [{"kinshi": 1}] - jtxt = '{"u":%s,"c":%s}' % (uret, json.dumps(ret, indent=0)) + jtxt = '{"u":%s,"c":%s}' % (uret, json.dumps(ret, separators=(",\n", ": "))) zi = len(uret.split('\n"pd":')) - 1 self.log("%s #%d+%d %.2fsec" % (lm, zi, len(ret), time.time() - t0)) self.reply(jtxt.encode("utf-8", "replace"), mime="application/json") diff --git a/copyparty/httpconn.py b/copyparty/httpconn.py index 3e40f55a..e6c3a9da 100644 --- a/copyparty/httpconn.py +++ b/copyparty/httpconn.py @@ -55,6 +55,7 @@ class HttpConn(object): self.E: EnvParams = self.args.E self.asrv: AuthSrv = hsrv.asrv # mypy404 self.u2fh: Util.FHC = hsrv.u2fh # mypy404 + self.pipes: Util.CachedDict = hsrv.pipes # mypy404 self.ipa_nm: Optional[NetMap] = hsrv.ipa_nm self.xff_nm: Optional[NetMap] = hsrv.xff_nm self.xff_lan: NetMap = hsrv.xff_lan # type: ignore diff --git a/copyparty/httpsrv.py b/copyparty/httpsrv.py index 1c6a8cca..adef2d4a 100644 --- a/copyparty/httpsrv.py +++ b/copyparty/httpsrv.py @@ -61,6 +61,7 @@ from .u2idx import U2idx from .util import ( E_SCK, FHC, + CachedDict, Daemon, Garda, Magician, @@ -130,6 +131,7 @@ class HttpSrv(object): self.t_periodic: Optional[threading.Thread] = None self.u2fh = FHC() + self.pipes = CachedDict(0.2) self.metrics = Metrics(self) self.nreq = 0 self.nsus = 0 diff --git a/copyparty/up2k.py b/copyparty/up2k.py index 4bd219ed..55b8411b 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -291,7 +291,7 @@ class Up2k(object): min(1000 * 24 * 60 * 60 - 1, time.time() - self.db_act) ), } - return json.dumps(ret, indent=4) + return json.dumps(ret, separators=(",\n", ": ")) def find_job_by_ap(self, ptop: str, ap: str) -> str: try: @@ -304,7 +304,7 @@ class Up2k(object): tab2 = self.registry[ptop] for job in tab2.values(): if job["prel"] == dn and job["name"] == fn: - return json.dumps(job, indent=0) + return json.dumps(job, separators=(",\n", ": ")) except: pass @@ -355,7 +355,7 @@ class Up2k(object): } for (at, vp, sz, nn, nh) in ret ] - return json.dumps(ret2, indent=0) + return json.dumps(ret2, separators=(",\n", ": ")) def get_unfinished(self) -> str: if PY2 or not self.reg_mutex.acquire(timeout=0.5): @@ -382,7 +382,7 @@ class Up2k(object): finally: self.reg_mutex.release() - return json.dumps(ret, indent=4) + return json.dumps(ret, separators=(",\n", ": ")) def get_volsize(self, ptop: str) -> tuple[int, int]: with self.reg_mutex: @@ -4200,7 +4200,7 @@ class Up2k(object): path2 = "{}.{}".format(path, os.getpid()) body = {"droppable": self.droppable[ptop], "registry": reg} - j = json.dumps(body, indent=2, sort_keys=True).encode("utf-8") + j = json.dumps(body, sort_keys=True, separators=(",\n", ": ")).encode("utf-8") with gzip.GzipFile(path2, "wb") as f: f.write(j) diff --git a/copyparty/util.py b/copyparty/util.py index f7211d71..6086aa8e 100644 --- a/copyparty/util.py +++ b/copyparty/util.py @@ -759,6 +759,37 @@ class CachedSet(object): self.oldest = now +class CachedDict(object): + def __init__(self, maxage: float) -> None: + self.lk = threading.Lock() + self.c: dict[str, tuple[float, Any]] = {} + self.maxage = maxage + self.oldest = 0.0 + + def set(self, k: str, v: Any) -> None: + now = time.time() + self.c[k] = (now, v) + if now - self.oldest < self.maxage: + return + + c = self.c = {k: v for k, v in self.c.items() if now - v[0] < self.maxage} + try: + self.oldest = min([x[0] for x in c.values()]) + except: + self.oldest = now + + def get(self, k: str) -> Optional[tuple[str, Any]]: + try: + ts, ret = self.c[k] + now = time.time() + if now - ts > self.maxage: + del self.c[k] + return None + return ret + except: + return None + + class FHC(object): class CE(object): def __init__(self, fh: typing.BinaryIO) -> None: diff --git a/tests/test_dots.py b/tests/test_dots.py index 749618c9..946a3773 100644 --- a/tests/test_dots.py +++ b/tests/test_dots.py @@ -3,10 +3,8 @@ from __future__ import print_function, unicode_literals import io -import os -import time import json -import pprint +import os import shutil import tarfile import tempfile diff --git a/tests/util.py b/tests/util.py index 5b664ee0..8a5a75e6 100644 --- a/tests/util.py +++ b/tests/util.py @@ -44,7 +44,7 @@ if MACOS: from copyparty.__init__ import E from copyparty.__main__ import init_E from copyparty.u2idx import U2idx -from copyparty.util import FHC, Garda, Unrecv +from copyparty.util import FHC, CachedDict, Garda, Unrecv init_E(E) @@ -110,7 +110,7 @@ class Cfg(Namespace): def __init__(self, a=None, v=None, c=None, **ka0): ka = {} - ex = "daw dav_auth dav_inf dav_mac dav_rt e2d e2ds e2dsa e2t e2ts e2tsr e2v e2vu e2vp early_ban ed emp exp force_js getmod grid hardlink ih ihead magic never_symlink nid nih no_acode no_athumb no_dav no_dedup no_del no_dupe no_lifetime no_logues no_mv no_readme no_robots no_sb_md no_sb_lg no_scandir no_tarcmp no_thumb no_vthumb no_zip nrand nw q rand smb srch_dbg stats vague_403 vc ver xdev xlink xvol" + ex = "daw dav_auth dav_inf dav_mac dav_rt e2d e2ds e2dsa e2t e2ts e2tsr e2v e2vu e2vp early_ban ed emp exp force_js getmod grid hardlink ih ihead magic never_symlink nid nih no_acode no_athumb no_dav no_dedup no_del no_dupe no_lifetime no_logues no_mv no_pipe no_readme no_robots no_sb_md no_sb_lg no_scandir no_tarcmp no_thumb no_vthumb no_zip nrand nw q rand smb srch_dbg stats vague_403 vc ver xdev xlink xvol" ka.update(**{k: False for k in ex.split()}) ex = "dotpart dotsrch no_dhash no_fastboot no_rescan no_sendfile no_voldump re_dhash plain_ip" @@ -251,6 +251,7 @@ class VHttpConn(object): self.log_func = log self.log_src = "a" self.mutex = threading.Lock() + self.pipes = CachedDict(1) self.u2mutex = threading.Lock() self.nbyte = 0 self.nid = None