stack/thread monitors in mpw + better thread names

This commit is contained in:
ed 2021-07-12 23:03:52 +02:00
parent eacafb8a63
commit 578a915884
5 changed files with 29 additions and 18 deletions

View file

@ -27,18 +27,17 @@ class BrokerMp(object):
cores = mp.cpu_count() cores = mp.cpu_count()
self.log("broker", "booting {} subprocesses".format(cores)) self.log("broker", "booting {} subprocesses".format(cores))
for n in range(cores): for n in range(1, cores + 1):
q_pend = mp.Queue(1) q_pend = mp.Queue(1)
q_yield = mp.Queue(64) q_yield = mp.Queue(64)
proc = mp.Process(target=MpWorker, args=(q_pend, q_yield, self.args, n)) proc = mp.Process(target=MpWorker, args=(q_pend, q_yield, self.args, n))
proc.q_pend = q_pend proc.q_pend = q_pend
proc.q_yield = q_yield proc.q_yield = q_yield
proc.nid = n
proc.clients = {} proc.clients = {}
thr = threading.Thread( thr = threading.Thread(
target=self.collector, args=(proc,), name="mp-collector" target=self.collector, args=(proc,), name="mp-sink-{}".format(n)
) )
thr.daemon = True thr.daemon = True
thr.start() thr.start()

View file

@ -35,7 +35,7 @@ class MpWorker(object):
self.asrv = AuthSrv(args, None, False) self.asrv = AuthSrv(args, None, False)
# instantiate all services here (TODO: inheritance?) # instantiate all services here (TODO: inheritance?)
self.httpsrv = HttpSrv(self, True) self.httpsrv = HttpSrv(self, n)
# on winxp and some other platforms, # on winxp and some other platforms,
# use thr.join() to block all signals # use thr.join() to block all signals

View file

@ -19,7 +19,7 @@ class BrokerThr(object):
self.mutex = threading.Lock() self.mutex = threading.Lock()
# instantiate all services here (TODO: inheritance?) # instantiate all services here (TODO: inheritance?)
self.httpsrv = HttpSrv(self) self.httpsrv = HttpSrv(self, None)
def shutdown(self): def shutdown(self):
# self.log("broker", "shutting down") # self.log("broker", "shutting down")

View file

@ -34,7 +34,6 @@ class HttpConn(object):
self.args = hsrv.args self.args = hsrv.args
self.asrv = hsrv.asrv self.asrv = hsrv.asrv
self.is_mp = hsrv.is_mp
self.cert_path = hsrv.cert_path self.cert_path = hsrv.cert_path
enth = HAVE_PIL and not self.args.no_thumb enth = HAVE_PIL and not self.args.no_thumb

View file

@ -27,7 +27,7 @@ except ImportError:
sys.exit(1) sys.exit(1)
from .__init__ import E, PY2, MACOS from .__init__ import E, PY2, MACOS
from .util import spack, min_ex from .util import spack, min_ex, start_stackmon, start_log_thrs
from .httpconn import HttpConn from .httpconn import HttpConn
if PY2: if PY2:
@ -42,14 +42,14 @@ class HttpSrv(object):
relying on MpSrv for performance (HttpSrv is just plain threads) relying on MpSrv for performance (HttpSrv is just plain threads)
""" """
def __init__(self, broker, is_mp=False): def __init__(self, broker, nid):
self.broker = broker self.broker = broker
self.is_mp = is_mp self.nid = nid
self.args = broker.args self.args = broker.args
self.log = broker.log self.log = broker.log
self.asrv = broker.asrv self.asrv = broker.asrv
self.name = "httpsrv-i{:x}".format(os.getpid()) self.name = "httpsrv" + ("-n{}-i{:x}".format(nid, os.getpid()) if nid else "")
self.mutex = threading.Lock() self.mutex = threading.Lock()
self.stopping = False self.stopping = False
@ -81,10 +81,18 @@ class HttpSrv(object):
if self.tp_q: if self.tp_q:
self.start_threads(4) self.start_threads(4)
t = threading.Thread(target=self.thr_scaler) name = "httpsrv-scaler" + ("-{}".format(nid) if nid else "")
t = threading.Thread(target=self.thr_scaler, name=name)
t.daemon = True t.daemon = True
t.start() t.start()
if nid:
if self.args.stackmon:
start_stackmon(self.args.stackmon, nid)
if self.args.log_thrs:
start_log_thrs(self.log, self.args.log_thrs, nid)
def start_threads(self, n): def start_threads(self, n):
self.tp_nthr += n self.tp_nthr += n
if self.args.log_htp: if self.args.log_htp:
@ -93,7 +101,7 @@ class HttpSrv(object):
for _ in range(n): for _ in range(n):
thr = threading.Thread( thr = threading.Thread(
target=self.thr_poolw, target=self.thr_poolw,
name="httpsrv-poolw", name=self.name + "-poolw",
) )
thr.daemon = True thr.daemon = True
thr.start() thr.start()
@ -115,9 +123,14 @@ class HttpSrv(object):
self.stop_threads(4) self.stop_threads(4)
def listen(self, sck, nlisteners): def listen(self, sck, nlisteners):
ip, port = sck.getsockname()
self.srvs.append(sck) self.srvs.append(sck)
self.nclimax = math.ceil(self.args.nc * 1.0 / nlisteners) self.nclimax = math.ceil(self.args.nc * 1.0 / nlisteners)
t = threading.Thread(target=self.thr_listen, args=(sck,)) t = threading.Thread(
target=self.thr_listen,
args=(sck,),
name="httpsrv-n{}-listen-{}-{}".format(self.nid or "0", ip, port),
)
t.daemon = True t.daemon = True
t.start() t.start()
@ -181,7 +194,7 @@ class HttpSrv(object):
thr = threading.Thread( thr = threading.Thread(
target=self.thr_client, target=self.thr_client,
args=(sck, addr), args=(sck, addr),
name="httpsrv-{}-{}".format(addr[0].split(".", 2)[-1][-6:], addr[1]), name="httpconn-{}-{}".format(addr[0].split(".", 2)[-1][-6:], addr[1]),
) )
thr.daemon = True thr.daemon = True
thr.start() thr.start()
@ -198,11 +211,11 @@ class HttpSrv(object):
try: try:
sck, addr = task sck, addr = task
me = threading.current_thread() me = threading.current_thread()
me.name = ( me.name = "httpconn-{}-{}".format(
"httpsrv-{}-{}".format(addr[0].split(".", 2)[-1][-6:], addr[1]), addr[0].split(".", 2)[-1][-6:], addr[1]
) )
self.thr_client(sck, addr) self.thr_client(sck, addr)
me.name = "httpsrv-poolw" me.name = self.name + "-poolw"
except: except:
self.log(self.name, "thr_client: " + min_ex(), 3) self.log(self.name, "thr_client: " + min_ex(), 3)
@ -228,7 +241,7 @@ class HttpSrv(object):
if self.tp_q.empty(): if self.tp_q.empty():
break break
self.log("httpsrv-i" + str(os.getpid()), "ok bye") self.log(self.name, "ok bye")
def thr_client(self, sck, addr): def thr_client(self, sck, addr):
"""thread managing one tcp client""" """thread managing one tcp client"""