From 578a915884ba0f4aa56dfb3d732043a6875ac92f Mon Sep 17 00:00:00 2001 From: ed Date: Mon, 12 Jul 2021 23:03:52 +0200 Subject: [PATCH] stack/thread monitors in mpw + better thread names --- copyparty/broker_mp.py | 5 ++--- copyparty/broker_mpw.py | 2 +- copyparty/broker_thr.py | 2 +- copyparty/httpconn.py | 1 - copyparty/httpsrv.py | 37 +++++++++++++++++++++++++------------ 5 files changed, 29 insertions(+), 18 deletions(-) diff --git a/copyparty/broker_mp.py b/copyparty/broker_mp.py index 5a1f23ad..2bd18f5d 100644 --- a/copyparty/broker_mp.py +++ b/copyparty/broker_mp.py @@ -27,18 +27,17 @@ class BrokerMp(object): cores = mp.cpu_count() self.log("broker", "booting {} subprocesses".format(cores)) - for n in range(cores): + for n in range(1, cores + 1): q_pend = mp.Queue(1) q_yield = mp.Queue(64) proc = mp.Process(target=MpWorker, args=(q_pend, q_yield, self.args, n)) proc.q_pend = q_pend proc.q_yield = q_yield - proc.nid = n proc.clients = {} thr = threading.Thread( - target=self.collector, args=(proc,), name="mp-collector" + target=self.collector, args=(proc,), name="mp-sink-{}".format(n) ) thr.daemon = True thr.start() diff --git a/copyparty/broker_mpw.py b/copyparty/broker_mpw.py index 979d6874..c5c63bcb 100644 --- a/copyparty/broker_mpw.py +++ b/copyparty/broker_mpw.py @@ -35,7 +35,7 @@ class MpWorker(object): self.asrv = AuthSrv(args, None, False) # instantiate all services here (TODO: inheritance?) - self.httpsrv = HttpSrv(self, True) + self.httpsrv = HttpSrv(self, n) # on winxp and some other platforms, # use thr.join() to block all signals diff --git a/copyparty/broker_thr.py b/copyparty/broker_thr.py index a290519e..c8d88a95 100644 --- a/copyparty/broker_thr.py +++ b/copyparty/broker_thr.py @@ -19,7 +19,7 @@ class BrokerThr(object): self.mutex = threading.Lock() # instantiate all services here (TODO: inheritance?) - self.httpsrv = HttpSrv(self) + self.httpsrv = HttpSrv(self, None) def shutdown(self): # self.log("broker", "shutting down") diff --git a/copyparty/httpconn.py b/copyparty/httpconn.py index 9fe6a944..32681640 100644 --- a/copyparty/httpconn.py +++ b/copyparty/httpconn.py @@ -34,7 +34,6 @@ class HttpConn(object): self.args = hsrv.args self.asrv = hsrv.asrv - self.is_mp = hsrv.is_mp self.cert_path = hsrv.cert_path enth = HAVE_PIL and not self.args.no_thumb diff --git a/copyparty/httpsrv.py b/copyparty/httpsrv.py index dfb8c458..3c265df8 100644 --- a/copyparty/httpsrv.py +++ b/copyparty/httpsrv.py @@ -27,7 +27,7 @@ except ImportError: sys.exit(1) from .__init__ import E, PY2, MACOS -from .util import spack, min_ex +from .util import spack, min_ex, start_stackmon, start_log_thrs from .httpconn import HttpConn if PY2: @@ -42,14 +42,14 @@ class HttpSrv(object): relying on MpSrv for performance (HttpSrv is just plain threads) """ - def __init__(self, broker, is_mp=False): + def __init__(self, broker, nid): self.broker = broker - self.is_mp = is_mp + self.nid = nid self.args = broker.args self.log = broker.log self.asrv = broker.asrv - self.name = "httpsrv-i{:x}".format(os.getpid()) + self.name = "httpsrv" + ("-n{}-i{:x}".format(nid, os.getpid()) if nid else "") self.mutex = threading.Lock() self.stopping = False @@ -81,10 +81,18 @@ class HttpSrv(object): if self.tp_q: self.start_threads(4) - t = threading.Thread(target=self.thr_scaler) + name = "httpsrv-scaler" + ("-{}".format(nid) if nid else "") + t = threading.Thread(target=self.thr_scaler, name=name) t.daemon = True t.start() + if nid: + if self.args.stackmon: + start_stackmon(self.args.stackmon, nid) + + if self.args.log_thrs: + start_log_thrs(self.log, self.args.log_thrs, nid) + def start_threads(self, n): self.tp_nthr += n if self.args.log_htp: @@ -93,7 +101,7 @@ class HttpSrv(object): for _ in range(n): thr = threading.Thread( target=self.thr_poolw, - name="httpsrv-poolw", + name=self.name + "-poolw", ) thr.daemon = True thr.start() @@ -115,9 +123,14 @@ class HttpSrv(object): self.stop_threads(4) def listen(self, sck, nlisteners): + ip, port = sck.getsockname() self.srvs.append(sck) self.nclimax = math.ceil(self.args.nc * 1.0 / nlisteners) - t = threading.Thread(target=self.thr_listen, args=(sck,)) + t = threading.Thread( + target=self.thr_listen, + args=(sck,), + name="httpsrv-n{}-listen-{}-{}".format(self.nid or "0", ip, port), + ) t.daemon = True t.start() @@ -181,7 +194,7 @@ class HttpSrv(object): thr = threading.Thread( target=self.thr_client, args=(sck, addr), - name="httpsrv-{}-{}".format(addr[0].split(".", 2)[-1][-6:], addr[1]), + name="httpconn-{}-{}".format(addr[0].split(".", 2)[-1][-6:], addr[1]), ) thr.daemon = True thr.start() @@ -198,11 +211,11 @@ class HttpSrv(object): try: sck, addr = task me = threading.current_thread() - me.name = ( - "httpsrv-{}-{}".format(addr[0].split(".", 2)[-1][-6:], addr[1]), + me.name = "httpconn-{}-{}".format( + addr[0].split(".", 2)[-1][-6:], addr[1] ) self.thr_client(sck, addr) - me.name = "httpsrv-poolw" + me.name = self.name + "-poolw" except: self.log(self.name, "thr_client: " + min_ex(), 3) @@ -228,7 +241,7 @@ class HttpSrv(object): if self.tp_q.empty(): break - self.log("httpsrv-i" + str(os.getpid()), "ok bye") + self.log(self.name, "ok bye") def thr_client(self, sck, addr): """thread managing one tcp client"""