optimize BrokerThr, 7x faster:

reduce the overhead of function-calls from the client thread
to the svchub singletons (up2k, thumbs, metrics) down to 14%

and optimize up2k chunk-receiver to spend 5x less time bookkeeping
which restores up2k performance to before introducing incoming-ETA
This commit is contained in:
ed 2024-09-11 20:37:10 +00:00
parent 7a573cafd1
commit 4e75534ef8
5 changed files with 35 additions and 23 deletions

View file

@ -9,14 +9,14 @@ import queue
from .__init__ import CORES, TYPE_CHECKING from .__init__ import CORES, TYPE_CHECKING
from .broker_mpw import MpWorker from .broker_mpw import MpWorker
from .broker_util import ExceptionalQueue, try_exec from .broker_util import ExceptionalQueue, NotExQueue, try_exec
from .util import Daemon, mp from .util import Daemon, mp
if TYPE_CHECKING: if TYPE_CHECKING:
from .svchub import SvcHub from .svchub import SvcHub
if True: # pylint: disable=using-constant-test if True: # pylint: disable=using-constant-test
from typing import Any from typing import Any, Union
class MProcess(mp.Process): class MProcess(mp.Process):
@ -108,7 +108,7 @@ class BrokerMp(object):
if retq_id: if retq_id:
proc.q_pend.put((retq_id, "retq", rv)) proc.q_pend.put((retq_id, "retq", rv))
def ask(self, dest: str, *args: Any) -> ExceptionalQueue: def ask(self, dest: str, *args: Any) -> Union[ExceptionalQueue, NotExQueue]:
# new non-ipc invoking managed service in hub # new non-ipc invoking managed service in hub
obj = self.hub obj = self.hub

View file

@ -11,7 +11,7 @@ import queue
from .__init__ import ANYWIN from .__init__ import ANYWIN
from .authsrv import AuthSrv from .authsrv import AuthSrv
from .broker_util import BrokerCli, ExceptionalQueue from .broker_util import BrokerCli, ExceptionalQueue, NotExQueue
from .httpsrv import HttpSrv from .httpsrv import HttpSrv
from .util import FAKE_MP, Daemon, HMaccas from .util import FAKE_MP, Daemon, HMaccas
@ -114,7 +114,7 @@ class MpWorker(BrokerCli):
else: else:
raise Exception("what is " + str(dest)) raise Exception("what is " + str(dest))
def ask(self, dest: str, *args: Any) -> ExceptionalQueue: def ask(self, dest: str, *args: Any) -> Union[ExceptionalQueue, NotExQueue]:
retq = ExceptionalQueue(1) retq = ExceptionalQueue(1)
retq_id = id(retq) retq_id = id(retq)
with self.retpend_mutex: with self.retpend_mutex:

View file

@ -5,7 +5,7 @@ import os
import threading import threading
from .__init__ import TYPE_CHECKING from .__init__ import TYPE_CHECKING
from .broker_util import BrokerCli, ExceptionalQueue, try_exec from .broker_util import BrokerCli, ExceptionalQueue, NotExQueue
from .httpsrv import HttpSrv from .httpsrv import HttpSrv
from .util import HMaccas from .util import HMaccas
@ -13,7 +13,7 @@ if TYPE_CHECKING:
from .svchub import SvcHub from .svchub import SvcHub
if True: # pylint: disable=using-constant-test if True: # pylint: disable=using-constant-test
from typing import Any from typing import Any, Union
class BrokerThr(BrokerCli): class BrokerThr(BrokerCli):
@ -43,19 +43,14 @@ class BrokerThr(BrokerCli):
def noop(self) -> None: def noop(self) -> None:
pass pass
def ask(self, dest: str, *args: Any) -> ExceptionalQueue: def ask(self, dest: str, *args: Any) -> Union[ExceptionalQueue, NotExQueue]:
# new ipc invoking managed service in hub # new ipc invoking managed service in hub
obj = self.hub obj = self.hub
for node in dest.split("."): for node in dest.split("."):
obj = getattr(obj, node) obj = getattr(obj, node)
rv = try_exec(True, obj, *args) return NotExQueue(obj(*args)) # type: ignore
# pretend we're broker_mp
retq = ExceptionalQueue(1)
retq.put(rv)
return retq
def say(self, dest: str, *args: Any) -> None: def say(self, dest: str, *args: Any) -> None:
if dest == "listen": if dest == "listen":
@ -71,4 +66,4 @@ class BrokerThr(BrokerCli):
for node in dest.split("."): for node in dest.split("."):
obj = getattr(obj, node) obj = getattr(obj, node)
try_exec(False, obj, *args) obj(*args) # type: ignore

View file

@ -33,6 +33,18 @@ class ExceptionalQueue(Queue, object):
return rv return rv
class NotExQueue(object):
"""
BrokerThr uses this instead of ExceptionalQueue; 7x faster
"""
def __init__(self, rv: Any) -> None:
self.rv = rv
def get(self) -> Any:
return self.rv
class BrokerCli(object): class BrokerCli(object):
""" """
helps mypy understand httpsrv.broker but still fails a few levels deeper, helps mypy understand httpsrv.broker but still fails a few levels deeper,
@ -48,7 +60,7 @@ class BrokerCli(object):
def __init__(self) -> None: def __init__(self) -> None:
pass pass
def ask(self, dest: str, *args: Any) -> ExceptionalQueue: def ask(self, dest: str, *args: Any) -> Union[ExceptionalQueue, NotExQueue]:
return ExceptionalQueue(1) return ExceptionalQueue(1)
def say(self, dest: str, *args: Any) -> None: def say(self, dest: str, *args: Any) -> None:

View file

@ -2311,6 +2311,7 @@ class HttpCli(object):
locked = chashes # remaining chunks to be received in this request locked = chashes # remaining chunks to be received in this request
written = [] # chunks written to disk, but not yet released by up2k written = [] # chunks written to disk, but not yet released by up2k
num_left = -1 # num chunks left according to most recent up2k release num_left = -1 # num chunks left according to most recent up2k release
treport = time.time() # ratelimit up2k reporting to reduce overhead
try: try:
if self.args.nw: if self.args.nw:
@ -2356,11 +2357,8 @@ class HttpCli(object):
remains -= chunksize remains -= chunksize
if len(cstart) > 1 and path != os.devnull: if len(cstart) > 1 and path != os.devnull:
self.log( t = " & ".join(unicode(x) for x in cstart[1:])
"clone {} to {}".format( self.log("clone %s to %s" % (cstart[0], t))
cstart[0], " & ".join(unicode(x) for x in cstart[1:])
)
)
ofs = 0 ofs = 0
while ofs < chunksize: while ofs < chunksize:
bufsz = max(4 * 1024 * 1024, self.args.iobuf) bufsz = max(4 * 1024 * 1024, self.args.iobuf)
@ -2378,6 +2376,10 @@ class HttpCli(object):
# be quick to keep the tcp winsize scale; # be quick to keep the tcp winsize scale;
# if we can't confirm rn then that's fine # if we can't confirm rn then that's fine
written.append(chash) written.append(chash)
now = time.time()
if now - treport < 1:
continue
treport = now
x = broker.ask("up2k.fast_confirm_chunks", ptop, wark, written) x = broker.ask("up2k.fast_confirm_chunks", ptop, wark, written)
num_left, t = x.get() num_left, t = x.get()
if num_left < -1: if num_left < -1:
@ -2385,8 +2387,9 @@ class HttpCli(object):
locked = written = [] locked = written = []
return False return False
elif num_left >= 0: elif num_left >= 0:
self.log("got %d chunks, %d left" % (len(written), num_left), 6) t = "got %d more chunks, %d left"
locked = locked[len(written):] self.log(t % (len(written), num_left), 6)
locked = locked[len(written) :]
written = [] written = []
if not fpool: if not fpool:
@ -2406,6 +2409,8 @@ class HttpCli(object):
if num_left < 0: if num_left < 0:
self.loud_reply(t, status=500) self.loud_reply(t, status=500)
return False return False
t = "got %d more chunks, %d left"
self.log(t % (len(locked), num_left), 6)
if num_left < 0: if num_left < 0:
raise Pebkac(500, "unconfirmed; see serverlog") raise Pebkac(500, "unconfirmed; see serverlog")