From 4e75534ef8136528407460f531f4ec11ce661171 Mon Sep 17 00:00:00 2001 From: ed Date: Wed, 11 Sep 2024 20:37:10 +0000 Subject: [PATCH] optimize BrokerThr, 7x faster: reduce the overhead of function-calls from the client thread to the svchub singletons (up2k, thumbs, metrics) down to 14% and optimize up2k chunk-receiver to spend 5x less time bookkeeping which restores up2k performance to before introducing incoming-ETA --- copyparty/broker_mp.py | 6 +++--- copyparty/broker_mpw.py | 4 ++-- copyparty/broker_thr.py | 15 +++++---------- copyparty/broker_util.py | 14 +++++++++++++- copyparty/httpcli.py | 19 ++++++++++++------- 5 files changed, 35 insertions(+), 23 deletions(-) diff --git a/copyparty/broker_mp.py b/copyparty/broker_mp.py index 57b2506c..0b83946f 100644 --- a/copyparty/broker_mp.py +++ b/copyparty/broker_mp.py @@ -9,14 +9,14 @@ import queue from .__init__ import CORES, TYPE_CHECKING from .broker_mpw import MpWorker -from .broker_util import ExceptionalQueue, try_exec +from .broker_util import ExceptionalQueue, NotExQueue, try_exec from .util import Daemon, mp if TYPE_CHECKING: from .svchub import SvcHub if True: # pylint: disable=using-constant-test - from typing import Any + from typing import Any, Union class MProcess(mp.Process): @@ -108,7 +108,7 @@ class BrokerMp(object): if retq_id: proc.q_pend.put((retq_id, "retq", rv)) - def ask(self, dest: str, *args: Any) -> ExceptionalQueue: + def ask(self, dest: str, *args: Any) -> Union[ExceptionalQueue, NotExQueue]: # new non-ipc invoking managed service in hub obj = self.hub diff --git a/copyparty/broker_mpw.py b/copyparty/broker_mpw.py index 2a961fa3..5450050b 100644 --- a/copyparty/broker_mpw.py +++ b/copyparty/broker_mpw.py @@ -11,7 +11,7 @@ import queue from .__init__ import ANYWIN from .authsrv import AuthSrv -from .broker_util import BrokerCli, ExceptionalQueue +from .broker_util import BrokerCli, ExceptionalQueue, NotExQueue from .httpsrv import HttpSrv from .util import FAKE_MP, Daemon, HMaccas @@ -114,7 +114,7 @@ class MpWorker(BrokerCli): else: raise Exception("what is " + str(dest)) - def ask(self, dest: str, *args: Any) -> ExceptionalQueue: + def ask(self, dest: str, *args: Any) -> Union[ExceptionalQueue, NotExQueue]: retq = ExceptionalQueue(1) retq_id = id(retq) with self.retpend_mutex: diff --git a/copyparty/broker_thr.py b/copyparty/broker_thr.py index 9d3edd5c..4b021feb 100644 --- a/copyparty/broker_thr.py +++ b/copyparty/broker_thr.py @@ -5,7 +5,7 @@ import os import threading from .__init__ import TYPE_CHECKING -from .broker_util import BrokerCli, ExceptionalQueue, try_exec +from .broker_util import BrokerCli, ExceptionalQueue, NotExQueue from .httpsrv import HttpSrv from .util import HMaccas @@ -13,7 +13,7 @@ if TYPE_CHECKING: from .svchub import SvcHub if True: # pylint: disable=using-constant-test - from typing import Any + from typing import Any, Union class BrokerThr(BrokerCli): @@ -43,19 +43,14 @@ class BrokerThr(BrokerCli): def noop(self) -> None: pass - def ask(self, dest: str, *args: Any) -> ExceptionalQueue: + def ask(self, dest: str, *args: Any) -> Union[ExceptionalQueue, NotExQueue]: # new ipc invoking managed service in hub obj = self.hub for node in dest.split("."): obj = getattr(obj, node) - rv = try_exec(True, obj, *args) - - # pretend we're broker_mp - retq = ExceptionalQueue(1) - retq.put(rv) - return retq + return NotExQueue(obj(*args)) # type: ignore def say(self, dest: str, *args: Any) -> None: if dest == "listen": @@ -71,4 +66,4 @@ class BrokerThr(BrokerCli): for node in dest.split("."): obj = getattr(obj, node) - try_exec(False, obj, *args) + obj(*args) # type: ignore diff --git a/copyparty/broker_util.py b/copyparty/broker_util.py index 22c419f2..ea987200 100644 --- a/copyparty/broker_util.py +++ b/copyparty/broker_util.py @@ -33,6 +33,18 @@ class ExceptionalQueue(Queue, object): return rv +class NotExQueue(object): + """ + BrokerThr uses this instead of ExceptionalQueue; 7x faster + """ + + def __init__(self, rv: Any) -> None: + self.rv = rv + + def get(self) -> Any: + return self.rv + + class BrokerCli(object): """ helps mypy understand httpsrv.broker but still fails a few levels deeper, @@ -48,7 +60,7 @@ class BrokerCli(object): def __init__(self) -> None: pass - def ask(self, dest: str, *args: Any) -> ExceptionalQueue: + def ask(self, dest: str, *args: Any) -> Union[ExceptionalQueue, NotExQueue]: return ExceptionalQueue(1) def say(self, dest: str, *args: Any) -> None: diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index 5ed50914..7a347fe4 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -2311,6 +2311,7 @@ class HttpCli(object): locked = chashes # remaining chunks to be received in this request written = [] # chunks written to disk, but not yet released by up2k num_left = -1 # num chunks left according to most recent up2k release + treport = time.time() # ratelimit up2k reporting to reduce overhead try: if self.args.nw: @@ -2356,11 +2357,8 @@ class HttpCli(object): remains -= chunksize if len(cstart) > 1 and path != os.devnull: - self.log( - "clone {} to {}".format( - cstart[0], " & ".join(unicode(x) for x in cstart[1:]) - ) - ) + t = " & ".join(unicode(x) for x in cstart[1:]) + self.log("clone %s to %s" % (cstart[0], t)) ofs = 0 while ofs < chunksize: bufsz = max(4 * 1024 * 1024, self.args.iobuf) @@ -2378,6 +2376,10 @@ class HttpCli(object): # be quick to keep the tcp winsize scale; # if we can't confirm rn then that's fine written.append(chash) + now = time.time() + if now - treport < 1: + continue + treport = now x = broker.ask("up2k.fast_confirm_chunks", ptop, wark, written) num_left, t = x.get() if num_left < -1: @@ -2385,8 +2387,9 @@ class HttpCli(object): locked = written = [] return False elif num_left >= 0: - self.log("got %d chunks, %d left" % (len(written), num_left), 6) - locked = locked[len(written):] + t = "got %d more chunks, %d left" + self.log(t % (len(written), num_left), 6) + locked = locked[len(written) :] written = [] if not fpool: @@ -2406,6 +2409,8 @@ class HttpCli(object): if num_left < 0: self.loud_reply(t, status=500) return False + t = "got %d more chunks, %d left" + self.log(t % (len(locked), num_left), 6) if num_left < 0: raise Pebkac(500, "unconfirmed; see serverlog")