From 6c3a976222fe0fcae63ab428cb11849aa41c81a3 Mon Sep 17 00:00:00 2001 From: ed Date: Fri, 9 Jul 2021 16:48:02 +0200 Subject: [PATCH] scale max-clients to mp-workers --- copyparty/broker_mp.py | 2 +- copyparty/broker_mpw.py | 2 +- copyparty/broker_thr.py | 2 +- copyparty/httpsrv.py | 13 ++++++++----- 4 files changed, 11 insertions(+), 8 deletions(-) diff --git a/copyparty/broker_mp.py b/copyparty/broker_mp.py index efcc3105..5a1f23ad 100644 --- a/copyparty/broker_mp.py +++ b/copyparty/broker_mp.py @@ -102,7 +102,7 @@ class BrokerMp(object): """ if dest == "listen": for p in self.procs: - p.q_pend.put([0, dest, args]) + p.q_pend.put([0, dest, [args[0], len(self.procs)]]) else: raise Exception("what is " + str(dest)) diff --git a/copyparty/broker_mpw.py b/copyparty/broker_mpw.py index 7feaa1de..6ca61bdb 100644 --- a/copyparty/broker_mpw.py +++ b/copyparty/broker_mpw.py @@ -64,7 +64,7 @@ class MpWorker(object): return elif dest == "listen": - self.httpsrv.listen(args[0]) + self.httpsrv.listen(args[0], args[1]) elif dest == "retq": # response from previous ipc call diff --git a/copyparty/broker_thr.py b/copyparty/broker_thr.py index c33fa015..a290519e 100644 --- a/copyparty/broker_thr.py +++ b/copyparty/broker_thr.py @@ -28,7 +28,7 @@ class BrokerThr(object): def put(self, want_retval, dest, *args): if dest == "listen": - self.httpsrv.listen(args[0]) + self.httpsrv.listen(args[0], 1) else: # new ipc invoking managed service in hub diff --git a/copyparty/httpsrv.py b/copyparty/httpsrv.py index a4255ee1..dfb8c458 100644 --- a/copyparty/httpsrv.py +++ b/copyparty/httpsrv.py @@ -4,6 +4,7 @@ from __future__ import print_function, unicode_literals import os import sys import time +import math import base64 import socket import threading @@ -58,8 +59,9 @@ class HttpSrv(object): self.tp_q = None if self.args.no_htp else queue.LifoQueue() self.srvs = [] - self.ncli = 0 - self.clients = {} + self.ncli = 0 # exact + self.clients = {} # laggy + self.nclimax = 0 self.cb_ts = 0 self.cb_v = 0 @@ -112,8 +114,9 @@ class HttpSrv(object): if self.tp_nthr > self.tp_ncli + 8: self.stop_threads(4) - def listen(self, sck): + def listen(self, sck, nlisteners): self.srvs.append(sck) + self.nclimax = math.ceil(self.args.nc * 1.0 / nlisteners) t = threading.Thread(target=self.thr_listen, args=(sck,)) t.daemon = True t.start() @@ -128,9 +131,9 @@ class HttpSrv(object): if self.args.log_conn: self.log(self.name, "|%sC-ncli" % ("-" * 1,), c="1;30") - if self.ncli >= self.args.nc: + if self.ncli >= self.nclimax: self.log(self.name, "at connection limit; waiting", 3) - while self.ncli >= self.args.nc: + while self.ncli >= self.nclimax: time.sleep(0.1) if self.args.log_conn: