From 7781e0529d1e848b788ea836ab4da70ea6994dcd Mon Sep 17 00:00:00 2001 From: ed Date: Sun, 24 Mar 2024 21:53:28 +0000 Subject: [PATCH] nogil: remove `-j` (multiprocessing option) in case cpython's free-threading / nogil performance proves to be good enough that the multiprocessing option can be removed, this is roughly how you'd do it (not everything's been tested) but while you'd expect this change to improve performance, it somehow doesn't, not even measurably so as the performance gain is negligible, the only win here is simplifying the code, and maybe less chance of bugs in the future as a result, this probably won't get merged anytime soon (if at all), and would probably warrant bumping to v2 --- copyparty/__main__.py | 3 +- copyparty/authsrv.py | 25 +++---- copyparty/broker_mp.py | 141 ----------------------------------- copyparty/broker_mpw.py | 123 ------------------------------ copyparty/broker_thr.py | 73 ------------------ copyparty/broker_util.py | 72 ------------------ copyparty/ftpd.py | 8 +- copyparty/httpcli.py | 70 +++++++---------- copyparty/httpconn.py | 2 +- copyparty/httpsrv.py | 34 +++------ copyparty/metrics.py | 11 ++- copyparty/ssdp.py | 7 +- copyparty/svchub.py | 84 ++------------------- copyparty/tcpsrv.py | 4 +- copyparty/th_cli.py | 15 +--- copyparty/up2k.py | 4 +- scripts/pyinstaller/build.sh | 2 - scripts/sfx.ls | 4 - tests/util.py | 12 +-- 19 files changed, 81 insertions(+), 613 deletions(-) delete mode 100644 copyparty/broker_mp.py delete mode 100644 copyparty/broker_mpw.py delete mode 100644 copyparty/broker_thr.py delete mode 100644 copyparty/broker_util.py diff --git a/copyparty/__main__.py b/copyparty/__main__.py index a115eeaf..c1a9604e 100755 --- a/copyparty/__main__.py +++ b/copyparty/__main__.py @@ -841,7 +841,6 @@ def add_general(ap, nc, srvname): ap2 = ap.add_argument_group('general options') ap2.add_argument("-c", metavar="PATH", type=u, action="append", help="add config file") ap2.add_argument("-nc", metavar="NUM", type=int, default=nc, help="max num clients") - ap2.add_argument("-j", metavar="CORES", type=int, default=1, help="max num cpu cores, 0=all") ap2.add_argument("-a", metavar="ACCT", type=u, action="append", help="add account, \033[33mUSER\033[0m:\033[33mPASS\033[0m; example [\033[32med:wark\033[0m]") ap2.add_argument("-v", metavar="VOL", type=u, action="append", help="add volume, \033[33mSRC\033[0m:\033[33mDST\033[0m:\033[33mFLAG\033[0m; examples [\033[32m.::r\033[0m], [\033[32m/mnt/nas/music:/music:r:aed\033[0m], see --help-accounts") ap2.add_argument("--grp", metavar="G:N,N", type=u, action="append", help="add group, \033[33mNAME\033[0m:\033[33mUSER1\033[0m,\033[33mUSER2\033[0m,\033[33m...\033[0m; example [\033[32madmins:ed,foo,bar\033[0m]") @@ -881,7 +880,7 @@ def add_upload(ap): ap2.add_argument("--blank-wt", metavar="SEC", type=int, default=300, help="file write grace period (any client can write to a blank file last-modified more recently than \033[33mSEC\033[0m seconds ago)") ap2.add_argument("--reg-cap", metavar="N", type=int, default=38400, help="max number of uploads to keep in memory when running without \033[33m-e2d\033[0m; roughly 1 MiB RAM per 600") ap2.add_argument("--no-fpool", action="store_true", help="disable file-handle pooling -- instead, repeatedly close and reopen files during upload (bad idea to enable this on windows and/or cow filesystems)") - ap2.add_argument("--use-fpool", action="store_true", help="force file-handle pooling, even when it might be dangerous (multiprocessing, filesystems lacking sparse-files support, ...)") + ap2.add_argument("--use-fpool", action="store_true", help="force file-handle pooling, even when it might be dangerous (filesystems lacking sparse-files support, ...)") ap2.add_argument("--hardlink", action="store_true", help="prefer hardlinks instead of symlinks when possible (within same filesystem) (volflag=hardlink)") ap2.add_argument("--never-symlink", action="store_true", help="do not fallback to symlinks when a hardlink cannot be made (volflag=neversymlink)") ap2.add_argument("--no-dedup", action="store_true", help="disable symlink/hardlink creation; copy file contents instead (volflag=copydupes)") diff --git a/copyparty/authsrv.py b/copyparty/authsrv.py index ecaf63fa..b0b47273 100644 --- a/copyparty/authsrv.py +++ b/copyparty/authsrv.py @@ -44,9 +44,7 @@ if True: # pylint: disable=using-constant-test from .util import NamedLogger, RootLogger if TYPE_CHECKING: - from .broker_mp import BrokerMp - from .broker_thr import BrokerThr - from .broker_util import BrokerCli + from .svchub import SvcHub # Vflags: TypeAlias = dict[str, str | bool | float | list[str]] # Vflags: TypeAlias = dict[str, Any] @@ -141,9 +139,9 @@ class Lim(object): sz: int, ptop: str, abspath: str, - broker: Optional[Union["BrokerCli", "BrokerMp", "BrokerThr"]] = None, + hub: Optional["SvcHub"] = None, reg: Optional[dict[str, dict[str, Any]]] = None, - volgetter: str = "up2k.get_volsize", + volgetter: str = "get_volsize", ) -> tuple[str, str]: if reg is not None and self.reg is None: self.reg = reg @@ -154,7 +152,7 @@ class Lim(object): self.chk_rem(rem) if sz != -1: self.chk_sz(sz) - self.chk_vsz(broker, ptop, sz, volgetter) + self.chk_vsz(hub, ptop, sz, volgetter) self.chk_df(abspath, sz) # side effects; keep last-ish ap2, vp2 = self.rot(abspath) @@ -172,16 +170,15 @@ class Lim(object): def chk_vsz( self, - broker: Optional[Union["BrokerCli", "BrokerMp", "BrokerThr"]], + hub: Optional["SvcHub"], ptop: str, sz: int, volgetter: str = "up2k.get_volsize", ) -> None: - if not broker or not self.vbmax + self.vnmax: + if not hub or not self.vbmax + self.vnmax: return - x = broker.ask(volgetter, ptop) - nbytes, nfiles = x.get() + nbytes, nfiles = hub.up2k.getattr(volgetter)(ptop) if self.vbmax and self.vbmax < nbytes + sz: raise Pebkac(400, "volume has exceeded max size") @@ -815,9 +812,7 @@ class AuthSrv(object): yield prev, True - def idp_checkin( - self, broker: Optional["BrokerCli"], uname: str, gname: str - ) -> bool: + def idp_checkin(self, hub: Optional["SvcHub"], uname: str, gname: str) -> bool: if uname in self.acct: return False @@ -837,12 +832,12 @@ class AuthSrv(object): t = "reinitializing due to new user from IdP: [%s:%s]" self.log(t % (uname, gnames), 3) - if not broker: + if not hub: # only true for tests self._reload() return True - broker.ask("_reload_blocking", False).get() + hub._reload_blocking(False) return True def _map_volume_idp( diff --git a/copyparty/broker_mp.py b/copyparty/broker_mp.py deleted file mode 100644 index 848b07ee..00000000 --- a/copyparty/broker_mp.py +++ /dev/null @@ -1,141 +0,0 @@ -# coding: utf-8 -from __future__ import print_function, unicode_literals - -import threading -import time -import traceback - -import queue - -from .__init__ import CORES, TYPE_CHECKING -from .broker_mpw import MpWorker -from .broker_util import ExceptionalQueue, try_exec -from .util import Daemon, mp - -if TYPE_CHECKING: - from .svchub import SvcHub - -if True: # pylint: disable=using-constant-test - from typing import Any - - -class MProcess(mp.Process): - def __init__( - self, - q_pend: queue.Queue[tuple[int, str, list[Any]]], - q_yield: queue.Queue[tuple[int, str, list[Any]]], - target: Any, - args: Any, - ) -> None: - super(MProcess, self).__init__(target=target, args=args) - self.q_pend = q_pend - self.q_yield = q_yield - - -class BrokerMp(object): - """external api; manages MpWorkers""" - - def __init__(self, hub: "SvcHub") -> None: - self.hub = hub - self.log = hub.log - self.args = hub.args - - self.procs = [] - self.mutex = threading.Lock() - - self.num_workers = self.args.j or CORES - self.log("broker", "booting {} subprocesses".format(self.num_workers)) - for n in range(1, self.num_workers + 1): - q_pend: queue.Queue[tuple[int, str, list[Any]]] = mp.Queue(1) # type: ignore - q_yield: queue.Queue[tuple[int, str, list[Any]]] = mp.Queue(64) # type: ignore - - proc = MProcess(q_pend, q_yield, MpWorker, (q_pend, q_yield, self.args, n)) - Daemon(self.collector, "mp-sink-{}".format(n), (proc,)) - self.procs.append(proc) - proc.start() - - def shutdown(self) -> None: - self.log("broker", "shutting down") - for n, proc in enumerate(self.procs): - thr = threading.Thread( - target=proc.q_pend.put((0, "shutdown", [])), - name="mp-shutdown-{}-{}".format(n, len(self.procs)), - ) - thr.start() - - with self.mutex: - procs = self.procs - self.procs = [] - - while procs: - if procs[-1].is_alive(): - time.sleep(0.05) - continue - - procs.pop() - - def reload(self) -> None: - self.log("broker", "reloading") - for _, proc in enumerate(self.procs): - proc.q_pend.put((0, "reload", [])) - - def collector(self, proc: MProcess) -> None: - """receive message from hub in other process""" - while True: - msg = proc.q_yield.get() - retq_id, dest, args = msg - - if dest == "log": - self.log(*args) - - elif dest == "retq": - # response from previous ipc call - raise Exception("invalid broker_mp usage") - - else: - # new ipc invoking managed service in hub - try: - obj = self.hub - for node in dest.split("."): - obj = getattr(obj, node) - - # TODO will deadlock if dest performs another ipc - rv = try_exec(retq_id, obj, *args) - except: - rv = ["exception", "stack", traceback.format_exc()] - - if retq_id: - proc.q_pend.put((retq_id, "retq", rv)) - - def ask(self, dest: str, *args: Any) -> ExceptionalQueue: - - # new non-ipc invoking managed service in hub - obj = self.hub - for node in dest.split("."): - obj = getattr(obj, node) - - rv = try_exec(True, obj, *args) - - retq = ExceptionalQueue(1) - retq.put(rv) - return retq - - def say(self, dest: str, *args: Any) -> None: - """ - send message to non-hub component in other process, - returns a Queue object which eventually contains the response if want_retval - (not-impl here since nothing uses it yet) - """ - if dest == "listen": - for p in self.procs: - p.q_pend.put((0, dest, [args[0], len(self.procs)])) - - elif dest == "set_netdevs": - for p in self.procs: - p.q_pend.put((0, dest, list(args))) - - elif dest == "cb_httpsrv_up": - self.hub.cb_httpsrv_up() - - else: - raise Exception("what is " + str(dest)) diff --git a/copyparty/broker_mpw.py b/copyparty/broker_mpw.py deleted file mode 100644 index e74c4547..00000000 --- a/copyparty/broker_mpw.py +++ /dev/null @@ -1,123 +0,0 @@ -# coding: utf-8 -from __future__ import print_function, unicode_literals - -import argparse -import os -import signal -import sys -import threading - -import queue - -from .__init__ import ANYWIN -from .authsrv import AuthSrv -from .broker_util import BrokerCli, ExceptionalQueue -from .httpsrv import HttpSrv -from .util import FAKE_MP, Daemon, HMaccas - -if True: # pylint: disable=using-constant-test - from types import FrameType - - from typing import Any, Optional, Union - - -class MpWorker(BrokerCli): - """one single mp instance""" - - def __init__( - self, - q_pend: queue.Queue[tuple[int, str, list[Any]]], - q_yield: queue.Queue[tuple[int, str, list[Any]]], - args: argparse.Namespace, - n: int, - ) -> None: - super(MpWorker, self).__init__() - - self.q_pend = q_pend - self.q_yield = q_yield - self.args = args - self.n = n - - self.log = self._log_disabled if args.q and not args.lo else self._log_enabled - - self.retpend: dict[int, Any] = {} - self.retpend_mutex = threading.Lock() - self.mutex = threading.Lock() - - # we inherited signal_handler from parent, - # replace it with something harmless - if not FAKE_MP: - sigs = [signal.SIGINT, signal.SIGTERM] - if not ANYWIN: - sigs.append(signal.SIGUSR1) - - for sig in sigs: - signal.signal(sig, self.signal_handler) - - # starting to look like a good idea - self.asrv = AuthSrv(args, None, False) - - # instantiate all services here (TODO: inheritance?) - self.iphash = HMaccas(os.path.join(self.args.E.cfg, "iphash"), 8) - self.httpsrv = HttpSrv(self, n) - - # on winxp and some other platforms, - # use thr.join() to block all signals - Daemon(self.main, "mpw-main").join() - - def signal_handler(self, sig: Optional[int], frame: Optional[FrameType]) -> None: - # print('k') - pass - - def _log_enabled(self, src: str, msg: str, c: Union[int, str] = 0) -> None: - self.q_yield.put((0, "log", [src, msg, c])) - - def _log_disabled(self, src: str, msg: str, c: Union[int, str] = 0) -> None: - pass - - def logw(self, msg: str, c: Union[int, str] = 0) -> None: - self.log("mp%d" % (self.n,), msg, c) - - def main(self) -> None: - while True: - retq_id, dest, args = self.q_pend.get() - - # self.logw("work: [{}]".format(d[0])) - if dest == "shutdown": - self.httpsrv.shutdown() - self.logw("ok bye") - sys.exit(0) - return - - elif dest == "reload": - self.logw("mpw.asrv reloading") - self.asrv.reload() - self.logw("mpw.asrv reloaded") - - elif dest == "listen": - self.httpsrv.listen(args[0], args[1]) - - elif dest == "set_netdevs": - self.httpsrv.set_netdevs(args[0]) - - elif dest == "retq": - # response from previous ipc call - with self.retpend_mutex: - retq = self.retpend.pop(retq_id) - - retq.put(args) - - else: - raise Exception("what is " + str(dest)) - - def ask(self, dest: str, *args: Any) -> ExceptionalQueue: - retq = ExceptionalQueue(1) - retq_id = id(retq) - with self.retpend_mutex: - self.retpend[retq_id] = retq - - self.q_yield.put((retq_id, dest, list(args))) - return retq - - def say(self, dest: str, *args: Any) -> None: - self.q_yield.put((0, dest, list(args))) diff --git a/copyparty/broker_thr.py b/copyparty/broker_thr.py deleted file mode 100644 index e40cd38d..00000000 --- a/copyparty/broker_thr.py +++ /dev/null @@ -1,73 +0,0 @@ -# coding: utf-8 -from __future__ import print_function, unicode_literals - -import os -import threading - -from .__init__ import TYPE_CHECKING -from .broker_util import BrokerCli, ExceptionalQueue, try_exec -from .httpsrv import HttpSrv -from .util import HMaccas - -if TYPE_CHECKING: - from .svchub import SvcHub - -if True: # pylint: disable=using-constant-test - from typing import Any - - -class BrokerThr(BrokerCli): - """external api; behaves like BrokerMP but using plain threads""" - - def __init__(self, hub: "SvcHub") -> None: - super(BrokerThr, self).__init__() - - self.hub = hub - self.log = hub.log - self.args = hub.args - self.asrv = hub.asrv - - self.mutex = threading.Lock() - self.num_workers = 1 - - # instantiate all services here (TODO: inheritance?) - self.iphash = HMaccas(os.path.join(self.args.E.cfg, "iphash"), 8) - self.httpsrv = HttpSrv(self, None) - self.reload = self.noop - - def shutdown(self) -> None: - # self.log("broker", "shutting down") - self.httpsrv.shutdown() - - def noop(self) -> None: - pass - - def ask(self, dest: str, *args: Any) -> ExceptionalQueue: - - # new ipc invoking managed service in hub - obj = self.hub - for node in dest.split("."): - obj = getattr(obj, node) - - rv = try_exec(True, obj, *args) - - # pretend we're broker_mp - retq = ExceptionalQueue(1) - retq.put(rv) - return retq - - def say(self, dest: str, *args: Any) -> None: - if dest == "listen": - self.httpsrv.listen(args[0], 1) - return - - if dest == "set_netdevs": - self.httpsrv.set_netdevs(args[0]) - return - - # new ipc invoking managed service in hub - obj = self.hub - for node in dest.split("."): - obj = getattr(obj, node) - - try_exec(False, obj, *args) diff --git a/copyparty/broker_util.py b/copyparty/broker_util.py deleted file mode 100644 index 105ac535..00000000 --- a/copyparty/broker_util.py +++ /dev/null @@ -1,72 +0,0 @@ -# coding: utf-8 -from __future__ import print_function, unicode_literals - -import argparse -import traceback - -from queue import Queue - -from .__init__ import TYPE_CHECKING -from .authsrv import AuthSrv -from .util import HMaccas, Pebkac - -if True: # pylint: disable=using-constant-test - from typing import Any, Optional, Union - - from .util import RootLogger - -if TYPE_CHECKING: - from .httpsrv import HttpSrv - - -class ExceptionalQueue(Queue, object): - def get(self, block: bool = True, timeout: Optional[float] = None) -> Any: - rv = super(ExceptionalQueue, self).get(block, timeout) - - if isinstance(rv, list): - if rv[0] == "exception": - if rv[1] == "pebkac": - raise Pebkac(*rv[2:]) - else: - raise Exception(rv[2]) - - return rv - - -class BrokerCli(object): - """ - helps mypy understand httpsrv.broker but still fails a few levels deeper, - for example resolving httpconn.* in httpcli -- see lines tagged #mypy404 - """ - - log: "RootLogger" - args: argparse.Namespace - asrv: AuthSrv - httpsrv: "HttpSrv" - iphash: HMaccas - - def __init__(self) -> None: - pass - - def ask(self, dest: str, *args: Any) -> ExceptionalQueue: - return ExceptionalQueue(1) - - def say(self, dest: str, *args: Any) -> None: - pass - - -def try_exec(want_retval: Union[bool, int], func: Any, *args: list[Any]) -> Any: - try: - return func(*args) - - except Pebkac as ex: - if not want_retval: - raise - - return ["exception", "pebkac", ex.code, str(ex)] - - except: - if not want_retval: - raise - - return ["exception", "stack", traceback.format_exc()] diff --git a/copyparty/ftpd.py b/copyparty/ftpd.py index a918b234..0006f52f 100644 --- a/copyparty/ftpd.py +++ b/copyparty/ftpd.py @@ -88,12 +88,8 @@ class FtpAuth(DummyAuthorizer): if bonk: logging.warning("client banned: invalid passwords") bans[ip] = bonk - try: - # only possible if multiprocessing disabled - self.hub.broker.httpsrv.bans[ip] = bonk # type: ignore - self.hub.broker.httpsrv.nban += 1 # type: ignore - except: - pass + self.hub.httpsrv.bans[ip] = bonk + self.hub.httpsrv.nban += 1 raise AuthenticationFailed("Authentication failed.") diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index a1a5088a..ba3f3239 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -115,6 +115,7 @@ class HttpCli(object): self.t0 = time.time() self.conn = conn + self.hub = conn.hsrv.hub self.u2mutex = conn.u2mutex # mypy404 self.s = conn.s self.sr = conn.sr @@ -475,7 +476,7 @@ class HttpCli(object): ) or self.args.idp_h_key in self.headers if trusted_key and trusted_xff: - self.asrv.idp_checkin(self.conn.hsrv.broker, idp_usr, idp_grp) + self.asrv.idp_checkin(self.hub, idp_usr, idp_grp) else: if not trusted_key: t = 'the idp-h-key header ("%s") is not present in the request; will NOT trust the other headers saying that the client\'s username is "%s" and group is "%s"' @@ -626,7 +627,7 @@ class HttpCli(object): msg += "hint: %s\r\n" % (self.hint,) if "database is locked" in em: - self.conn.hsrv.broker.say("log_stacks") + self.hub.log_stacks() msg += "hint: important info in the server log\r\n" zb = b"
" + html_escape(msg).encode("utf-8", "replace")
@@ -1629,9 +1630,7 @@ class HttpCli(object):
         lim = vfs.get_dbv(rem)[0].lim
         fdir = vfs.canonical(rem)
         if lim:
-            fdir, rem = lim.all(
-                self.ip, rem, remains, vfs.realpath, fdir, self.conn.hsrv.broker
-            )
+            fdir, rem = lim.all(self.ip, rem, remains, vfs.realpath, fdir, self.hub)
 
         fn = None
         if rem and not self.trailing_slash and not bos.path.isdir(fdir):
@@ -1764,7 +1763,7 @@ class HttpCli(object):
             lim.bup(self.ip, post_sz)
             try:
                 lim.chk_sz(post_sz)
-                lim.chk_vsz(self.conn.hsrv.broker, vfs.realpath, post_sz)
+                lim.chk_vsz(self.hub, vfs.realpath, post_sz)
             except:
                 wunlink(self.log, path, vfs.flags)
                 raise
@@ -1823,8 +1822,7 @@ class HttpCli(object):
             raise Pebkac(403, t)
 
         vfs, rem = vfs.get_dbv(rem)
-        self.conn.hsrv.broker.say(
-            "up2k.hash_file",
+        self.hub.up2k.hash_file(
             vfs.realpath,
             vfs.vpath,
             vfs.flags,
@@ -2049,8 +2047,7 @@ class HttpCli(object):
 
         # not to protect u2fh, but to prevent handshakes while files are closing
         with self.u2mutex:
-            x = self.conn.hsrv.broker.ask("up2k.handle_json", body, self.u2fh.aps)
-            ret = x.get()
+            ret = self.hub.up2k.handle_json(body, self.u2fh.aps)
 
         if self.is_vproxied:
             if "purl" in ret:
@@ -2138,7 +2135,7 @@ class HttpCli(object):
         vfs, _ = self.asrv.vfs.get(self.vpath, self.uname, False, True)
         ptop = (vfs.dbv or vfs).realpath
 
-        x = self.conn.hsrv.broker.ask("up2k.handle_chunk", ptop, wark, chash)
+        x = self.hub.up2k.handle_chunk(ptop, wark, chash)
         response = x.get()
         chunksize, cstart, path, lastmod, sprs = response
 
@@ -2207,11 +2204,9 @@ class HttpCli(object):
                 f.close()
                 raise
         finally:
-            x = self.conn.hsrv.broker.ask("up2k.release_chunk", ptop, wark, chash)
-            x.get()  # block client until released
+            self.hub.up2k.release_chunk(ptop, wark, chash)
 
-        x = self.conn.hsrv.broker.ask("up2k.confirm_chunk", ptop, wark, chash)
-        ztis = x.get()
+        ztis = self.hub.up2k.confirm_chunk(ptop, wark, chash)
         try:
             num_left, fin_path = ztis
         except:
@@ -2223,9 +2218,7 @@ class HttpCli(object):
                 self.u2fh.close(path)
 
         if not num_left and not self.args.nw:
-            self.conn.hsrv.broker.ask(
-                "up2k.finish_upload", ptop, wark, self.u2fh.aps
-            ).get()
+            self.hub.up2k.finish_upload(ptop, wark, self.u2fh.aps)
 
         cinf = self.headers.get("x-up2k-stat", "")
 
@@ -2403,7 +2396,7 @@ class HttpCli(object):
         fdir_base = vfs.canonical(rem)
         if lim:
             fdir_base, rem = lim.all(
-                self.ip, rem, -1, vfs.realpath, fdir_base, self.conn.hsrv.broker
+                self.ip, rem, -1, vfs.realpath, fdir_base, self.hub
             )
             upload_vpath = "{}/{}".format(vfs.vpath, rem).strip("/")
             if not nullwrite:
@@ -2511,7 +2504,7 @@ class HttpCli(object):
                         try:
                             lim.chk_df(tabspath, sz, True)
                             lim.chk_sz(sz)
-                            lim.chk_vsz(self.conn.hsrv.broker, vfs.realpath, sz)
+                            lim.chk_vsz(self.hub, vfs.realpath, sz)
                             lim.chk_bup(self.ip)
                             lim.chk_nup(self.ip)
                         except:
@@ -2549,8 +2542,7 @@ class HttpCli(object):
                         raise Pebkac(403, t)
 
                     dbv, vrem = vfs.get_dbv(rem)
-                    self.conn.hsrv.broker.say(
-                        "up2k.hash_file",
+                    self.hub.up2k.hash_file(
                         dbv.realpath,
                         vfs.vpath,
                         dbv.flags,
@@ -2697,7 +2689,7 @@ class HttpCli(object):
         fp = vfs.canonical(rp)
         lim = vfs.get_dbv(rem)[0].lim
         if lim:
-            fp, rp = lim.all(self.ip, rp, clen, vfs.realpath, fp, self.conn.hsrv.broker)
+            fp, rp = lim.all(self.ip, rp, clen, vfs.realpath, fp, self.hub)
             bos.makedirs(fp)
 
         fp = os.path.join(fp, fn)
@@ -2799,7 +2791,7 @@ class HttpCli(object):
             lim.bup(self.ip, sz)
             try:
                 lim.chk_sz(sz)
-                lim.chk_vsz(self.conn.hsrv.broker, vfs.realpath, sz)
+                lim.chk_vsz(self.hub, vfs.realpath, sz)
             except:
                 wunlink(self.log, fp, vfs.flags)
                 raise
@@ -2828,8 +2820,7 @@ class HttpCli(object):
             raise Pebkac(403, t)
 
         vfs, rem = vfs.get_dbv(rem)
-        self.conn.hsrv.broker.say(
-            "up2k.hash_file",
+        self.hub.up2k.hash_file(
             vfs.realpath,
             vfs.vpath,
             vfs.flags,
@@ -3363,8 +3354,8 @@ class HttpCli(object):
         ]
 
         if self.avol and not self.args.no_rescan:
-            x = self.conn.hsrv.broker.ask("up2k.get_state")
-            vs = json.loads(x.get())
+            zs = self.hub.up2k.get_state()
+            vs = json.loads(zs)
             vstate = {("/" + k).rstrip("/") + "/": v for k, v in vs["volstate"].items()}
         else:
             vstate = {}
@@ -3508,10 +3499,8 @@ class HttpCli(object):
 
         vn, _ = self.asrv.vfs.get(self.vpath, self.uname, True, True)
 
-        args = [self.asrv.vfs.all_vols, [vn.vpath], False, True]
+        err = self.hub.up2k.rescan(self.asrv.vfs.all_vols, [vn.vpath], False, True)
 
-        x = self.conn.hsrv.broker.ask("up2k.rescan", *args)
-        err = x.get()
         if not err:
             self.redirect("", "?h")
             return True
@@ -3529,8 +3518,8 @@ class HttpCli(object):
         if self.args.no_reload:
             raise Pebkac(403, "the reload feature is disabled in server config")
 
-        x = self.conn.hsrv.broker.ask("reload")
-        return self.redirect("", "?h", x.get(), "return to", False)
+        zs = self.hub.reload()
+        return self.redirect("", "?h", zs, "return to", False)
 
     def tx_stack(self) -> bool:
         if not self.avol and not [x for x in self.wvol if x in self.rvol]:
@@ -3632,10 +3621,7 @@ class HttpCli(object):
             and (self.uname in vol.axs.uread or self.uname in vol.axs.upget)
         }
 
-        x = self.conn.hsrv.broker.ask(
-            "up2k.get_unfinished_by_user", self.uname, self.ip
-        )
-        uret = x.get()
+        uret = self.hub.up2k.get_unfinished_by_user(self.uname, self.ip)
 
         if not self.args.unpost:
             allvols = []
@@ -3721,10 +3707,8 @@ class HttpCli(object):
         nlim = int(self.uparam.get("lim") or 0)
         lim = [nlim, nlim] if nlim else []
 
-        x = self.conn.hsrv.broker.ask(
-            "up2k.handle_rm", self.uname, self.ip, req, lim, False, unpost
-        )
-        self.loud_reply(x.get())
+        zs = self.hub.up2k.handle_rm(self.uname, self.ip, req, lim, False, unpost)
+        self.loud_reply(zs)
         return True
 
     def handle_mv(self) -> bool:
@@ -3746,8 +3730,8 @@ class HttpCli(object):
         if self.args.no_mv:
             raise Pebkac(403, "the rename/move feature is disabled in server config")
 
-        x = self.conn.hsrv.broker.ask("up2k.handle_mv", self.uname, vsrc, vdst)
-        self.loud_reply(x.get(), status=201)
+        zs = self.hub.up2k.handle_mv(self.uname, vsrc, vdst)
+        self.loud_reply(zs, status=201)
         return True
 
     def tx_ls(self, ls: dict[str, Any]) -> bool:
diff --git a/copyparty/httpconn.py b/copyparty/httpconn.py
index 3e40f55a..3b404e95 100644
--- a/copyparty/httpconn.py
+++ b/copyparty/httpconn.py
@@ -58,7 +58,7 @@ class HttpConn(object):
         self.ipa_nm: Optional[NetMap] = hsrv.ipa_nm
         self.xff_nm: Optional[NetMap] = hsrv.xff_nm
         self.xff_lan: NetMap = hsrv.xff_lan  # type: ignore
-        self.iphash: HMaccas = hsrv.broker.iphash
+        self.iphash: HMaccas = hsrv.hub.iphash
         self.bans: dict[str, int] = hsrv.bans
         self.aclose: dict[str, int] = hsrv.aclose
 
diff --git a/copyparty/httpsrv.py b/copyparty/httpsrv.py
index 1c6a8cca..3b0ec427 100644
--- a/copyparty/httpsrv.py
+++ b/copyparty/httpsrv.py
@@ -77,7 +77,7 @@ from .util import (
 )
 
 if TYPE_CHECKING:
-    from .broker_util import BrokerCli
+    from .svchub import SvcHub
     from .ssdp import SSDPr
 
 if True:  # pylint: disable=using-constant-test
@@ -90,16 +90,13 @@ class HttpSrv(object):
     relying on MpSrv for performance (HttpSrv is just plain threads)
     """
 
-    def __init__(self, broker: "BrokerCli", nid: Optional[int]) -> None:
-        self.broker = broker
+    def __init__(self, hub: "SvcHub", nid: Optional[int]) -> None:
+        self.hub = hub
         self.nid = nid
-        self.args = broker.args
+        self.args = hub.args
         self.E: EnvParams = self.args.E
-        self.log = broker.log
-        self.asrv = broker.asrv
-
-        # redefine in case of multiprocessing
-        socket.setdefaulttimeout(120)
+        self.log = hub.log
+        self.asrv = hub.asrv
 
         self.t0 = time.time()
         nsuf = "-n{}-i{:x}".format(nid, os.getpid()) if nid else ""
@@ -169,7 +166,7 @@ class HttpSrv(object):
         if self.args.zs:
             from .ssdp import SSDPr
 
-            self.ssdp = SSDPr(broker)
+            self.ssdp = SSDPr(hub)
 
         if self.tp_q:
             self.start_threads(4)
@@ -186,8 +183,7 @@ class HttpSrv(object):
 
     def post_init(self) -> None:
         try:
-            x = self.broker.ask("thumbsrv.getcfg")
-            self.th_cfg = x.get()
+            self.th_cfg = self.hub.thumbsrv.getcfg()
         except:
             pass
 
@@ -237,19 +233,11 @@ class HttpSrv(object):
                     self.t_periodic = None
                     return
 
-    def listen(self, sck: socket.socket, nlisteners: int) -> None:
-        if self.args.j != 1:
-            # lost in the pickle; redefine
-            if not ANYWIN or self.args.reuseaddr:
-                sck.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-
-            sck.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
-            sck.settimeout(None)  # < does not inherit, ^ opts above do
-
+    def listen(self, sck: socket.socket) -> None:
         ip, port = sck.getsockname()[:2]
         self.srvs.append(sck)
         self.bound.add((ip, port))
-        self.nclimax = math.ceil(self.args.nc * 1.0 / nlisteners)
+        self.nclimax = self.args.nc
         Daemon(
             self.thr_listen,
             "httpsrv-n{}-listen-{}-{}".format(self.nid or "0", ip, port),
@@ -265,7 +253,7 @@ class HttpSrv(object):
         self.log(self.name, msg)
 
         def fun() -> None:
-            self.broker.say("cb_httpsrv_up")
+            self.hub.cb_httpsrv_up()
 
         threading.Thread(target=fun, name="sig-hsrv-up1").start()
 
diff --git a/copyparty/metrics.py b/copyparty/metrics.py
index 3af8be9d..41e53bc1 100644
--- a/copyparty/metrics.py
+++ b/copyparty/metrics.py
@@ -15,6 +15,7 @@ if TYPE_CHECKING:
 class Metrics(object):
     def __init__(self, hsrv: "HttpSrv") -> None:
         self.hsrv = hsrv
+        self.hub = hsrv.hub
 
     def tx(self, cli: "HttpCli") -> bool:
         if not cli.avol:
@@ -88,8 +89,8 @@ class Metrics(object):
         addg("cpp_total_bans", str(self.hsrv.nban), t)
 
         if not args.nos_vst:
-            x = self.hsrv.broker.ask("up2k.get_state")
-            vs = json.loads(x.get())
+            zs = self.hub.up2k.get_state()
+            vs = json.loads(zs.get())
 
             nvidle = 0
             nvbusy = 0
@@ -146,8 +147,7 @@ class Metrics(object):
             volsizes = []
             try:
                 ptops = [x.realpath for _, x in allvols]
-                x = self.hsrv.broker.ask("up2k.get_volsizes", ptops)
-                volsizes = x.get()
+                volsizes = self.hub.up2k.get_volsizes(ptops)
             except Exception as ex:
                 cli.log("tx_stats get_volsizes: {!r}".format(ex), 3)
 
@@ -204,8 +204,7 @@ class Metrics(object):
             tnbytes = 0
             tnfiles = 0
             try:
-                x = self.hsrv.broker.ask("up2k.get_unfinished")
-                xs = x.get()
+                xs = self.hub.up2k.get_unfinished()
                 if not xs:
                     raise Exception("up2k mutex acquisition timed out")
 
diff --git a/copyparty/ssdp.py b/copyparty/ssdp.py
index 3464a50a..754f6d9c 100644
--- a/copyparty/ssdp.py
+++ b/copyparty/ssdp.py
@@ -12,7 +12,6 @@ from .multicast import MC_Sck, MCast
 from .util import CachedSet, html_escape, min_ex
 
 if TYPE_CHECKING:
-    from .broker_util import BrokerCli
     from .httpcli import HttpCli
     from .svchub import SvcHub
 
@@ -32,9 +31,9 @@ class SSDP_Sck(MC_Sck):
 class SSDPr(object):
     """generates http responses for httpcli"""
 
-    def __init__(self, broker: "BrokerCli") -> None:
-        self.broker = broker
-        self.args = broker.args
+    def __init__(self, hub: "SvcHub") -> None:
+        self.hub = hub
+        self.args = hub.args
 
     def reply(self, hc: "HttpCli") -> bool:
         if hc.vpath.endswith("device.xml"):
diff --git a/copyparty/svchub.py b/copyparty/svchub.py
index 028dcab9..1f5ef089 100644
--- a/copyparty/svchub.py
+++ b/copyparty/svchub.py
@@ -28,9 +28,10 @@ if True:  # pylint: disable=using-constant-test
     import typing
     from typing import Any, Optional, Union
 
-from .__init__ import ANYWIN, EXE, MACOS, TYPE_CHECKING, E, EnvParams, unicode
+from .__init__ import ANYWIN, EXE, TYPE_CHECKING, E, EnvParams, unicode
 from .authsrv import BAD_CFG, AuthSrv
 from .cert import ensure_cert
+from .httpsrv import HttpSrv
 from .mtag import HAVE_FFMPEG, HAVE_FFPROBE
 from .tcpsrv import TcpSrv
 from .th_srv import HAVE_PIL, HAVE_VIPS, HAVE_WEBP, ThumbSrv
@@ -51,7 +52,6 @@ from .util import (
     ansi_re,
     build_netmap,
     min_ex,
-    mp,
     odfusion,
     pybin,
     start_log_thrs,
@@ -67,16 +67,6 @@ if TYPE_CHECKING:
 
 
 class SvcHub(object):
-    """
-    Hosts all services which cannot be parallelized due to reliance on monolithic resources.
-    Creates a Broker which does most of the heavy stuff; hosted services can use this to perform work:
-        hub.broker.(destination, args_list).
-
-    Either BrokerThr (plain threads) or BrokerMP (multiprocessing) is used depending on configuration.
-    Nothing is returned synchronously; if you want any value returned from the call,
-    put() can return a queue (if want_reply=True) which has a blocking get() with the response.
-    """
-
     def __init__(
         self,
         args: argparse.Namespace,
@@ -163,16 +153,6 @@ class SvcHub(object):
         if args.log_thrs:
             start_log_thrs(self.log, args.log_thrs, 0)
 
-        if not args.use_fpool and args.j != 1:
-            args.no_fpool = True
-            t = "multithreading enabled with -j {}, so disabling fpool -- this can reduce upload performance on some filesystems"
-            self.log("root", t.format(args.j))
-
-        if not args.no_fpool and args.j != 1:
-            t = "WARNING: ignoring --use-fpool because multithreading (-j{}) is enabled"
-            self.log("root", t.format(args.j), c=3)
-            args.no_fpool = True
-
         for name, arg in (
             ("iobuf", "iobuf"),
             ("s-rd-sz", "s_rd_sz"),
@@ -316,13 +296,7 @@ class SvcHub(object):
         self.mdns: Optional["MDNS"] = None
         self.ssdp: Optional["SSDPd"] = None
 
-        # decide which worker impl to use
-        if self.check_mp_enable():
-            from .broker_mp import BrokerMp as Broker
-        else:
-            from .broker_thr import BrokerThr as Broker  # type: ignore
-
-        self.broker = Broker(self)
+        self.httpsrv = HttpSrv(self, None)
 
     def start_ftpd(self) -> None:
         time.sleep(30)
@@ -361,15 +335,14 @@ class SvcHub(object):
 
     def thr_httpsrv_up(self) -> None:
         time.sleep(1 if self.args.ign_ebind_all else 5)
-        expected = self.broker.num_workers * self.tcpsrv.nsrv
+        expected = self.tcpsrv.nsrv
         failed = expected - self.httpsrv_up
         if not failed:
             return
 
         if self.args.ign_ebind_all:
             if not self.tcpsrv.srv:
-                for _ in range(self.broker.num_workers):
-                    self.broker.say("cb_httpsrv_up")
+                self.cb_httpsrv_up()
             return
 
         if self.args.ign_ebind and self.tcpsrv.srv:
@@ -387,8 +360,6 @@ class SvcHub(object):
 
     def cb_httpsrv_up(self) -> None:
         self.httpsrv_up += 1
-        if self.httpsrv_up != self.broker.num_workers:
-            return
 
         ar = self.args
         for _ in range(10 if ar.ftp or ar.ftps else 0):
@@ -723,7 +694,6 @@ class SvcHub(object):
             self.log("root", "reloading config")
             self.asrv.reload()
             self.up2k.reload(rescan_all_vols)
-            self.broker.reload()
             self.reloading = 0
 
     def _reload_blocking(self, rescan_all_vols: bool = True) -> None:
@@ -808,7 +778,7 @@ class SvcHub(object):
                 tasks.append(Daemon(self.ssdp.stop, "ssdp"))
                 slp = time.time() + 0.5
 
-            self.broker.shutdown()
+            self.httpsrv.shutdown()
             self.tcpsrv.shutdown()
             self.up2k.shutdown()
 
@@ -970,48 +940,6 @@ class SvcHub(object):
             if ex.errno != errno.EPIPE:
                 raise
 
-    def check_mp_support(self) -> str:
-        if MACOS:
-            return "multiprocessing is wonky on mac osx;"
-        elif sys.version_info < (3, 3):
-            return "need python 3.3 or newer for multiprocessing;"
-
-        try:
-            x: mp.Queue[tuple[str, str]] = mp.Queue(1)
-            x.put(("foo", "bar"))
-            if x.get()[0] != "foo":
-                raise Exception()
-        except:
-            return "multiprocessing is not supported on your platform;"
-
-        return ""
-
-    def check_mp_enable(self) -> bool:
-        if self.args.j == 1:
-            return False
-
-        try:
-            if mp.cpu_count() <= 1:
-                raise Exception()
-        except:
-            self.log("svchub", "only one CPU detected; multiprocessing disabled")
-            return False
-
-        try:
-            # support vscode debugger (bonus: same behavior as on windows)
-            mp.set_start_method("spawn", True)
-        except AttributeError:
-            # py2.7 probably, anyways dontcare
-            pass
-
-        err = self.check_mp_support()
-        if not err:
-            return True
-        else:
-            self.log("svchub", err)
-            self.log("svchub", "cannot efficiently use multiple CPU cores")
-            return False
-
     def sd_notify(self) -> None:
         try:
             zb = os.getenv("NOTIFY_SOCKET")
diff --git a/copyparty/tcpsrv.py b/copyparty/tcpsrv.py
index 4bbea2c9..fa47dccd 100644
--- a/copyparty/tcpsrv.py
+++ b/copyparty/tcpsrv.py
@@ -297,7 +297,7 @@ class TcpSrv(object):
             if self.args.q:
                 print(msg)
 
-            self.hub.broker.say("listen", srv)
+            self.hub.httpsrv.listen(srv)
 
         self.srv = srvs
         self.bound = bound
@@ -305,7 +305,7 @@ class TcpSrv(object):
         self._distribute_netdevs()
 
     def _distribute_netdevs(self):
-        self.hub.broker.say("set_netdevs", self.netdevs)
+        self.hub.httpsrv.set_netdevs(self.netdevs)
         self.hub.start_zeroconf()
         gencert(self.log, self.args, self.netdevs)
         self.hub.restart_ftpd()
diff --git a/copyparty/th_cli.py b/copyparty/th_cli.py
index 9cfef9aa..6fc3c167 100644
--- a/copyparty/th_cli.py
+++ b/copyparty/th_cli.py
@@ -7,7 +7,6 @@ from .__init__ import TYPE_CHECKING
 from .authsrv import VFS
 from .bos import bos
 from .th_srv import HAVE_WEBP, thumb_path
-from .util import Cooldown
 
 if True:  # pylint: disable=using-constant-test
     from typing import Optional, Union
@@ -18,14 +17,11 @@ if TYPE_CHECKING:
 
 class ThumbCli(object):
     def __init__(self, hsrv: "HttpSrv") -> None:
-        self.broker = hsrv.broker
+        self.hub = hsrv.hub
         self.log_func = hsrv.log
         self.args = hsrv.args
         self.asrv = hsrv.asrv
 
-        # cache on both sides for less broker spam
-        self.cooldown = Cooldown(self.args.th_poke)
-
         try:
             c = hsrv.th_cfg
             if not c:
@@ -134,13 +130,11 @@ class ThumbCli(object):
 
         if ret:
             tdir = os.path.dirname(tpath)
-            if self.cooldown.poke(tdir):
-                self.broker.say("thumbsrv.poke", tdir)
+            self.hub.thumbsrv.poke(tdir)
 
             if want_opus:
                 # audio files expire individually
-                if self.cooldown.poke(tpath):
-                    self.broker.say("thumbsrv.poke", tpath)
+                self.hub.thumbsrv.poke(tpath)
 
             return ret
 
@@ -150,5 +144,4 @@ class ThumbCli(object):
         if not bos.path.getsize(os.path.join(ptop, rem)):
             return None
 
-        x = self.broker.ask("thumbsrv.get", ptop, rem, mtime, fmt)
-        return x.get()  # type: ignore
+        return self.hub.thumbsrv.get(ptop, rem, mtime, fmt)
diff --git a/copyparty/up2k.py b/copyparty/up2k.py
index 39f386a0..10199f41 100644
--- a/copyparty/up2k.py
+++ b/copyparty/up2k.py
@@ -2745,9 +2745,9 @@ class Up2k(object):
                         cj["size"],
                         cj["ptop"],
                         ap1,
-                        self.hub.broker,
+                        self.hub,
                         reg,
-                        "up2k._get_volsize",
+                        "_get_volsize",
                     )
                     bos.makedirs(ap2)
                     vfs.lim.nup(cj["addr"])
diff --git a/scripts/pyinstaller/build.sh b/scripts/pyinstaller/build.sh
index adf94c98..602cd702 100644
--- a/scripts/pyinstaller/build.sh
+++ b/scripts/pyinstaller/build.sh
@@ -69,8 +69,6 @@ sed -ri s/copyparty.exe/copyparty$esuf.exe/ loader.rc2
 
 excl=(
     asyncio
-    copyparty.broker_mp
-    copyparty.broker_mpw
     copyparty.smbd
     ctypes.macholib
     curses
diff --git a/scripts/sfx.ls b/scripts/sfx.ls
index 407137be..c88d3af4 100644
--- a/scripts/sfx.ls
+++ b/scripts/sfx.ls
@@ -7,10 +7,6 @@ copyparty/bos,
 copyparty/bos/__init__.py,
 copyparty/bos/bos.py,
 copyparty/bos/path.py,
-copyparty/broker_mp.py,
-copyparty/broker_mpw.py,
-copyparty/broker_thr.py,
-copyparty/broker_util.py,
 copyparty/cert.py,
 copyparty/cfg.py,
 copyparty/dxml.py,
diff --git a/tests/util.py b/tests/util.py
index 415ac5a0..bbad57b4 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -170,12 +170,14 @@ class Cfg(Namespace):
         )
 
 
-class NullBroker(object):
-    def say(self, *args):
+class NullUp2k(object):
+    def hash_file(*a):
         pass
 
-    def ask(self, *args):
-        pass
+
+class NullHub(object):
+    def __init__(self):
+        self.up2k = NullUp2k()
 
 
 class VSock(object):
@@ -206,7 +208,7 @@ class VHttpSrv(object):
         self.asrv = asrv
         self.log = log
 
-        self.broker = NullBroker()
+        self.hub = NullHub()
         self.prism = None
         self.bans = {}
         self.nreq = 0