nogil: remove -j (multiprocessing option)

in case cpython's free-threading / nogil performance proves to
be good enough that the multiprocessing option can be removed,
this is roughly how you'd do it  (not everything's been tested)

but while you'd expect this change to improve performance,
it somehow doesn't, not even measurably so

as the performance gain is negligible, the only win here is
simplifying the code, and maybe less chance of bugs in the future

as a result, this probably won't get merged anytime soon
(if at all), and would probably warrant bumping to v2
This commit is contained in:
ed 2024-03-24 21:53:28 +00:00
parent cb99fbf442
commit 7781e0529d
19 changed files with 81 additions and 613 deletions

View file

@ -841,7 +841,6 @@ def add_general(ap, nc, srvname):
ap2 = ap.add_argument_group('general options')
ap2.add_argument("-c", metavar="PATH", type=u, action="append", help="add config file")
ap2.add_argument("-nc", metavar="NUM", type=int, default=nc, help="max num clients")
ap2.add_argument("-j", metavar="CORES", type=int, default=1, help="max num cpu cores, 0=all")
ap2.add_argument("-a", metavar="ACCT", type=u, action="append", help="add account, \033[33mUSER\033[0m:\033[33mPASS\033[0m; example [\033[32med:wark\033[0m]")
ap2.add_argument("-v", metavar="VOL", type=u, action="append", help="add volume, \033[33mSRC\033[0m:\033[33mDST\033[0m:\033[33mFLAG\033[0m; examples [\033[32m.::r\033[0m], [\033[32m/mnt/nas/music:/music:r:aed\033[0m], see --help-accounts")
ap2.add_argument("--grp", metavar="G:N,N", type=u, action="append", help="add group, \033[33mNAME\033[0m:\033[33mUSER1\033[0m,\033[33mUSER2\033[0m,\033[33m...\033[0m; example [\033[32madmins:ed,foo,bar\033[0m]")
@ -881,7 +880,7 @@ def add_upload(ap):
ap2.add_argument("--blank-wt", metavar="SEC", type=int, default=300, help="file write grace period (any client can write to a blank file last-modified more recently than \033[33mSEC\033[0m seconds ago)")
ap2.add_argument("--reg-cap", metavar="N", type=int, default=38400, help="max number of uploads to keep in memory when running without \033[33m-e2d\033[0m; roughly 1 MiB RAM per 600")
ap2.add_argument("--no-fpool", action="store_true", help="disable file-handle pooling -- instead, repeatedly close and reopen files during upload (bad idea to enable this on windows and/or cow filesystems)")
ap2.add_argument("--use-fpool", action="store_true", help="force file-handle pooling, even when it might be dangerous (multiprocessing, filesystems lacking sparse-files support, ...)")
ap2.add_argument("--use-fpool", action="store_true", help="force file-handle pooling, even when it might be dangerous (filesystems lacking sparse-files support, ...)")
ap2.add_argument("--hardlink", action="store_true", help="prefer hardlinks instead of symlinks when possible (within same filesystem) (volflag=hardlink)")
ap2.add_argument("--never-symlink", action="store_true", help="do not fallback to symlinks when a hardlink cannot be made (volflag=neversymlink)")
ap2.add_argument("--no-dedup", action="store_true", help="disable symlink/hardlink creation; copy file contents instead (volflag=copydupes)")

View file

@ -44,9 +44,7 @@ if True: # pylint: disable=using-constant-test
from .util import NamedLogger, RootLogger
if TYPE_CHECKING:
from .broker_mp import BrokerMp
from .broker_thr import BrokerThr
from .broker_util import BrokerCli
from .svchub import SvcHub
# Vflags: TypeAlias = dict[str, str | bool | float | list[str]]
# Vflags: TypeAlias = dict[str, Any]
@ -141,9 +139,9 @@ class Lim(object):
sz: int,
ptop: str,
abspath: str,
broker: Optional[Union["BrokerCli", "BrokerMp", "BrokerThr"]] = None,
hub: Optional["SvcHub"] = None,
reg: Optional[dict[str, dict[str, Any]]] = None,
volgetter: str = "up2k.get_volsize",
volgetter: str = "get_volsize",
) -> tuple[str, str]:
if reg is not None and self.reg is None:
self.reg = reg
@ -154,7 +152,7 @@ class Lim(object):
self.chk_rem(rem)
if sz != -1:
self.chk_sz(sz)
self.chk_vsz(broker, ptop, sz, volgetter)
self.chk_vsz(hub, ptop, sz, volgetter)
self.chk_df(abspath, sz) # side effects; keep last-ish
ap2, vp2 = self.rot(abspath)
@ -172,16 +170,15 @@ class Lim(object):
def chk_vsz(
self,
broker: Optional[Union["BrokerCli", "BrokerMp", "BrokerThr"]],
hub: Optional["SvcHub"],
ptop: str,
sz: int,
volgetter: str = "up2k.get_volsize",
) -> None:
if not broker or not self.vbmax + self.vnmax:
if not hub or not self.vbmax + self.vnmax:
return
x = broker.ask(volgetter, ptop)
nbytes, nfiles = x.get()
nbytes, nfiles = hub.up2k.getattr(volgetter)(ptop)
if self.vbmax and self.vbmax < nbytes + sz:
raise Pebkac(400, "volume has exceeded max size")
@ -815,9 +812,7 @@ class AuthSrv(object):
yield prev, True
def idp_checkin(
self, broker: Optional["BrokerCli"], uname: str, gname: str
) -> bool:
def idp_checkin(self, hub: Optional["SvcHub"], uname: str, gname: str) -> bool:
if uname in self.acct:
return False
@ -837,12 +832,12 @@ class AuthSrv(object):
t = "reinitializing due to new user from IdP: [%s:%s]"
self.log(t % (uname, gnames), 3)
if not broker:
if not hub:
# only true for tests
self._reload()
return True
broker.ask("_reload_blocking", False).get()
hub._reload_blocking(False)
return True
def _map_volume_idp(

View file

@ -1,141 +0,0 @@
# coding: utf-8
from __future__ import print_function, unicode_literals
import threading
import time
import traceback
import queue
from .__init__ import CORES, TYPE_CHECKING
from .broker_mpw import MpWorker
from .broker_util import ExceptionalQueue, try_exec
from .util import Daemon, mp
if TYPE_CHECKING:
from .svchub import SvcHub
if True: # pylint: disable=using-constant-test
from typing import Any
class MProcess(mp.Process):
def __init__(
self,
q_pend: queue.Queue[tuple[int, str, list[Any]]],
q_yield: queue.Queue[tuple[int, str, list[Any]]],
target: Any,
args: Any,
) -> None:
super(MProcess, self).__init__(target=target, args=args)
self.q_pend = q_pend
self.q_yield = q_yield
class BrokerMp(object):
"""external api; manages MpWorkers"""
def __init__(self, hub: "SvcHub") -> None:
self.hub = hub
self.log = hub.log
self.args = hub.args
self.procs = []
self.mutex = threading.Lock()
self.num_workers = self.args.j or CORES
self.log("broker", "booting {} subprocesses".format(self.num_workers))
for n in range(1, self.num_workers + 1):
q_pend: queue.Queue[tuple[int, str, list[Any]]] = mp.Queue(1) # type: ignore
q_yield: queue.Queue[tuple[int, str, list[Any]]] = mp.Queue(64) # type: ignore
proc = MProcess(q_pend, q_yield, MpWorker, (q_pend, q_yield, self.args, n))
Daemon(self.collector, "mp-sink-{}".format(n), (proc,))
self.procs.append(proc)
proc.start()
def shutdown(self) -> None:
self.log("broker", "shutting down")
for n, proc in enumerate(self.procs):
thr = threading.Thread(
target=proc.q_pend.put((0, "shutdown", [])),
name="mp-shutdown-{}-{}".format(n, len(self.procs)),
)
thr.start()
with self.mutex:
procs = self.procs
self.procs = []
while procs:
if procs[-1].is_alive():
time.sleep(0.05)
continue
procs.pop()
def reload(self) -> None:
self.log("broker", "reloading")
for _, proc in enumerate(self.procs):
proc.q_pend.put((0, "reload", []))
def collector(self, proc: MProcess) -> None:
"""receive message from hub in other process"""
while True:
msg = proc.q_yield.get()
retq_id, dest, args = msg
if dest == "log":
self.log(*args)
elif dest == "retq":
# response from previous ipc call
raise Exception("invalid broker_mp usage")
else:
# new ipc invoking managed service in hub
try:
obj = self.hub
for node in dest.split("."):
obj = getattr(obj, node)
# TODO will deadlock if dest performs another ipc
rv = try_exec(retq_id, obj, *args)
except:
rv = ["exception", "stack", traceback.format_exc()]
if retq_id:
proc.q_pend.put((retq_id, "retq", rv))
def ask(self, dest: str, *args: Any) -> ExceptionalQueue:
# new non-ipc invoking managed service in hub
obj = self.hub
for node in dest.split("."):
obj = getattr(obj, node)
rv = try_exec(True, obj, *args)
retq = ExceptionalQueue(1)
retq.put(rv)
return retq
def say(self, dest: str, *args: Any) -> None:
"""
send message to non-hub component in other process,
returns a Queue object which eventually contains the response if want_retval
(not-impl here since nothing uses it yet)
"""
if dest == "listen":
for p in self.procs:
p.q_pend.put((0, dest, [args[0], len(self.procs)]))
elif dest == "set_netdevs":
for p in self.procs:
p.q_pend.put((0, dest, list(args)))
elif dest == "cb_httpsrv_up":
self.hub.cb_httpsrv_up()
else:
raise Exception("what is " + str(dest))

View file

@ -1,123 +0,0 @@
# coding: utf-8
from __future__ import print_function, unicode_literals
import argparse
import os
import signal
import sys
import threading
import queue
from .__init__ import ANYWIN
from .authsrv import AuthSrv
from .broker_util import BrokerCli, ExceptionalQueue
from .httpsrv import HttpSrv
from .util import FAKE_MP, Daemon, HMaccas
if True: # pylint: disable=using-constant-test
from types import FrameType
from typing import Any, Optional, Union
class MpWorker(BrokerCli):
"""one single mp instance"""
def __init__(
self,
q_pend: queue.Queue[tuple[int, str, list[Any]]],
q_yield: queue.Queue[tuple[int, str, list[Any]]],
args: argparse.Namespace,
n: int,
) -> None:
super(MpWorker, self).__init__()
self.q_pend = q_pend
self.q_yield = q_yield
self.args = args
self.n = n
self.log = self._log_disabled if args.q and not args.lo else self._log_enabled
self.retpend: dict[int, Any] = {}
self.retpend_mutex = threading.Lock()
self.mutex = threading.Lock()
# we inherited signal_handler from parent,
# replace it with something harmless
if not FAKE_MP:
sigs = [signal.SIGINT, signal.SIGTERM]
if not ANYWIN:
sigs.append(signal.SIGUSR1)
for sig in sigs:
signal.signal(sig, self.signal_handler)
# starting to look like a good idea
self.asrv = AuthSrv(args, None, False)
# instantiate all services here (TODO: inheritance?)
self.iphash = HMaccas(os.path.join(self.args.E.cfg, "iphash"), 8)
self.httpsrv = HttpSrv(self, n)
# on winxp and some other platforms,
# use thr.join() to block all signals
Daemon(self.main, "mpw-main").join()
def signal_handler(self, sig: Optional[int], frame: Optional[FrameType]) -> None:
# print('k')
pass
def _log_enabled(self, src: str, msg: str, c: Union[int, str] = 0) -> None:
self.q_yield.put((0, "log", [src, msg, c]))
def _log_disabled(self, src: str, msg: str, c: Union[int, str] = 0) -> None:
pass
def logw(self, msg: str, c: Union[int, str] = 0) -> None:
self.log("mp%d" % (self.n,), msg, c)
def main(self) -> None:
while True:
retq_id, dest, args = self.q_pend.get()
# self.logw("work: [{}]".format(d[0]))
if dest == "shutdown":
self.httpsrv.shutdown()
self.logw("ok bye")
sys.exit(0)
return
elif dest == "reload":
self.logw("mpw.asrv reloading")
self.asrv.reload()
self.logw("mpw.asrv reloaded")
elif dest == "listen":
self.httpsrv.listen(args[0], args[1])
elif dest == "set_netdevs":
self.httpsrv.set_netdevs(args[0])
elif dest == "retq":
# response from previous ipc call
with self.retpend_mutex:
retq = self.retpend.pop(retq_id)
retq.put(args)
else:
raise Exception("what is " + str(dest))
def ask(self, dest: str, *args: Any) -> ExceptionalQueue:
retq = ExceptionalQueue(1)
retq_id = id(retq)
with self.retpend_mutex:
self.retpend[retq_id] = retq
self.q_yield.put((retq_id, dest, list(args)))
return retq
def say(self, dest: str, *args: Any) -> None:
self.q_yield.put((0, dest, list(args)))

View file

@ -1,73 +0,0 @@
# coding: utf-8
from __future__ import print_function, unicode_literals
import os
import threading
from .__init__ import TYPE_CHECKING
from .broker_util import BrokerCli, ExceptionalQueue, try_exec
from .httpsrv import HttpSrv
from .util import HMaccas
if TYPE_CHECKING:
from .svchub import SvcHub
if True: # pylint: disable=using-constant-test
from typing import Any
class BrokerThr(BrokerCli):
"""external api; behaves like BrokerMP but using plain threads"""
def __init__(self, hub: "SvcHub") -> None:
super(BrokerThr, self).__init__()
self.hub = hub
self.log = hub.log
self.args = hub.args
self.asrv = hub.asrv
self.mutex = threading.Lock()
self.num_workers = 1
# instantiate all services here (TODO: inheritance?)
self.iphash = HMaccas(os.path.join(self.args.E.cfg, "iphash"), 8)
self.httpsrv = HttpSrv(self, None)
self.reload = self.noop
def shutdown(self) -> None:
# self.log("broker", "shutting down")
self.httpsrv.shutdown()
def noop(self) -> None:
pass
def ask(self, dest: str, *args: Any) -> ExceptionalQueue:
# new ipc invoking managed service in hub
obj = self.hub
for node in dest.split("."):
obj = getattr(obj, node)
rv = try_exec(True, obj, *args)
# pretend we're broker_mp
retq = ExceptionalQueue(1)
retq.put(rv)
return retq
def say(self, dest: str, *args: Any) -> None:
if dest == "listen":
self.httpsrv.listen(args[0], 1)
return
if dest == "set_netdevs":
self.httpsrv.set_netdevs(args[0])
return
# new ipc invoking managed service in hub
obj = self.hub
for node in dest.split("."):
obj = getattr(obj, node)
try_exec(False, obj, *args)

View file

@ -1,72 +0,0 @@
# coding: utf-8
from __future__ import print_function, unicode_literals
import argparse
import traceback
from queue import Queue
from .__init__ import TYPE_CHECKING
from .authsrv import AuthSrv
from .util import HMaccas, Pebkac
if True: # pylint: disable=using-constant-test
from typing import Any, Optional, Union
from .util import RootLogger
if TYPE_CHECKING:
from .httpsrv import HttpSrv
class ExceptionalQueue(Queue, object):
def get(self, block: bool = True, timeout: Optional[float] = None) -> Any:
rv = super(ExceptionalQueue, self).get(block, timeout)
if isinstance(rv, list):
if rv[0] == "exception":
if rv[1] == "pebkac":
raise Pebkac(*rv[2:])
else:
raise Exception(rv[2])
return rv
class BrokerCli(object):
"""
helps mypy understand httpsrv.broker but still fails a few levels deeper,
for example resolving httpconn.* in httpcli -- see lines tagged #mypy404
"""
log: "RootLogger"
args: argparse.Namespace
asrv: AuthSrv
httpsrv: "HttpSrv"
iphash: HMaccas
def __init__(self) -> None:
pass
def ask(self, dest: str, *args: Any) -> ExceptionalQueue:
return ExceptionalQueue(1)
def say(self, dest: str, *args: Any) -> None:
pass
def try_exec(want_retval: Union[bool, int], func: Any, *args: list[Any]) -> Any:
try:
return func(*args)
except Pebkac as ex:
if not want_retval:
raise
return ["exception", "pebkac", ex.code, str(ex)]
except:
if not want_retval:
raise
return ["exception", "stack", traceback.format_exc()]

View file

@ -88,12 +88,8 @@ class FtpAuth(DummyAuthorizer):
if bonk:
logging.warning("client banned: invalid passwords")
bans[ip] = bonk
try:
# only possible if multiprocessing disabled
self.hub.broker.httpsrv.bans[ip] = bonk # type: ignore
self.hub.broker.httpsrv.nban += 1 # type: ignore
except:
pass
self.hub.httpsrv.bans[ip] = bonk
self.hub.httpsrv.nban += 1
raise AuthenticationFailed("Authentication failed.")

View file

@ -115,6 +115,7 @@ class HttpCli(object):
self.t0 = time.time()
self.conn = conn
self.hub = conn.hsrv.hub
self.u2mutex = conn.u2mutex # mypy404
self.s = conn.s
self.sr = conn.sr
@ -475,7 +476,7 @@ class HttpCli(object):
) or self.args.idp_h_key in self.headers
if trusted_key and trusted_xff:
self.asrv.idp_checkin(self.conn.hsrv.broker, idp_usr, idp_grp)
self.asrv.idp_checkin(self.hub, idp_usr, idp_grp)
else:
if not trusted_key:
t = 'the idp-h-key header ("%s") is not present in the request; will NOT trust the other headers saying that the client\'s username is "%s" and group is "%s"'
@ -626,7 +627,7 @@ class HttpCli(object):
msg += "hint: %s\r\n" % (self.hint,)
if "database is locked" in em:
self.conn.hsrv.broker.say("log_stacks")
self.hub.log_stacks()
msg += "hint: important info in the server log\r\n"
zb = b"<pre>" + html_escape(msg).encode("utf-8", "replace")
@ -1629,9 +1630,7 @@ class HttpCli(object):
lim = vfs.get_dbv(rem)[0].lim
fdir = vfs.canonical(rem)
if lim:
fdir, rem = lim.all(
self.ip, rem, remains, vfs.realpath, fdir, self.conn.hsrv.broker
)
fdir, rem = lim.all(self.ip, rem, remains, vfs.realpath, fdir, self.hub)
fn = None
if rem and not self.trailing_slash and not bos.path.isdir(fdir):
@ -1764,7 +1763,7 @@ class HttpCli(object):
lim.bup(self.ip, post_sz)
try:
lim.chk_sz(post_sz)
lim.chk_vsz(self.conn.hsrv.broker, vfs.realpath, post_sz)
lim.chk_vsz(self.hub, vfs.realpath, post_sz)
except:
wunlink(self.log, path, vfs.flags)
raise
@ -1823,8 +1822,7 @@ class HttpCli(object):
raise Pebkac(403, t)
vfs, rem = vfs.get_dbv(rem)
self.conn.hsrv.broker.say(
"up2k.hash_file",
self.hub.up2k.hash_file(
vfs.realpath,
vfs.vpath,
vfs.flags,
@ -2049,8 +2047,7 @@ class HttpCli(object):
# not to protect u2fh, but to prevent handshakes while files are closing
with self.u2mutex:
x = self.conn.hsrv.broker.ask("up2k.handle_json", body, self.u2fh.aps)
ret = x.get()
ret = self.hub.up2k.handle_json(body, self.u2fh.aps)
if self.is_vproxied:
if "purl" in ret:
@ -2138,7 +2135,7 @@ class HttpCli(object):
vfs, _ = self.asrv.vfs.get(self.vpath, self.uname, False, True)
ptop = (vfs.dbv or vfs).realpath
x = self.conn.hsrv.broker.ask("up2k.handle_chunk", ptop, wark, chash)
x = self.hub.up2k.handle_chunk(ptop, wark, chash)
response = x.get()
chunksize, cstart, path, lastmod, sprs = response
@ -2207,11 +2204,9 @@ class HttpCli(object):
f.close()
raise
finally:
x = self.conn.hsrv.broker.ask("up2k.release_chunk", ptop, wark, chash)
x.get() # block client until released
self.hub.up2k.release_chunk(ptop, wark, chash)
x = self.conn.hsrv.broker.ask("up2k.confirm_chunk", ptop, wark, chash)
ztis = x.get()
ztis = self.hub.up2k.confirm_chunk(ptop, wark, chash)
try:
num_left, fin_path = ztis
except:
@ -2223,9 +2218,7 @@ class HttpCli(object):
self.u2fh.close(path)
if not num_left and not self.args.nw:
self.conn.hsrv.broker.ask(
"up2k.finish_upload", ptop, wark, self.u2fh.aps
).get()
self.hub.up2k.finish_upload(ptop, wark, self.u2fh.aps)
cinf = self.headers.get("x-up2k-stat", "")
@ -2403,7 +2396,7 @@ class HttpCli(object):
fdir_base = vfs.canonical(rem)
if lim:
fdir_base, rem = lim.all(
self.ip, rem, -1, vfs.realpath, fdir_base, self.conn.hsrv.broker
self.ip, rem, -1, vfs.realpath, fdir_base, self.hub
)
upload_vpath = "{}/{}".format(vfs.vpath, rem).strip("/")
if not nullwrite:
@ -2511,7 +2504,7 @@ class HttpCli(object):
try:
lim.chk_df(tabspath, sz, True)
lim.chk_sz(sz)
lim.chk_vsz(self.conn.hsrv.broker, vfs.realpath, sz)
lim.chk_vsz(self.hub, vfs.realpath, sz)
lim.chk_bup(self.ip)
lim.chk_nup(self.ip)
except:
@ -2549,8 +2542,7 @@ class HttpCli(object):
raise Pebkac(403, t)
dbv, vrem = vfs.get_dbv(rem)
self.conn.hsrv.broker.say(
"up2k.hash_file",
self.hub.up2k.hash_file(
dbv.realpath,
vfs.vpath,
dbv.flags,
@ -2697,7 +2689,7 @@ class HttpCli(object):
fp = vfs.canonical(rp)
lim = vfs.get_dbv(rem)[0].lim
if lim:
fp, rp = lim.all(self.ip, rp, clen, vfs.realpath, fp, self.conn.hsrv.broker)
fp, rp = lim.all(self.ip, rp, clen, vfs.realpath, fp, self.hub)
bos.makedirs(fp)
fp = os.path.join(fp, fn)
@ -2799,7 +2791,7 @@ class HttpCli(object):
lim.bup(self.ip, sz)
try:
lim.chk_sz(sz)
lim.chk_vsz(self.conn.hsrv.broker, vfs.realpath, sz)
lim.chk_vsz(self.hub, vfs.realpath, sz)
except:
wunlink(self.log, fp, vfs.flags)
raise
@ -2828,8 +2820,7 @@ class HttpCli(object):
raise Pebkac(403, t)
vfs, rem = vfs.get_dbv(rem)
self.conn.hsrv.broker.say(
"up2k.hash_file",
self.hub.up2k.hash_file(
vfs.realpath,
vfs.vpath,
vfs.flags,
@ -3363,8 +3354,8 @@ class HttpCli(object):
]
if self.avol and not self.args.no_rescan:
x = self.conn.hsrv.broker.ask("up2k.get_state")
vs = json.loads(x.get())
zs = self.hub.up2k.get_state()
vs = json.loads(zs)
vstate = {("/" + k).rstrip("/") + "/": v for k, v in vs["volstate"].items()}
else:
vstate = {}
@ -3508,10 +3499,8 @@ class HttpCli(object):
vn, _ = self.asrv.vfs.get(self.vpath, self.uname, True, True)
args = [self.asrv.vfs.all_vols, [vn.vpath], False, True]
err = self.hub.up2k.rescan(self.asrv.vfs.all_vols, [vn.vpath], False, True)
x = self.conn.hsrv.broker.ask("up2k.rescan", *args)
err = x.get()
if not err:
self.redirect("", "?h")
return True
@ -3529,8 +3518,8 @@ class HttpCli(object):
if self.args.no_reload:
raise Pebkac(403, "the reload feature is disabled in server config")
x = self.conn.hsrv.broker.ask("reload")
return self.redirect("", "?h", x.get(), "return to", False)
zs = self.hub.reload()
return self.redirect("", "?h", zs, "return to", False)
def tx_stack(self) -> bool:
if not self.avol and not [x for x in self.wvol if x in self.rvol]:
@ -3632,10 +3621,7 @@ class HttpCli(object):
and (self.uname in vol.axs.uread or self.uname in vol.axs.upget)
}
x = self.conn.hsrv.broker.ask(
"up2k.get_unfinished_by_user", self.uname, self.ip
)
uret = x.get()
uret = self.hub.up2k.get_unfinished_by_user(self.uname, self.ip)
if not self.args.unpost:
allvols = []
@ -3721,10 +3707,8 @@ class HttpCli(object):
nlim = int(self.uparam.get("lim") or 0)
lim = [nlim, nlim] if nlim else []
x = self.conn.hsrv.broker.ask(
"up2k.handle_rm", self.uname, self.ip, req, lim, False, unpost
)
self.loud_reply(x.get())
zs = self.hub.up2k.handle_rm(self.uname, self.ip, req, lim, False, unpost)
self.loud_reply(zs)
return True
def handle_mv(self) -> bool:
@ -3746,8 +3730,8 @@ class HttpCli(object):
if self.args.no_mv:
raise Pebkac(403, "the rename/move feature is disabled in server config")
x = self.conn.hsrv.broker.ask("up2k.handle_mv", self.uname, vsrc, vdst)
self.loud_reply(x.get(), status=201)
zs = self.hub.up2k.handle_mv(self.uname, vsrc, vdst)
self.loud_reply(zs, status=201)
return True
def tx_ls(self, ls: dict[str, Any]) -> bool:

View file

@ -58,7 +58,7 @@ class HttpConn(object):
self.ipa_nm: Optional[NetMap] = hsrv.ipa_nm
self.xff_nm: Optional[NetMap] = hsrv.xff_nm
self.xff_lan: NetMap = hsrv.xff_lan # type: ignore
self.iphash: HMaccas = hsrv.broker.iphash
self.iphash: HMaccas = hsrv.hub.iphash
self.bans: dict[str, int] = hsrv.bans
self.aclose: dict[str, int] = hsrv.aclose

View file

@ -77,7 +77,7 @@ from .util import (
)
if TYPE_CHECKING:
from .broker_util import BrokerCli
from .svchub import SvcHub
from .ssdp import SSDPr
if True: # pylint: disable=using-constant-test
@ -90,16 +90,13 @@ class HttpSrv(object):
relying on MpSrv for performance (HttpSrv is just plain threads)
"""
def __init__(self, broker: "BrokerCli", nid: Optional[int]) -> None:
self.broker = broker
def __init__(self, hub: "SvcHub", nid: Optional[int]) -> None:
self.hub = hub
self.nid = nid
self.args = broker.args
self.args = hub.args
self.E: EnvParams = self.args.E
self.log = broker.log
self.asrv = broker.asrv
# redefine in case of multiprocessing
socket.setdefaulttimeout(120)
self.log = hub.log
self.asrv = hub.asrv
self.t0 = time.time()
nsuf = "-n{}-i{:x}".format(nid, os.getpid()) if nid else ""
@ -169,7 +166,7 @@ class HttpSrv(object):
if self.args.zs:
from .ssdp import SSDPr
self.ssdp = SSDPr(broker)
self.ssdp = SSDPr(hub)
if self.tp_q:
self.start_threads(4)
@ -186,8 +183,7 @@ class HttpSrv(object):
def post_init(self) -> None:
try:
x = self.broker.ask("thumbsrv.getcfg")
self.th_cfg = x.get()
self.th_cfg = self.hub.thumbsrv.getcfg()
except:
pass
@ -237,19 +233,11 @@ class HttpSrv(object):
self.t_periodic = None
return
def listen(self, sck: socket.socket, nlisteners: int) -> None:
if self.args.j != 1:
# lost in the pickle; redefine
if not ANYWIN or self.args.reuseaddr:
sck.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sck.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sck.settimeout(None) # < does not inherit, ^ opts above do
def listen(self, sck: socket.socket) -> None:
ip, port = sck.getsockname()[:2]
self.srvs.append(sck)
self.bound.add((ip, port))
self.nclimax = math.ceil(self.args.nc * 1.0 / nlisteners)
self.nclimax = self.args.nc
Daemon(
self.thr_listen,
"httpsrv-n{}-listen-{}-{}".format(self.nid or "0", ip, port),
@ -265,7 +253,7 @@ class HttpSrv(object):
self.log(self.name, msg)
def fun() -> None:
self.broker.say("cb_httpsrv_up")
self.hub.cb_httpsrv_up()
threading.Thread(target=fun, name="sig-hsrv-up1").start()

View file

@ -15,6 +15,7 @@ if TYPE_CHECKING:
class Metrics(object):
def __init__(self, hsrv: "HttpSrv") -> None:
self.hsrv = hsrv
self.hub = hsrv.hub
def tx(self, cli: "HttpCli") -> bool:
if not cli.avol:
@ -88,8 +89,8 @@ class Metrics(object):
addg("cpp_total_bans", str(self.hsrv.nban), t)
if not args.nos_vst:
x = self.hsrv.broker.ask("up2k.get_state")
vs = json.loads(x.get())
zs = self.hub.up2k.get_state()
vs = json.loads(zs.get())
nvidle = 0
nvbusy = 0
@ -146,8 +147,7 @@ class Metrics(object):
volsizes = []
try:
ptops = [x.realpath for _, x in allvols]
x = self.hsrv.broker.ask("up2k.get_volsizes", ptops)
volsizes = x.get()
volsizes = self.hub.up2k.get_volsizes(ptops)
except Exception as ex:
cli.log("tx_stats get_volsizes: {!r}".format(ex), 3)
@ -204,8 +204,7 @@ class Metrics(object):
tnbytes = 0
tnfiles = 0
try:
x = self.hsrv.broker.ask("up2k.get_unfinished")
xs = x.get()
xs = self.hub.up2k.get_unfinished()
if not xs:
raise Exception("up2k mutex acquisition timed out")

View file

@ -12,7 +12,6 @@ from .multicast import MC_Sck, MCast
from .util import CachedSet, html_escape, min_ex
if TYPE_CHECKING:
from .broker_util import BrokerCli
from .httpcli import HttpCli
from .svchub import SvcHub
@ -32,9 +31,9 @@ class SSDP_Sck(MC_Sck):
class SSDPr(object):
"""generates http responses for httpcli"""
def __init__(self, broker: "BrokerCli") -> None:
self.broker = broker
self.args = broker.args
def __init__(self, hub: "SvcHub") -> None:
self.hub = hub
self.args = hub.args
def reply(self, hc: "HttpCli") -> bool:
if hc.vpath.endswith("device.xml"):

View file

@ -28,9 +28,10 @@ if True: # pylint: disable=using-constant-test
import typing
from typing import Any, Optional, Union
from .__init__ import ANYWIN, EXE, MACOS, TYPE_CHECKING, E, EnvParams, unicode
from .__init__ import ANYWIN, EXE, TYPE_CHECKING, E, EnvParams, unicode
from .authsrv import BAD_CFG, AuthSrv
from .cert import ensure_cert
from .httpsrv import HttpSrv
from .mtag import HAVE_FFMPEG, HAVE_FFPROBE
from .tcpsrv import TcpSrv
from .th_srv import HAVE_PIL, HAVE_VIPS, HAVE_WEBP, ThumbSrv
@ -51,7 +52,6 @@ from .util import (
ansi_re,
build_netmap,
min_ex,
mp,
odfusion,
pybin,
start_log_thrs,
@ -67,16 +67,6 @@ if TYPE_CHECKING:
class SvcHub(object):
"""
Hosts all services which cannot be parallelized due to reliance on monolithic resources.
Creates a Broker which does most of the heavy stuff; hosted services can use this to perform work:
hub.broker.<say|ask>(destination, args_list).
Either BrokerThr (plain threads) or BrokerMP (multiprocessing) is used depending on configuration.
Nothing is returned synchronously; if you want any value returned from the call,
put() can return a queue (if want_reply=True) which has a blocking get() with the response.
"""
def __init__(
self,
args: argparse.Namespace,
@ -163,16 +153,6 @@ class SvcHub(object):
if args.log_thrs:
start_log_thrs(self.log, args.log_thrs, 0)
if not args.use_fpool and args.j != 1:
args.no_fpool = True
t = "multithreading enabled with -j {}, so disabling fpool -- this can reduce upload performance on some filesystems"
self.log("root", t.format(args.j))
if not args.no_fpool and args.j != 1:
t = "WARNING: ignoring --use-fpool because multithreading (-j{}) is enabled"
self.log("root", t.format(args.j), c=3)
args.no_fpool = True
for name, arg in (
("iobuf", "iobuf"),
("s-rd-sz", "s_rd_sz"),
@ -316,13 +296,7 @@ class SvcHub(object):
self.mdns: Optional["MDNS"] = None
self.ssdp: Optional["SSDPd"] = None
# decide which worker impl to use
if self.check_mp_enable():
from .broker_mp import BrokerMp as Broker
else:
from .broker_thr import BrokerThr as Broker # type: ignore
self.broker = Broker(self)
self.httpsrv = HttpSrv(self, None)
def start_ftpd(self) -> None:
time.sleep(30)
@ -361,15 +335,14 @@ class SvcHub(object):
def thr_httpsrv_up(self) -> None:
time.sleep(1 if self.args.ign_ebind_all else 5)
expected = self.broker.num_workers * self.tcpsrv.nsrv
expected = self.tcpsrv.nsrv
failed = expected - self.httpsrv_up
if not failed:
return
if self.args.ign_ebind_all:
if not self.tcpsrv.srv:
for _ in range(self.broker.num_workers):
self.broker.say("cb_httpsrv_up")
self.cb_httpsrv_up()
return
if self.args.ign_ebind and self.tcpsrv.srv:
@ -387,8 +360,6 @@ class SvcHub(object):
def cb_httpsrv_up(self) -> None:
self.httpsrv_up += 1
if self.httpsrv_up != self.broker.num_workers:
return
ar = self.args
for _ in range(10 if ar.ftp or ar.ftps else 0):
@ -723,7 +694,6 @@ class SvcHub(object):
self.log("root", "reloading config")
self.asrv.reload()
self.up2k.reload(rescan_all_vols)
self.broker.reload()
self.reloading = 0
def _reload_blocking(self, rescan_all_vols: bool = True) -> None:
@ -808,7 +778,7 @@ class SvcHub(object):
tasks.append(Daemon(self.ssdp.stop, "ssdp"))
slp = time.time() + 0.5
self.broker.shutdown()
self.httpsrv.shutdown()
self.tcpsrv.shutdown()
self.up2k.shutdown()
@ -970,48 +940,6 @@ class SvcHub(object):
if ex.errno != errno.EPIPE:
raise
def check_mp_support(self) -> str:
if MACOS:
return "multiprocessing is wonky on mac osx;"
elif sys.version_info < (3, 3):
return "need python 3.3 or newer for multiprocessing;"
try:
x: mp.Queue[tuple[str, str]] = mp.Queue(1)
x.put(("foo", "bar"))
if x.get()[0] != "foo":
raise Exception()
except:
return "multiprocessing is not supported on your platform;"
return ""
def check_mp_enable(self) -> bool:
if self.args.j == 1:
return False
try:
if mp.cpu_count() <= 1:
raise Exception()
except:
self.log("svchub", "only one CPU detected; multiprocessing disabled")
return False
try:
# support vscode debugger (bonus: same behavior as on windows)
mp.set_start_method("spawn", True)
except AttributeError:
# py2.7 probably, anyways dontcare
pass
err = self.check_mp_support()
if not err:
return True
else:
self.log("svchub", err)
self.log("svchub", "cannot efficiently use multiple CPU cores")
return False
def sd_notify(self) -> None:
try:
zb = os.getenv("NOTIFY_SOCKET")

View file

@ -297,7 +297,7 @@ class TcpSrv(object):
if self.args.q:
print(msg)
self.hub.broker.say("listen", srv)
self.hub.httpsrv.listen(srv)
self.srv = srvs
self.bound = bound
@ -305,7 +305,7 @@ class TcpSrv(object):
self._distribute_netdevs()
def _distribute_netdevs(self):
self.hub.broker.say("set_netdevs", self.netdevs)
self.hub.httpsrv.set_netdevs(self.netdevs)
self.hub.start_zeroconf()
gencert(self.log, self.args, self.netdevs)
self.hub.restart_ftpd()

View file

@ -7,7 +7,6 @@ from .__init__ import TYPE_CHECKING
from .authsrv import VFS
from .bos import bos
from .th_srv import HAVE_WEBP, thumb_path
from .util import Cooldown
if True: # pylint: disable=using-constant-test
from typing import Optional, Union
@ -18,14 +17,11 @@ if TYPE_CHECKING:
class ThumbCli(object):
def __init__(self, hsrv: "HttpSrv") -> None:
self.broker = hsrv.broker
self.hub = hsrv.hub
self.log_func = hsrv.log
self.args = hsrv.args
self.asrv = hsrv.asrv
# cache on both sides for less broker spam
self.cooldown = Cooldown(self.args.th_poke)
try:
c = hsrv.th_cfg
if not c:
@ -134,13 +130,11 @@ class ThumbCli(object):
if ret:
tdir = os.path.dirname(tpath)
if self.cooldown.poke(tdir):
self.broker.say("thumbsrv.poke", tdir)
self.hub.thumbsrv.poke(tdir)
if want_opus:
# audio files expire individually
if self.cooldown.poke(tpath):
self.broker.say("thumbsrv.poke", tpath)
self.hub.thumbsrv.poke(tpath)
return ret
@ -150,5 +144,4 @@ class ThumbCli(object):
if not bos.path.getsize(os.path.join(ptop, rem)):
return None
x = self.broker.ask("thumbsrv.get", ptop, rem, mtime, fmt)
return x.get() # type: ignore
return self.hub.thumbsrv.get(ptop, rem, mtime, fmt)

View file

@ -2745,9 +2745,9 @@ class Up2k(object):
cj["size"],
cj["ptop"],
ap1,
self.hub.broker,
self.hub,
reg,
"up2k._get_volsize",
"_get_volsize",
)
bos.makedirs(ap2)
vfs.lim.nup(cj["addr"])

View file

@ -69,8 +69,6 @@ sed -ri s/copyparty.exe/copyparty$esuf.exe/ loader.rc2
excl=(
asyncio
copyparty.broker_mp
copyparty.broker_mpw
copyparty.smbd
ctypes.macholib
curses

View file

@ -7,10 +7,6 @@ copyparty/bos,
copyparty/bos/__init__.py,
copyparty/bos/bos.py,
copyparty/bos/path.py,
copyparty/broker_mp.py,
copyparty/broker_mpw.py,
copyparty/broker_thr.py,
copyparty/broker_util.py,
copyparty/cert.py,
copyparty/cfg.py,
copyparty/dxml.py,

View file

@ -170,12 +170,14 @@ class Cfg(Namespace):
)
class NullBroker(object):
def say(self, *args):
class NullUp2k(object):
def hash_file(*a):
pass
def ask(self, *args):
pass
class NullHub(object):
def __init__(self):
self.up2k = NullUp2k()
class VSock(object):
@ -206,7 +208,7 @@ class VHttpSrv(object):
self.asrv = asrv
self.log = log
self.broker = NullBroker()
self.hub = NullHub()
self.prism = None
self.bans = {}
self.nreq = 0