From 8fcde2a5796c4d507fc12359fcaf03fc418e7963 Mon Sep 17 00:00:00 2001 From: ed Date: Fri, 9 Jul 2021 15:49:36 +0200 Subject: [PATCH] move tcp accept into mp-worker --- copyparty/broker_mp.py | 63 ++------------------------ copyparty/broker_mpw.py | 43 +----------------- copyparty/broker_thr.py | 13 +----- copyparty/httpcli.py | 11 +++-- copyparty/httpconn.py | 6 --- copyparty/httpsrv.py | 99 +++++++++++++++++++++-------------------- copyparty/svchub.py | 9 ++-- copyparty/tcpsrv.py | 42 ++--------------- copyparty/up2k.py | 8 ++-- copyparty/util.py | 15 +------ 10 files changed, 76 insertions(+), 233 deletions(-) diff --git a/copyparty/broker_mp.py b/copyparty/broker_mp.py index 52cc9f96..efcc3105 100644 --- a/copyparty/broker_mp.py +++ b/copyparty/broker_mp.py @@ -4,17 +4,11 @@ from __future__ import print_function, unicode_literals import time import threading -from .__init__ import PY2, WINDOWS, VT100 from .broker_util import try_exec from .broker_mpw import MpWorker from .util import mp -if PY2 and not WINDOWS: - from multiprocessing.reduction import ForkingPickler - from StringIO import StringIO as MemesIO # pylint: disable=import-error - - class BrokerMp(object): """external api; manages MpWorkers""" @@ -42,7 +36,6 @@ class BrokerMp(object): proc.q_yield = q_yield proc.nid = n proc.clients = {} - proc.workload = 0 thr = threading.Thread( target=self.collector, args=(proc,), name="mp-collector" @@ -53,13 +46,6 @@ class BrokerMp(object): self.procs.append(proc) proc.start() - if not self.args.q: - thr = threading.Thread( - target=self.debug_load_balancer, name="mp-dbg-loadbalancer" - ) - thr.daemon = True - thr.start() - def shutdown(self): self.log("broker", "shutting down") for n, proc in enumerate(self.procs): @@ -89,20 +75,6 @@ class BrokerMp(object): if dest == "log": self.log(*args) - elif dest == "workload": - with self.mutex: - proc.workload = args[0] - - elif dest == "httpdrop": - addr = args[0] - - with self.mutex: - del proc.clients[addr] - if not proc.clients: - proc.workload = 0 - - self.hub.tcpsrv.num_clients.add(-1) - elif dest == "retq": # response from previous ipc call with self.retpend_mutex: @@ -128,38 +100,9 @@ class BrokerMp(object): returns a Queue object which eventually contains the response if want_retval (not-impl here since nothing uses it yet) """ - if dest == "httpconn": - sck, addr = args - sck2 = sck - if PY2: - buf = MemesIO() - ForkingPickler(buf).dump(sck) - sck2 = buf.getvalue() - - proc = sorted(self.procs, key=lambda x: x.workload)[0] - proc.q_pend.put([0, dest, [sck2, addr]]) - - with self.mutex: - proc.clients[addr] = 50 - proc.workload += 50 + if dest == "listen": + for p in self.procs: + p.q_pend.put([0, dest, args]) else: raise Exception("what is " + str(dest)) - - def debug_load_balancer(self): - fmt = "\033[1m{}\033[0;36m{:4}\033[0m " - if not VT100: - fmt = "({}{:4})" - - last = "" - while self.procs: - msg = "" - for proc in self.procs: - msg += fmt.format(len(proc.clients), proc.workload) - - if msg != last: - last = msg - with self.hub.log_mutex: - print(msg) - - time.sleep(0.1) diff --git a/copyparty/broker_mpw.py b/copyparty/broker_mpw.py index 7ff2f2ef..7feaa1de 100644 --- a/copyparty/broker_mpw.py +++ b/copyparty/broker_mpw.py @@ -3,18 +3,13 @@ from __future__ import print_function, unicode_literals from copyparty.authsrv import AuthSrv import sys -import time import signal import threading -from .__init__ import PY2, WINDOWS from .broker_util import ExceptionalQueue from .httpsrv import HttpSrv from .util import FAKE_MP -if PY2 and not WINDOWS: - import pickle # nosec - class MpWorker(object): """one single mp instance""" @@ -28,7 +23,6 @@ class MpWorker(object): self.retpend = {} self.retpend_mutex = threading.Lock() self.mutex = threading.Lock() - self.workload_thr_alive = False # we inherited signal_handler from parent, # replace it with something harmless @@ -40,7 +34,6 @@ class MpWorker(object): # instantiate all services here (TODO: inheritance?) self.httpsrv = HttpSrv(self, True) - self.httpsrv.disconnect_func = self.httpdrop # on winxp and some other platforms, # use thr.join() to block all signals @@ -59,9 +52,6 @@ class MpWorker(object): def logw(self, msg, c=0): self.log("mp{}".format(self.n), msg, c) - def httpdrop(self, addr): - self.q_yield.put([0, "httpdrop", [addr]]) - def main(self): while True: retq_id, dest, args = self.q_pend.get() @@ -73,24 +63,8 @@ class MpWorker(object): sys.exit(0) return - elif dest == "httpconn": - sck, addr = args - if PY2: - sck = pickle.loads(sck) # nosec - - if self.args.log_conn: - self.log("%s %s" % addr, "|%sC-qpop" % ("-" * 4,), c="1;30") - - self.httpsrv.accept(sck, addr) - - with self.mutex: - if not self.workload_thr_alive: - self.workload_thr_alive = True - thr = threading.Thread( - target=self.thr_workload, name="mpw-workload" - ) - thr.daemon = True - thr.start() + elif dest == "listen": + self.httpsrv.listen(args[0]) elif dest == "retq": # response from previous ipc call @@ -114,16 +88,3 @@ class MpWorker(object): self.q_yield.put([retq_id, dest, args]) return retq - - def thr_workload(self): - """announce workloads to MpSrv (the mp controller / loadbalancer)""" - # avoid locking in extract_filedata by tracking difference here - while True: - time.sleep(0.2) - with self.mutex: - if self.httpsrv.num_clients() == 0: - # no clients rn, termiante thread - self.workload_thr_alive = False - return - - self.q_yield.put([0, "workload", [self.httpsrv.workload]]) diff --git a/copyparty/broker_thr.py b/copyparty/broker_thr.py index da2e056b..c33fa015 100644 --- a/copyparty/broker_thr.py +++ b/copyparty/broker_thr.py @@ -3,7 +3,6 @@ from __future__ import print_function, unicode_literals import threading -from .authsrv import AuthSrv from .httpsrv import HttpSrv from .broker_util import ExceptionalQueue, try_exec @@ -21,7 +20,6 @@ class BrokerThr(object): # instantiate all services here (TODO: inheritance?) self.httpsrv = HttpSrv(self) - self.httpsrv.disconnect_func = self.httpdrop def shutdown(self): # self.log("broker", "shutting down") @@ -29,12 +27,8 @@ class BrokerThr(object): pass def put(self, want_retval, dest, *args): - if dest == "httpconn": - sck, addr = args - if self.args.log_conn: - self.log("%s %s" % addr, "|%sC-qpop" % ("-" * 4,), c="1;30") - - self.httpsrv.accept(sck, addr) + if dest == "listen": + self.httpsrv.listen(args[0]) else: # new ipc invoking managed service in hub @@ -51,6 +45,3 @@ class BrokerThr(object): retq = ExceptionalQueue(1) retq.put(rv) return retq - - def httpdrop(self, addr): - self.hub.tcpsrv.num_clients.add(-1) diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index 77b508a1..8d015829 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -483,7 +483,7 @@ class HttpCli(object): path = os.devnull with open(fsenc(path), "wb", 512 * 1024) as f: - post_sz, _, sha_b64 = hashcopy(self.conn, reader, f) + post_sz, _, sha_b64 = hashcopy(reader, f) if not self.args.nw: vfs, vrem = vfs.get_dbv(rem) @@ -715,7 +715,7 @@ class HttpCli(object): with open(fsenc(path), "rb+", 512 * 1024) as f: f.seek(cstart[0]) - post_sz, _, sha_b64 = hashcopy(self.conn, reader, f) + post_sz, _, sha_b64 = hashcopy(reader, f) if sha_b64 != chash: raise Pebkac( @@ -882,7 +882,7 @@ class HttpCli(object): with ren_open(fname, "wb", 512 * 1024, **open_args) as f: f, fname = f["orz"] self.log("writing to {}/{}".format(fdir, fname)) - sz, sha512_hex, _ = hashcopy(self.conn, p_data, f) + sz, sha512_hex, _ = hashcopy(p_data, f) if sz == 0: raise Pebkac(400, "empty files in post") @@ -1065,7 +1065,7 @@ class HttpCli(object): raise Pebkac(400, "expected body, got {}".format(p_field)) with open(fsenc(fp), "wb", 512 * 1024) as f: - sz, sha512, _ = hashcopy(self.conn, p_data, f) + sz, sha512, _ = hashcopy(p_data, f) new_lastmod = os.stat(fsenc(fp)).st_mtime new_lastmod3 = int(new_lastmod * 1000) @@ -1255,8 +1255,7 @@ class HttpCli(object): if use_sendfile: remains = sendfile_kern(lower, upper, f, self.s) else: - actor = self.conn if self.is_mp else None - remains = sendfile_py(lower, upper, f, self.s, actor) + remains = sendfile_py(lower, upper, f, self.s) if remains > 0: logmsg += " \033[31m" + unicode(upper - remains) + "\033[0m" diff --git a/copyparty/httpconn.py b/copyparty/httpconn.py index 342683bd..9fe6a944 100644 --- a/copyparty/httpconn.py +++ b/copyparty/httpconn.py @@ -45,7 +45,6 @@ class HttpConn(object): self.stopping = False self.nreq = 0 self.nbyte = 0 - self.workload = 0 self.u2idx = None self.log_func = hsrv.log self.lf_url = re.compile(self.args.lf_url) if self.args.lf_url else None @@ -184,11 +183,6 @@ class HttpConn(object): self.sr = Unrecv(self.s) while not self.stopping: - if self.is_mp: - self.workload += 50 - if self.workload >= 2 ** 31: - self.workload = 100 - self.nreq += 1 cli = HttpCli(self) if not cli.run(): diff --git a/copyparty/httpsrv.py b/copyparty/httpsrv.py index 2ed9a6a0..df8600db 100644 --- a/copyparty/httpsrv.py +++ b/copyparty/httpsrv.py @@ -48,17 +48,16 @@ class HttpSrv(object): self.log = broker.log self.asrv = broker.asrv - self.disconnect_func = None self.mutex = threading.Lock() + self.stopping = False self.tp_nthr = 0 # actual self.tp_ncli = 0 # fading self.tp_time = None # latest worker collect self.tp_q = None if self.args.no_htp else queue.LifoQueue() + self.srvs = [] self.clients = {} - self.workload = 0 - self.workload_thr_alive = False self.cb_ts = 0 self.cb_v = 0 @@ -111,11 +110,47 @@ class HttpSrv(object): if self.tp_nthr > self.tp_ncli + 8: self.stop_threads(4) + def listen(self, sck): + self.srvs.append(sck) + t = threading.Thread(target=self.thr_listen, args=(sck,)) + t.daemon = True + t.start() + + def thr_listen(self, srv_sck): + """listens on a shared tcp server""" + ip, port = srv_sck.getsockname() + fno = srv_sck.fileno() + msg = "subscribed @ {}:{} f{},p{}".format(ip, port, fno, os.getpid()) + self.log("httpsrv", msg) + while not self.stopping: + if self.args.log_conn: + self.log("httpsrv", "|%sC-ncli" % ("-" * 1,), c="1;30") + + if len(self.clients) >= self.args.nc: + time.sleep(0.1) + continue + + if self.args.log_conn: + self.log("httpsrv", "|%sC-acc1" % ("-" * 2,), c="1;30") + + try: + sck, addr = srv_sck.accept() + except (OSError, socket.error) as ex: + self.log("httpsrv", "accept({}): {}".format(fno, ex), c=6) + if ex.errno not in [10038, 10054, 107, 57, 49, 9]: + raise + continue + + if self.args.log_conn: + m = "|{}C-acc2 \033[0;36m{} \033[3{}m{}".format( + "-" * 3, ip, port % 8, port + ) + self.log("%s %s" % addr, m, c="1;30") + + self.accept(sck, addr) + def accept(self, sck, addr): """takes an incoming tcp connection and creates a thread to handle it""" - if self.args.log_conn: - self.log("%s %s" % addr, "|%sC-cthr" % ("-" * 5,), c="1;30") - now = time.time() if self.tp_time and now - self.tp_time > 300: @@ -167,6 +202,13 @@ class HttpSrv(object): return len(self.clients) def shutdown(self): + self.stopping = True + for srv in self.srvs: + try: + srv.close() + except: + pass + clients = list(self.clients.keys()) for cli in clients: try: @@ -184,25 +226,15 @@ class HttpSrv(object): with self.mutex: self.clients[cli] = 0 - if self.is_mp: - self.workload += 50 - if not self.workload_thr_alive: - self.workload_thr_alive = True - thr = threading.Thread( - target=self.thr_workload, name="httpsrv-workload" - ) - thr.daemon = True - thr.start() - fno = sck.fileno() try: if self.args.log_conn: - self.log("%s %s" % addr, "|%sC-crun" % ("-" * 6,), c="1;30") + self.log("%s %s" % addr, "|%sC-crun" % ("-" * 4,), c="1;30") cli.run() except (OSError, socket.error) as ex: - if ex.errno not in [10038, 10054, 107, 57, 9]: + if ex.errno not in [10038, 10054, 107, 57, 49, 9]: self.log( "%s %s" % addr, "run({}): {}".format(fno, ex), @@ -212,7 +244,7 @@ class HttpSrv(object): finally: sck = cli.s if self.args.log_conn: - self.log("%s %s" % addr, "|%sC-cdone" % ("-" * 7,), c="1;30") + self.log("%s %s" % addr, "|%sC-cdone" % ("-" * 5,), c="1;30") try: fno = sck.fileno() @@ -237,35 +269,6 @@ class HttpSrv(object): with self.mutex: del self.clients[cli] - if self.disconnect_func: - self.disconnect_func(addr) # pylint: disable=not-callable - - def thr_workload(self): - """indicates the python interpreter workload caused by this HttpSrv""" - # avoid locking in extract_filedata by tracking difference here - while True: - time.sleep(0.2) - with self.mutex: - if not self.clients: - # no clients rn, termiante thread - self.workload_thr_alive = False - self.workload = 0 - return - - total = 0 - with self.mutex: - for cli in self.clients.keys(): - now = cli.workload - delta = now - self.clients[cli] - if delta < 0: - # was reset in HttpCli to prevent overflow - delta = now - - total += delta - self.clients[cli] = now - - self.workload = total - def cachebuster(self): if time.time() - self.cb_ts < 1: return self.cb_v diff --git a/copyparty/svchub.py b/copyparty/svchub.py index 11934c5f..c72f1fac 100644 --- a/copyparty/svchub.py +++ b/copyparty/svchub.py @@ -222,16 +222,13 @@ class SvcHub(object): vmin = sys.version_info[1] if WINDOWS: msg = "need python 3.3 or newer for multiprocessing;" - if PY2: - # py2 pickler doesn't support winsock - return msg - elif vmin < 3: + if PY2 or vmin < 3: return msg elif MACOS: return "multiprocessing is wonky on mac osx;" else: - msg = "need python 2.7 or 3.3+ for multiprocessing;" - if not PY2 and vmin < 3: + msg = "need python 3.3+ for multiprocessing;" + if PY2 or vmin < 3: return msg try: diff --git a/copyparty/tcpsrv.py b/copyparty/tcpsrv.py index 0ae1b483..c3c3bbca 100644 --- a/copyparty/tcpsrv.py +++ b/copyparty/tcpsrv.py @@ -2,9 +2,7 @@ from __future__ import print_function, unicode_literals import re -import time import socket -import select from .util import chkcmd, Counter @@ -66,47 +64,13 @@ class TcpSrv(object): for srv in self.srv: srv.listen(self.args.nc) ip, port = srv.getsockname() - msg = "listening @ {0}:{1}".format(ip, port) + fno = srv.fileno() + msg = "listening @ {}:{} f{}".format(ip, port, fno) self.log("tcpsrv", msg) if self.args.q: print(msg) - while not self.stopping: - if self.args.log_conn: - self.log("tcpsrv", "|%sC-ncli" % ("-" * 1,), c="1;30") - - if self.num_clients.v >= self.args.nc: - time.sleep(0.1) - continue - - if self.args.log_conn: - self.log("tcpsrv", "|%sC-acc1" % ("-" * 2,), c="1;30") - - try: - # macos throws bad-fd - ready, _, _ = select.select(self.srv, [], []) - except: - ready = [] - if not self.stopping: - raise - - for srv in ready: - if self.stopping: - break - - sck, addr = srv.accept() - sip, sport = srv.getsockname() - if self.args.log_conn: - self.log( - "%s %s" % addr, - "|{}C-acc2 \033[0;36m{} \033[3{}m{}".format( - "-" * 3, sip, sport % 8, sport - ), - c="1;30", - ) - - self.num_clients.add() - self.hub.broker.put(False, "httpconn", sck, addr) + self.hub.broker.put(False, "listen", srv) def shutdown(self): self.stopping = True diff --git a/copyparty/up2k.py b/copyparty/up2k.py index 6f749d7b..bbcf87eb 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -103,13 +103,15 @@ class Up2k(object): self.deferred_init() else: t = threading.Thread( - target=self.deferred_init, - name="up2k-deferred-init", + target=self.deferred_init, name="up2k-deferred-init", args=(0.5,) ) t.daemon = True t.start() - def deferred_init(self): + def deferred_init(self, wait=0): + if wait: + time.sleep(wait) + all_vols = self.asrv.vfs.all_vols have_e2d = self.init_indexes(all_vols) diff --git a/copyparty/util.py b/copyparty/util.py index 7d9445b6..86eaf596 100644 --- a/copyparty/util.py +++ b/copyparty/util.py @@ -918,16 +918,10 @@ def yieldfile(fn): yield buf -def hashcopy(actor, fin, fout): - is_mp = actor.is_mp +def hashcopy(fin, fout): hashobj = hashlib.sha512() tlen = 0 for buf in fin: - if is_mp: - actor.workload += 1 - if actor.workload > 2 ** 31: - actor.workload = 100 - tlen += len(buf) hashobj.update(buf) fout.write(buf) @@ -938,15 +932,10 @@ def hashcopy(actor, fin, fout): return tlen, hashobj.hexdigest(), digest_b64 -def sendfile_py(lower, upper, f, s, actor=None): +def sendfile_py(lower, upper, f, s): remains = upper - lower f.seek(lower) while remains > 0: - if actor: - actor.workload += 1 - if actor.workload > 2 ** 31: - actor.workload = 100 - # time.sleep(0.01) buf = f.read(min(1024 * 32, remains)) if not buf: