diff --git a/copyparty/__main__.py b/copyparty/__main__.py index a115eeaf..c1a9604e 100755 --- a/copyparty/__main__.py +++ b/copyparty/__main__.py @@ -841,7 +841,6 @@ def add_general(ap, nc, srvname): ap2 = ap.add_argument_group('general options') ap2.add_argument("-c", metavar="PATH", type=u, action="append", help="add config file") ap2.add_argument("-nc", metavar="NUM", type=int, default=nc, help="max num clients") - ap2.add_argument("-j", metavar="CORES", type=int, default=1, help="max num cpu cores, 0=all") ap2.add_argument("-a", metavar="ACCT", type=u, action="append", help="add account, \033[33mUSER\033[0m:\033[33mPASS\033[0m; example [\033[32med:wark\033[0m]") ap2.add_argument("-v", metavar="VOL", type=u, action="append", help="add volume, \033[33mSRC\033[0m:\033[33mDST\033[0m:\033[33mFLAG\033[0m; examples [\033[32m.::r\033[0m], [\033[32m/mnt/nas/music:/music:r:aed\033[0m], see --help-accounts") ap2.add_argument("--grp", metavar="G:N,N", type=u, action="append", help="add group, \033[33mNAME\033[0m:\033[33mUSER1\033[0m,\033[33mUSER2\033[0m,\033[33m...\033[0m; example [\033[32madmins:ed,foo,bar\033[0m]") @@ -881,7 +880,7 @@ def add_upload(ap): ap2.add_argument("--blank-wt", metavar="SEC", type=int, default=300, help="file write grace period (any client can write to a blank file last-modified more recently than \033[33mSEC\033[0m seconds ago)") ap2.add_argument("--reg-cap", metavar="N", type=int, default=38400, help="max number of uploads to keep in memory when running without \033[33m-e2d\033[0m; roughly 1 MiB RAM per 600") ap2.add_argument("--no-fpool", action="store_true", help="disable file-handle pooling -- instead, repeatedly close and reopen files during upload (bad idea to enable this on windows and/or cow filesystems)") - ap2.add_argument("--use-fpool", action="store_true", help="force file-handle pooling, even when it might be dangerous (multiprocessing, filesystems lacking sparse-files support, ...)") + ap2.add_argument("--use-fpool", action="store_true", help="force file-handle pooling, even when it might be dangerous (filesystems lacking sparse-files support, ...)") ap2.add_argument("--hardlink", action="store_true", help="prefer hardlinks instead of symlinks when possible (within same filesystem) (volflag=hardlink)") ap2.add_argument("--never-symlink", action="store_true", help="do not fallback to symlinks when a hardlink cannot be made (volflag=neversymlink)") ap2.add_argument("--no-dedup", action="store_true", help="disable symlink/hardlink creation; copy file contents instead (volflag=copydupes)") diff --git a/copyparty/authsrv.py b/copyparty/authsrv.py index ecaf63fa..b0b47273 100644 --- a/copyparty/authsrv.py +++ b/copyparty/authsrv.py @@ -44,9 +44,7 @@ if True: # pylint: disable=using-constant-test from .util import NamedLogger, RootLogger if TYPE_CHECKING: - from .broker_mp import BrokerMp - from .broker_thr import BrokerThr - from .broker_util import BrokerCli + from .svchub import SvcHub # Vflags: TypeAlias = dict[str, str | bool | float | list[str]] # Vflags: TypeAlias = dict[str, Any] @@ -141,9 +139,9 @@ class Lim(object): sz: int, ptop: str, abspath: str, - broker: Optional[Union["BrokerCli", "BrokerMp", "BrokerThr"]] = None, + hub: Optional["SvcHub"] = None, reg: Optional[dict[str, dict[str, Any]]] = None, - volgetter: str = "up2k.get_volsize", + volgetter: str = "get_volsize", ) -> tuple[str, str]: if reg is not None and self.reg is None: self.reg = reg @@ -154,7 +152,7 @@ class Lim(object): self.chk_rem(rem) if sz != -1: self.chk_sz(sz) - self.chk_vsz(broker, ptop, sz, volgetter) + self.chk_vsz(hub, ptop, sz, volgetter) self.chk_df(abspath, sz) # side effects; keep last-ish ap2, vp2 = self.rot(abspath) @@ -172,16 +170,15 @@ class Lim(object): def chk_vsz( self, - broker: Optional[Union["BrokerCli", "BrokerMp", "BrokerThr"]], + hub: Optional["SvcHub"], ptop: str, sz: int, volgetter: str = "up2k.get_volsize", ) -> None: - if not broker or not self.vbmax + self.vnmax: + if not hub or not self.vbmax + self.vnmax: return - x = broker.ask(volgetter, ptop) - nbytes, nfiles = x.get() + nbytes, nfiles = hub.up2k.getattr(volgetter)(ptop) if self.vbmax and self.vbmax < nbytes + sz: raise Pebkac(400, "volume has exceeded max size") @@ -815,9 +812,7 @@ class AuthSrv(object): yield prev, True - def idp_checkin( - self, broker: Optional["BrokerCli"], uname: str, gname: str - ) -> bool: + def idp_checkin(self, hub: Optional["SvcHub"], uname: str, gname: str) -> bool: if uname in self.acct: return False @@ -837,12 +832,12 @@ class AuthSrv(object): t = "reinitializing due to new user from IdP: [%s:%s]" self.log(t % (uname, gnames), 3) - if not broker: + if not hub: # only true for tests self._reload() return True - broker.ask("_reload_blocking", False).get() + hub._reload_blocking(False) return True def _map_volume_idp( diff --git a/copyparty/broker_mp.py b/copyparty/broker_mp.py deleted file mode 100644 index 848b07ee..00000000 --- a/copyparty/broker_mp.py +++ /dev/null @@ -1,141 +0,0 @@ -# coding: utf-8 -from __future__ import print_function, unicode_literals - -import threading -import time -import traceback - -import queue - -from .__init__ import CORES, TYPE_CHECKING -from .broker_mpw import MpWorker -from .broker_util import ExceptionalQueue, try_exec -from .util import Daemon, mp - -if TYPE_CHECKING: - from .svchub import SvcHub - -if True: # pylint: disable=using-constant-test - from typing import Any - - -class MProcess(mp.Process): - def __init__( - self, - q_pend: queue.Queue[tuple[int, str, list[Any]]], - q_yield: queue.Queue[tuple[int, str, list[Any]]], - target: Any, - args: Any, - ) -> None: - super(MProcess, self).__init__(target=target, args=args) - self.q_pend = q_pend - self.q_yield = q_yield - - -class BrokerMp(object): - """external api; manages MpWorkers""" - - def __init__(self, hub: "SvcHub") -> None: - self.hub = hub - self.log = hub.log - self.args = hub.args - - self.procs = [] - self.mutex = threading.Lock() - - self.num_workers = self.args.j or CORES - self.log("broker", "booting {} subprocesses".format(self.num_workers)) - for n in range(1, self.num_workers + 1): - q_pend: queue.Queue[tuple[int, str, list[Any]]] = mp.Queue(1) # type: ignore - q_yield: queue.Queue[tuple[int, str, list[Any]]] = mp.Queue(64) # type: ignore - - proc = MProcess(q_pend, q_yield, MpWorker, (q_pend, q_yield, self.args, n)) - Daemon(self.collector, "mp-sink-{}".format(n), (proc,)) - self.procs.append(proc) - proc.start() - - def shutdown(self) -> None: - self.log("broker", "shutting down") - for n, proc in enumerate(self.procs): - thr = threading.Thread( - target=proc.q_pend.put((0, "shutdown", [])), - name="mp-shutdown-{}-{}".format(n, len(self.procs)), - ) - thr.start() - - with self.mutex: - procs = self.procs - self.procs = [] - - while procs: - if procs[-1].is_alive(): - time.sleep(0.05) - continue - - procs.pop() - - def reload(self) -> None: - self.log("broker", "reloading") - for _, proc in enumerate(self.procs): - proc.q_pend.put((0, "reload", [])) - - def collector(self, proc: MProcess) -> None: - """receive message from hub in other process""" - while True: - msg = proc.q_yield.get() - retq_id, dest, args = msg - - if dest == "log": - self.log(*args) - - elif dest == "retq": - # response from previous ipc call - raise Exception("invalid broker_mp usage") - - else: - # new ipc invoking managed service in hub - try: - obj = self.hub - for node in dest.split("."): - obj = getattr(obj, node) - - # TODO will deadlock if dest performs another ipc - rv = try_exec(retq_id, obj, *args) - except: - rv = ["exception", "stack", traceback.format_exc()] - - if retq_id: - proc.q_pend.put((retq_id, "retq", rv)) - - def ask(self, dest: str, *args: Any) -> ExceptionalQueue: - - # new non-ipc invoking managed service in hub - obj = self.hub - for node in dest.split("."): - obj = getattr(obj, node) - - rv = try_exec(True, obj, *args) - - retq = ExceptionalQueue(1) - retq.put(rv) - return retq - - def say(self, dest: str, *args: Any) -> None: - """ - send message to non-hub component in other process, - returns a Queue object which eventually contains the response if want_retval - (not-impl here since nothing uses it yet) - """ - if dest == "listen": - for p in self.procs: - p.q_pend.put((0, dest, [args[0], len(self.procs)])) - - elif dest == "set_netdevs": - for p in self.procs: - p.q_pend.put((0, dest, list(args))) - - elif dest == "cb_httpsrv_up": - self.hub.cb_httpsrv_up() - - else: - raise Exception("what is " + str(dest)) diff --git a/copyparty/broker_mpw.py b/copyparty/broker_mpw.py deleted file mode 100644 index e74c4547..00000000 --- a/copyparty/broker_mpw.py +++ /dev/null @@ -1,123 +0,0 @@ -# coding: utf-8 -from __future__ import print_function, unicode_literals - -import argparse -import os -import signal -import sys -import threading - -import queue - -from .__init__ import ANYWIN -from .authsrv import AuthSrv -from .broker_util import BrokerCli, ExceptionalQueue -from .httpsrv import HttpSrv -from .util import FAKE_MP, Daemon, HMaccas - -if True: # pylint: disable=using-constant-test - from types import FrameType - - from typing import Any, Optional, Union - - -class MpWorker(BrokerCli): - """one single mp instance""" - - def __init__( - self, - q_pend: queue.Queue[tuple[int, str, list[Any]]], - q_yield: queue.Queue[tuple[int, str, list[Any]]], - args: argparse.Namespace, - n: int, - ) -> None: - super(MpWorker, self).__init__() - - self.q_pend = q_pend - self.q_yield = q_yield - self.args = args - self.n = n - - self.log = self._log_disabled if args.q and not args.lo else self._log_enabled - - self.retpend: dict[int, Any] = {} - self.retpend_mutex = threading.Lock() - self.mutex = threading.Lock() - - # we inherited signal_handler from parent, - # replace it with something harmless - if not FAKE_MP: - sigs = [signal.SIGINT, signal.SIGTERM] - if not ANYWIN: - sigs.append(signal.SIGUSR1) - - for sig in sigs: - signal.signal(sig, self.signal_handler) - - # starting to look like a good idea - self.asrv = AuthSrv(args, None, False) - - # instantiate all services here (TODO: inheritance?) - self.iphash = HMaccas(os.path.join(self.args.E.cfg, "iphash"), 8) - self.httpsrv = HttpSrv(self, n) - - # on winxp and some other platforms, - # use thr.join() to block all signals - Daemon(self.main, "mpw-main").join() - - def signal_handler(self, sig: Optional[int], frame: Optional[FrameType]) -> None: - # print('k') - pass - - def _log_enabled(self, src: str, msg: str, c: Union[int, str] = 0) -> None: - self.q_yield.put((0, "log", [src, msg, c])) - - def _log_disabled(self, src: str, msg: str, c: Union[int, str] = 0) -> None: - pass - - def logw(self, msg: str, c: Union[int, str] = 0) -> None: - self.log("mp%d" % (self.n,), msg, c) - - def main(self) -> None: - while True: - retq_id, dest, args = self.q_pend.get() - - # self.logw("work: [{}]".format(d[0])) - if dest == "shutdown": - self.httpsrv.shutdown() - self.logw("ok bye") - sys.exit(0) - return - - elif dest == "reload": - self.logw("mpw.asrv reloading") - self.asrv.reload() - self.logw("mpw.asrv reloaded") - - elif dest == "listen": - self.httpsrv.listen(args[0], args[1]) - - elif dest == "set_netdevs": - self.httpsrv.set_netdevs(args[0]) - - elif dest == "retq": - # response from previous ipc call - with self.retpend_mutex: - retq = self.retpend.pop(retq_id) - - retq.put(args) - - else: - raise Exception("what is " + str(dest)) - - def ask(self, dest: str, *args: Any) -> ExceptionalQueue: - retq = ExceptionalQueue(1) - retq_id = id(retq) - with self.retpend_mutex: - self.retpend[retq_id] = retq - - self.q_yield.put((retq_id, dest, list(args))) - return retq - - def say(self, dest: str, *args: Any) -> None: - self.q_yield.put((0, dest, list(args))) diff --git a/copyparty/broker_thr.py b/copyparty/broker_thr.py deleted file mode 100644 index e40cd38d..00000000 --- a/copyparty/broker_thr.py +++ /dev/null @@ -1,73 +0,0 @@ -# coding: utf-8 -from __future__ import print_function, unicode_literals - -import os -import threading - -from .__init__ import TYPE_CHECKING -from .broker_util import BrokerCli, ExceptionalQueue, try_exec -from .httpsrv import HttpSrv -from .util import HMaccas - -if TYPE_CHECKING: - from .svchub import SvcHub - -if True: # pylint: disable=using-constant-test - from typing import Any - - -class BrokerThr(BrokerCli): - """external api; behaves like BrokerMP but using plain threads""" - - def __init__(self, hub: "SvcHub") -> None: - super(BrokerThr, self).__init__() - - self.hub = hub - self.log = hub.log - self.args = hub.args - self.asrv = hub.asrv - - self.mutex = threading.Lock() - self.num_workers = 1 - - # instantiate all services here (TODO: inheritance?) - self.iphash = HMaccas(os.path.join(self.args.E.cfg, "iphash"), 8) - self.httpsrv = HttpSrv(self, None) - self.reload = self.noop - - def shutdown(self) -> None: - # self.log("broker", "shutting down") - self.httpsrv.shutdown() - - def noop(self) -> None: - pass - - def ask(self, dest: str, *args: Any) -> ExceptionalQueue: - - # new ipc invoking managed service in hub - obj = self.hub - for node in dest.split("."): - obj = getattr(obj, node) - - rv = try_exec(True, obj, *args) - - # pretend we're broker_mp - retq = ExceptionalQueue(1) - retq.put(rv) - return retq - - def say(self, dest: str, *args: Any) -> None: - if dest == "listen": - self.httpsrv.listen(args[0], 1) - return - - if dest == "set_netdevs": - self.httpsrv.set_netdevs(args[0]) - return - - # new ipc invoking managed service in hub - obj = self.hub - for node in dest.split("."): - obj = getattr(obj, node) - - try_exec(False, obj, *args) diff --git a/copyparty/broker_util.py b/copyparty/broker_util.py deleted file mode 100644 index 105ac535..00000000 --- a/copyparty/broker_util.py +++ /dev/null @@ -1,72 +0,0 @@ -# coding: utf-8 -from __future__ import print_function, unicode_literals - -import argparse -import traceback - -from queue import Queue - -from .__init__ import TYPE_CHECKING -from .authsrv import AuthSrv -from .util import HMaccas, Pebkac - -if True: # pylint: disable=using-constant-test - from typing import Any, Optional, Union - - from .util import RootLogger - -if TYPE_CHECKING: - from .httpsrv import HttpSrv - - -class ExceptionalQueue(Queue, object): - def get(self, block: bool = True, timeout: Optional[float] = None) -> Any: - rv = super(ExceptionalQueue, self).get(block, timeout) - - if isinstance(rv, list): - if rv[0] == "exception": - if rv[1] == "pebkac": - raise Pebkac(*rv[2:]) - else: - raise Exception(rv[2]) - - return rv - - -class BrokerCli(object): - """ - helps mypy understand httpsrv.broker but still fails a few levels deeper, - for example resolving httpconn.* in httpcli -- see lines tagged #mypy404 - """ - - log: "RootLogger" - args: argparse.Namespace - asrv: AuthSrv - httpsrv: "HttpSrv" - iphash: HMaccas - - def __init__(self) -> None: - pass - - def ask(self, dest: str, *args: Any) -> ExceptionalQueue: - return ExceptionalQueue(1) - - def say(self, dest: str, *args: Any) -> None: - pass - - -def try_exec(want_retval: Union[bool, int], func: Any, *args: list[Any]) -> Any: - try: - return func(*args) - - except Pebkac as ex: - if not want_retval: - raise - - return ["exception", "pebkac", ex.code, str(ex)] - - except: - if not want_retval: - raise - - return ["exception", "stack", traceback.format_exc()] diff --git a/copyparty/ftpd.py b/copyparty/ftpd.py index a918b234..0006f52f 100644 --- a/copyparty/ftpd.py +++ b/copyparty/ftpd.py @@ -88,12 +88,8 @@ class FtpAuth(DummyAuthorizer): if bonk: logging.warning("client banned: invalid passwords") bans[ip] = bonk - try: - # only possible if multiprocessing disabled - self.hub.broker.httpsrv.bans[ip] = bonk # type: ignore - self.hub.broker.httpsrv.nban += 1 # type: ignore - except: - pass + self.hub.httpsrv.bans[ip] = bonk + self.hub.httpsrv.nban += 1 raise AuthenticationFailed("Authentication failed.") diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index a1a5088a..ba3f3239 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -115,6 +115,7 @@ class HttpCli(object): self.t0 = time.time() self.conn = conn + self.hub = conn.hsrv.hub self.u2mutex = conn.u2mutex # mypy404 self.s = conn.s self.sr = conn.sr @@ -475,7 +476,7 @@ class HttpCli(object): ) or self.args.idp_h_key in self.headers if trusted_key and trusted_xff: - self.asrv.idp_checkin(self.conn.hsrv.broker, idp_usr, idp_grp) + self.asrv.idp_checkin(self.hub, idp_usr, idp_grp) else: if not trusted_key: t = 'the idp-h-key header ("%s") is not present in the request; will NOT trust the other headers saying that the client\'s username is "%s" and group is "%s"' @@ -626,7 +627,7 @@ class HttpCli(object): msg += "hint: %s\r\n" % (self.hint,) if "database is locked" in em: - self.conn.hsrv.broker.say("log_stacks") + self.hub.log_stacks() msg += "hint: important info in the server log\r\n" zb = b"
" + html_escape(msg).encode("utf-8", "replace") @@ -1629,9 +1630,7 @@ class HttpCli(object): lim = vfs.get_dbv(rem)[0].lim fdir = vfs.canonical(rem) if lim: - fdir, rem = lim.all( - self.ip, rem, remains, vfs.realpath, fdir, self.conn.hsrv.broker - ) + fdir, rem = lim.all(self.ip, rem, remains, vfs.realpath, fdir, self.hub) fn = None if rem and not self.trailing_slash and not bos.path.isdir(fdir): @@ -1764,7 +1763,7 @@ class HttpCli(object): lim.bup(self.ip, post_sz) try: lim.chk_sz(post_sz) - lim.chk_vsz(self.conn.hsrv.broker, vfs.realpath, post_sz) + lim.chk_vsz(self.hub, vfs.realpath, post_sz) except: wunlink(self.log, path, vfs.flags) raise @@ -1823,8 +1822,7 @@ class HttpCli(object): raise Pebkac(403, t) vfs, rem = vfs.get_dbv(rem) - self.conn.hsrv.broker.say( - "up2k.hash_file", + self.hub.up2k.hash_file( vfs.realpath, vfs.vpath, vfs.flags, @@ -2049,8 +2047,7 @@ class HttpCli(object): # not to protect u2fh, but to prevent handshakes while files are closing with self.u2mutex: - x = self.conn.hsrv.broker.ask("up2k.handle_json", body, self.u2fh.aps) - ret = x.get() + ret = self.hub.up2k.handle_json(body, self.u2fh.aps) if self.is_vproxied: if "purl" in ret: @@ -2138,7 +2135,7 @@ class HttpCli(object): vfs, _ = self.asrv.vfs.get(self.vpath, self.uname, False, True) ptop = (vfs.dbv or vfs).realpath - x = self.conn.hsrv.broker.ask("up2k.handle_chunk", ptop, wark, chash) + x = self.hub.up2k.handle_chunk(ptop, wark, chash) response = x.get() chunksize, cstart, path, lastmod, sprs = response @@ -2207,11 +2204,9 @@ class HttpCli(object): f.close() raise finally: - x = self.conn.hsrv.broker.ask("up2k.release_chunk", ptop, wark, chash) - x.get() # block client until released + self.hub.up2k.release_chunk(ptop, wark, chash) - x = self.conn.hsrv.broker.ask("up2k.confirm_chunk", ptop, wark, chash) - ztis = x.get() + ztis = self.hub.up2k.confirm_chunk(ptop, wark, chash) try: num_left, fin_path = ztis except: @@ -2223,9 +2218,7 @@ class HttpCli(object): self.u2fh.close(path) if not num_left and not self.args.nw: - self.conn.hsrv.broker.ask( - "up2k.finish_upload", ptop, wark, self.u2fh.aps - ).get() + self.hub.up2k.finish_upload(ptop, wark, self.u2fh.aps) cinf = self.headers.get("x-up2k-stat", "") @@ -2403,7 +2396,7 @@ class HttpCli(object): fdir_base = vfs.canonical(rem) if lim: fdir_base, rem = lim.all( - self.ip, rem, -1, vfs.realpath, fdir_base, self.conn.hsrv.broker + self.ip, rem, -1, vfs.realpath, fdir_base, self.hub ) upload_vpath = "{}/{}".format(vfs.vpath, rem).strip("/") if not nullwrite: @@ -2511,7 +2504,7 @@ class HttpCli(object): try: lim.chk_df(tabspath, sz, True) lim.chk_sz(sz) - lim.chk_vsz(self.conn.hsrv.broker, vfs.realpath, sz) + lim.chk_vsz(self.hub, vfs.realpath, sz) lim.chk_bup(self.ip) lim.chk_nup(self.ip) except: @@ -2549,8 +2542,7 @@ class HttpCli(object): raise Pebkac(403, t) dbv, vrem = vfs.get_dbv(rem) - self.conn.hsrv.broker.say( - "up2k.hash_file", + self.hub.up2k.hash_file( dbv.realpath, vfs.vpath, dbv.flags, @@ -2697,7 +2689,7 @@ class HttpCli(object): fp = vfs.canonical(rp) lim = vfs.get_dbv(rem)[0].lim if lim: - fp, rp = lim.all(self.ip, rp, clen, vfs.realpath, fp, self.conn.hsrv.broker) + fp, rp = lim.all(self.ip, rp, clen, vfs.realpath, fp, self.hub) bos.makedirs(fp) fp = os.path.join(fp, fn) @@ -2799,7 +2791,7 @@ class HttpCli(object): lim.bup(self.ip, sz) try: lim.chk_sz(sz) - lim.chk_vsz(self.conn.hsrv.broker, vfs.realpath, sz) + lim.chk_vsz(self.hub, vfs.realpath, sz) except: wunlink(self.log, fp, vfs.flags) raise @@ -2828,8 +2820,7 @@ class HttpCli(object): raise Pebkac(403, t) vfs, rem = vfs.get_dbv(rem) - self.conn.hsrv.broker.say( - "up2k.hash_file", + self.hub.up2k.hash_file( vfs.realpath, vfs.vpath, vfs.flags, @@ -3363,8 +3354,8 @@ class HttpCli(object): ] if self.avol and not self.args.no_rescan: - x = self.conn.hsrv.broker.ask("up2k.get_state") - vs = json.loads(x.get()) + zs = self.hub.up2k.get_state() + vs = json.loads(zs) vstate = {("/" + k).rstrip("/") + "/": v for k, v in vs["volstate"].items()} else: vstate = {} @@ -3508,10 +3499,8 @@ class HttpCli(object): vn, _ = self.asrv.vfs.get(self.vpath, self.uname, True, True) - args = [self.asrv.vfs.all_vols, [vn.vpath], False, True] + err = self.hub.up2k.rescan(self.asrv.vfs.all_vols, [vn.vpath], False, True) - x = self.conn.hsrv.broker.ask("up2k.rescan", *args) - err = x.get() if not err: self.redirect("", "?h") return True @@ -3529,8 +3518,8 @@ class HttpCli(object): if self.args.no_reload: raise Pebkac(403, "the reload feature is disabled in server config") - x = self.conn.hsrv.broker.ask("reload") - return self.redirect("", "?h", x.get(), "return to", False) + zs = self.hub.reload() + return self.redirect("", "?h", zs, "return to", False) def tx_stack(self) -> bool: if not self.avol and not [x for x in self.wvol if x in self.rvol]: @@ -3632,10 +3621,7 @@ class HttpCli(object): and (self.uname in vol.axs.uread or self.uname in vol.axs.upget) } - x = self.conn.hsrv.broker.ask( - "up2k.get_unfinished_by_user", self.uname, self.ip - ) - uret = x.get() + uret = self.hub.up2k.get_unfinished_by_user(self.uname, self.ip) if not self.args.unpost: allvols = [] @@ -3721,10 +3707,8 @@ class HttpCli(object): nlim = int(self.uparam.get("lim") or 0) lim = [nlim, nlim] if nlim else [] - x = self.conn.hsrv.broker.ask( - "up2k.handle_rm", self.uname, self.ip, req, lim, False, unpost - ) - self.loud_reply(x.get()) + zs = self.hub.up2k.handle_rm(self.uname, self.ip, req, lim, False, unpost) + self.loud_reply(zs) return True def handle_mv(self) -> bool: @@ -3746,8 +3730,8 @@ class HttpCli(object): if self.args.no_mv: raise Pebkac(403, "the rename/move feature is disabled in server config") - x = self.conn.hsrv.broker.ask("up2k.handle_mv", self.uname, vsrc, vdst) - self.loud_reply(x.get(), status=201) + zs = self.hub.up2k.handle_mv(self.uname, vsrc, vdst) + self.loud_reply(zs, status=201) return True def tx_ls(self, ls: dict[str, Any]) -> bool: diff --git a/copyparty/httpconn.py b/copyparty/httpconn.py index 3e40f55a..3b404e95 100644 --- a/copyparty/httpconn.py +++ b/copyparty/httpconn.py @@ -58,7 +58,7 @@ class HttpConn(object): self.ipa_nm: Optional[NetMap] = hsrv.ipa_nm self.xff_nm: Optional[NetMap] = hsrv.xff_nm self.xff_lan: NetMap = hsrv.xff_lan # type: ignore - self.iphash: HMaccas = hsrv.broker.iphash + self.iphash: HMaccas = hsrv.hub.iphash self.bans: dict[str, int] = hsrv.bans self.aclose: dict[str, int] = hsrv.aclose diff --git a/copyparty/httpsrv.py b/copyparty/httpsrv.py index 1c6a8cca..3b0ec427 100644 --- a/copyparty/httpsrv.py +++ b/copyparty/httpsrv.py @@ -77,7 +77,7 @@ from .util import ( ) if TYPE_CHECKING: - from .broker_util import BrokerCli + from .svchub import SvcHub from .ssdp import SSDPr if True: # pylint: disable=using-constant-test @@ -90,16 +90,13 @@ class HttpSrv(object): relying on MpSrv for performance (HttpSrv is just plain threads) """ - def __init__(self, broker: "BrokerCli", nid: Optional[int]) -> None: - self.broker = broker + def __init__(self, hub: "SvcHub", nid: Optional[int]) -> None: + self.hub = hub self.nid = nid - self.args = broker.args + self.args = hub.args self.E: EnvParams = self.args.E - self.log = broker.log - self.asrv = broker.asrv - - # redefine in case of multiprocessing - socket.setdefaulttimeout(120) + self.log = hub.log + self.asrv = hub.asrv self.t0 = time.time() nsuf = "-n{}-i{:x}".format(nid, os.getpid()) if nid else "" @@ -169,7 +166,7 @@ class HttpSrv(object): if self.args.zs: from .ssdp import SSDPr - self.ssdp = SSDPr(broker) + self.ssdp = SSDPr(hub) if self.tp_q: self.start_threads(4) @@ -186,8 +183,7 @@ class HttpSrv(object): def post_init(self) -> None: try: - x = self.broker.ask("thumbsrv.getcfg") - self.th_cfg = x.get() + self.th_cfg = self.hub.thumbsrv.getcfg() except: pass @@ -237,19 +233,11 @@ class HttpSrv(object): self.t_periodic = None return - def listen(self, sck: socket.socket, nlisteners: int) -> None: - if self.args.j != 1: - # lost in the pickle; redefine - if not ANYWIN or self.args.reuseaddr: - sck.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - - sck.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - sck.settimeout(None) # < does not inherit, ^ opts above do - + def listen(self, sck: socket.socket) -> None: ip, port = sck.getsockname()[:2] self.srvs.append(sck) self.bound.add((ip, port)) - self.nclimax = math.ceil(self.args.nc * 1.0 / nlisteners) + self.nclimax = self.args.nc Daemon( self.thr_listen, "httpsrv-n{}-listen-{}-{}".format(self.nid or "0", ip, port), @@ -265,7 +253,7 @@ class HttpSrv(object): self.log(self.name, msg) def fun() -> None: - self.broker.say("cb_httpsrv_up") + self.hub.cb_httpsrv_up() threading.Thread(target=fun, name="sig-hsrv-up1").start() diff --git a/copyparty/metrics.py b/copyparty/metrics.py index 3af8be9d..41e53bc1 100644 --- a/copyparty/metrics.py +++ b/copyparty/metrics.py @@ -15,6 +15,7 @@ if TYPE_CHECKING: class Metrics(object): def __init__(self, hsrv: "HttpSrv") -> None: self.hsrv = hsrv + self.hub = hsrv.hub def tx(self, cli: "HttpCli") -> bool: if not cli.avol: @@ -88,8 +89,8 @@ class Metrics(object): addg("cpp_total_bans", str(self.hsrv.nban), t) if not args.nos_vst: - x = self.hsrv.broker.ask("up2k.get_state") - vs = json.loads(x.get()) + zs = self.hub.up2k.get_state() + vs = json.loads(zs.get()) nvidle = 0 nvbusy = 0 @@ -146,8 +147,7 @@ class Metrics(object): volsizes = [] try: ptops = [x.realpath for _, x in allvols] - x = self.hsrv.broker.ask("up2k.get_volsizes", ptops) - volsizes = x.get() + volsizes = self.hub.up2k.get_volsizes(ptops) except Exception as ex: cli.log("tx_stats get_volsizes: {!r}".format(ex), 3) @@ -204,8 +204,7 @@ class Metrics(object): tnbytes = 0 tnfiles = 0 try: - x = self.hsrv.broker.ask("up2k.get_unfinished") - xs = x.get() + xs = self.hub.up2k.get_unfinished() if not xs: raise Exception("up2k mutex acquisition timed out") diff --git a/copyparty/ssdp.py b/copyparty/ssdp.py index 3464a50a..754f6d9c 100644 --- a/copyparty/ssdp.py +++ b/copyparty/ssdp.py @@ -12,7 +12,6 @@ from .multicast import MC_Sck, MCast from .util import CachedSet, html_escape, min_ex if TYPE_CHECKING: - from .broker_util import BrokerCli from .httpcli import HttpCli from .svchub import SvcHub @@ -32,9 +31,9 @@ class SSDP_Sck(MC_Sck): class SSDPr(object): """generates http responses for httpcli""" - def __init__(self, broker: "BrokerCli") -> None: - self.broker = broker - self.args = broker.args + def __init__(self, hub: "SvcHub") -> None: + self.hub = hub + self.args = hub.args def reply(self, hc: "HttpCli") -> bool: if hc.vpath.endswith("device.xml"): diff --git a/copyparty/svchub.py b/copyparty/svchub.py index 028dcab9..1f5ef089 100644 --- a/copyparty/svchub.py +++ b/copyparty/svchub.py @@ -28,9 +28,10 @@ if True: # pylint: disable=using-constant-test import typing from typing import Any, Optional, Union -from .__init__ import ANYWIN, EXE, MACOS, TYPE_CHECKING, E, EnvParams, unicode +from .__init__ import ANYWIN, EXE, TYPE_CHECKING, E, EnvParams, unicode from .authsrv import BAD_CFG, AuthSrv from .cert import ensure_cert +from .httpsrv import HttpSrv from .mtag import HAVE_FFMPEG, HAVE_FFPROBE from .tcpsrv import TcpSrv from .th_srv import HAVE_PIL, HAVE_VIPS, HAVE_WEBP, ThumbSrv @@ -51,7 +52,6 @@ from .util import ( ansi_re, build_netmap, min_ex, - mp, odfusion, pybin, start_log_thrs, @@ -67,16 +67,6 @@ if TYPE_CHECKING: class SvcHub(object): - """ - Hosts all services which cannot be parallelized due to reliance on monolithic resources. - Creates a Broker which does most of the heavy stuff; hosted services can use this to perform work: - hub.broker.(destination, args_list). - - Either BrokerThr (plain threads) or BrokerMP (multiprocessing) is used depending on configuration. - Nothing is returned synchronously; if you want any value returned from the call, - put() can return a queue (if want_reply=True) which has a blocking get() with the response. - """ - def __init__( self, args: argparse.Namespace, @@ -163,16 +153,6 @@ class SvcHub(object): if args.log_thrs: start_log_thrs(self.log, args.log_thrs, 0) - if not args.use_fpool and args.j != 1: - args.no_fpool = True - t = "multithreading enabled with -j {}, so disabling fpool -- this can reduce upload performance on some filesystems" - self.log("root", t.format(args.j)) - - if not args.no_fpool and args.j != 1: - t = "WARNING: ignoring --use-fpool because multithreading (-j{}) is enabled" - self.log("root", t.format(args.j), c=3) - args.no_fpool = True - for name, arg in ( ("iobuf", "iobuf"), ("s-rd-sz", "s_rd_sz"), @@ -316,13 +296,7 @@ class SvcHub(object): self.mdns: Optional["MDNS"] = None self.ssdp: Optional["SSDPd"] = None - # decide which worker impl to use - if self.check_mp_enable(): - from .broker_mp import BrokerMp as Broker - else: - from .broker_thr import BrokerThr as Broker # type: ignore - - self.broker = Broker(self) + self.httpsrv = HttpSrv(self, None) def start_ftpd(self) -> None: time.sleep(30) @@ -361,15 +335,14 @@ class SvcHub(object): def thr_httpsrv_up(self) -> None: time.sleep(1 if self.args.ign_ebind_all else 5) - expected = self.broker.num_workers * self.tcpsrv.nsrv + expected = self.tcpsrv.nsrv failed = expected - self.httpsrv_up if not failed: return if self.args.ign_ebind_all: if not self.tcpsrv.srv: - for _ in range(self.broker.num_workers): - self.broker.say("cb_httpsrv_up") + self.cb_httpsrv_up() return if self.args.ign_ebind and self.tcpsrv.srv: @@ -387,8 +360,6 @@ class SvcHub(object): def cb_httpsrv_up(self) -> None: self.httpsrv_up += 1 - if self.httpsrv_up != self.broker.num_workers: - return ar = self.args for _ in range(10 if ar.ftp or ar.ftps else 0): @@ -723,7 +694,6 @@ class SvcHub(object): self.log("root", "reloading config") self.asrv.reload() self.up2k.reload(rescan_all_vols) - self.broker.reload() self.reloading = 0 def _reload_blocking(self, rescan_all_vols: bool = True) -> None: @@ -808,7 +778,7 @@ class SvcHub(object): tasks.append(Daemon(self.ssdp.stop, "ssdp")) slp = time.time() + 0.5 - self.broker.shutdown() + self.httpsrv.shutdown() self.tcpsrv.shutdown() self.up2k.shutdown() @@ -970,48 +940,6 @@ class SvcHub(object): if ex.errno != errno.EPIPE: raise - def check_mp_support(self) -> str: - if MACOS: - return "multiprocessing is wonky on mac osx;" - elif sys.version_info < (3, 3): - return "need python 3.3 or newer for multiprocessing;" - - try: - x: mp.Queue[tuple[str, str]] = mp.Queue(1) - x.put(("foo", "bar")) - if x.get()[0] != "foo": - raise Exception() - except: - return "multiprocessing is not supported on your platform;" - - return "" - - def check_mp_enable(self) -> bool: - if self.args.j == 1: - return False - - try: - if mp.cpu_count() <= 1: - raise Exception() - except: - self.log("svchub", "only one CPU detected; multiprocessing disabled") - return False - - try: - # support vscode debugger (bonus: same behavior as on windows) - mp.set_start_method("spawn", True) - except AttributeError: - # py2.7 probably, anyways dontcare - pass - - err = self.check_mp_support() - if not err: - return True - else: - self.log("svchub", err) - self.log("svchub", "cannot efficiently use multiple CPU cores") - return False - def sd_notify(self) -> None: try: zb = os.getenv("NOTIFY_SOCKET") diff --git a/copyparty/tcpsrv.py b/copyparty/tcpsrv.py index 4bbea2c9..fa47dccd 100644 --- a/copyparty/tcpsrv.py +++ b/copyparty/tcpsrv.py @@ -297,7 +297,7 @@ class TcpSrv(object): if self.args.q: print(msg) - self.hub.broker.say("listen", srv) + self.hub.httpsrv.listen(srv) self.srv = srvs self.bound = bound @@ -305,7 +305,7 @@ class TcpSrv(object): self._distribute_netdevs() def _distribute_netdevs(self): - self.hub.broker.say("set_netdevs", self.netdevs) + self.hub.httpsrv.set_netdevs(self.netdevs) self.hub.start_zeroconf() gencert(self.log, self.args, self.netdevs) self.hub.restart_ftpd() diff --git a/copyparty/th_cli.py b/copyparty/th_cli.py index 9cfef9aa..6fc3c167 100644 --- a/copyparty/th_cli.py +++ b/copyparty/th_cli.py @@ -7,7 +7,6 @@ from .__init__ import TYPE_CHECKING from .authsrv import VFS from .bos import bos from .th_srv import HAVE_WEBP, thumb_path -from .util import Cooldown if True: # pylint: disable=using-constant-test from typing import Optional, Union @@ -18,14 +17,11 @@ if TYPE_CHECKING: class ThumbCli(object): def __init__(self, hsrv: "HttpSrv") -> None: - self.broker = hsrv.broker + self.hub = hsrv.hub self.log_func = hsrv.log self.args = hsrv.args self.asrv = hsrv.asrv - # cache on both sides for less broker spam - self.cooldown = Cooldown(self.args.th_poke) - try: c = hsrv.th_cfg if not c: @@ -134,13 +130,11 @@ class ThumbCli(object): if ret: tdir = os.path.dirname(tpath) - if self.cooldown.poke(tdir): - self.broker.say("thumbsrv.poke", tdir) + self.hub.thumbsrv.poke(tdir) if want_opus: # audio files expire individually - if self.cooldown.poke(tpath): - self.broker.say("thumbsrv.poke", tpath) + self.hub.thumbsrv.poke(tpath) return ret @@ -150,5 +144,4 @@ class ThumbCli(object): if not bos.path.getsize(os.path.join(ptop, rem)): return None - x = self.broker.ask("thumbsrv.get", ptop, rem, mtime, fmt) - return x.get() # type: ignore + return self.hub.thumbsrv.get(ptop, rem, mtime, fmt) diff --git a/copyparty/up2k.py b/copyparty/up2k.py index 39f386a0..10199f41 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -2745,9 +2745,9 @@ class Up2k(object): cj["size"], cj["ptop"], ap1, - self.hub.broker, + self.hub, reg, - "up2k._get_volsize", + "_get_volsize", ) bos.makedirs(ap2) vfs.lim.nup(cj["addr"]) diff --git a/scripts/pyinstaller/build.sh b/scripts/pyinstaller/build.sh index adf94c98..602cd702 100644 --- a/scripts/pyinstaller/build.sh +++ b/scripts/pyinstaller/build.sh @@ -69,8 +69,6 @@ sed -ri s/copyparty.exe/copyparty$esuf.exe/ loader.rc2 excl=( asyncio - copyparty.broker_mp - copyparty.broker_mpw copyparty.smbd ctypes.macholib curses diff --git a/scripts/sfx.ls b/scripts/sfx.ls index 407137be..c88d3af4 100644 --- a/scripts/sfx.ls +++ b/scripts/sfx.ls @@ -7,10 +7,6 @@ copyparty/bos, copyparty/bos/__init__.py, copyparty/bos/bos.py, copyparty/bos/path.py, -copyparty/broker_mp.py, -copyparty/broker_mpw.py, -copyparty/broker_thr.py, -copyparty/broker_util.py, copyparty/cert.py, copyparty/cfg.py, copyparty/dxml.py, diff --git a/tests/util.py b/tests/util.py index 415ac5a0..bbad57b4 100644 --- a/tests/util.py +++ b/tests/util.py @@ -170,12 +170,14 @@ class Cfg(Namespace): ) -class NullBroker(object): - def say(self, *args): +class NullUp2k(object): + def hash_file(*a): pass - def ask(self, *args): - pass + +class NullHub(object): + def __init__(self): + self.up2k = NullUp2k() class VSock(object): @@ -206,7 +208,7 @@ class VHttpSrv(object): self.asrv = asrv self.log = log - self.broker = NullBroker() + self.hub = NullHub() self.prism = None self.bans = {} self.nreq = 0