scale max-clients to mp-workers

This commit is contained in:
ed 2021-07-09 16:48:02 +02:00
parent 80cc26bd95
commit 6c3a976222
4 changed files with 11 additions and 8 deletions

View file

@ -102,7 +102,7 @@ class BrokerMp(object):
""" """
if dest == "listen": if dest == "listen":
for p in self.procs: for p in self.procs:
p.q_pend.put([0, dest, args]) p.q_pend.put([0, dest, [args[0], len(self.procs)]])
else: else:
raise Exception("what is " + str(dest)) raise Exception("what is " + str(dest))

View file

@ -64,7 +64,7 @@ class MpWorker(object):
return return
elif dest == "listen": elif dest == "listen":
self.httpsrv.listen(args[0]) self.httpsrv.listen(args[0], args[1])
elif dest == "retq": elif dest == "retq":
# response from previous ipc call # response from previous ipc call

View file

@ -28,7 +28,7 @@ class BrokerThr(object):
def put(self, want_retval, dest, *args): def put(self, want_retval, dest, *args):
if dest == "listen": if dest == "listen":
self.httpsrv.listen(args[0]) self.httpsrv.listen(args[0], 1)
else: else:
# new ipc invoking managed service in hub # new ipc invoking managed service in hub

View file

@ -4,6 +4,7 @@ from __future__ import print_function, unicode_literals
import os import os
import sys import sys
import time import time
import math
import base64 import base64
import socket import socket
import threading import threading
@ -58,8 +59,9 @@ class HttpSrv(object):
self.tp_q = None if self.args.no_htp else queue.LifoQueue() self.tp_q = None if self.args.no_htp else queue.LifoQueue()
self.srvs = [] self.srvs = []
self.ncli = 0 self.ncli = 0 # exact
self.clients = {} self.clients = {} # laggy
self.nclimax = 0
self.cb_ts = 0 self.cb_ts = 0
self.cb_v = 0 self.cb_v = 0
@ -112,8 +114,9 @@ class HttpSrv(object):
if self.tp_nthr > self.tp_ncli + 8: if self.tp_nthr > self.tp_ncli + 8:
self.stop_threads(4) self.stop_threads(4)
def listen(self, sck): def listen(self, sck, nlisteners):
self.srvs.append(sck) self.srvs.append(sck)
self.nclimax = math.ceil(self.args.nc * 1.0 / nlisteners)
t = threading.Thread(target=self.thr_listen, args=(sck,)) t = threading.Thread(target=self.thr_listen, args=(sck,))
t.daemon = True t.daemon = True
t.start() t.start()
@ -128,9 +131,9 @@ class HttpSrv(object):
if self.args.log_conn: if self.args.log_conn:
self.log(self.name, "|%sC-ncli" % ("-" * 1,), c="1;30") self.log(self.name, "|%sC-ncli" % ("-" * 1,), c="1;30")
if self.ncli >= self.args.nc: if self.ncli >= self.nclimax:
self.log(self.name, "at connection limit; waiting", 3) self.log(self.name, "at connection limit; waiting", 3)
while self.ncli >= self.args.nc: while self.ncli >= self.nclimax:
time.sleep(0.1) time.sleep(0.1)
if self.args.log_conn: if self.args.log_conn: