From 78c99e524beecfa77747e4680cae545d35791abd Mon Sep 17 00:00:00 2001 From: ed Date: Tue, 28 May 2019 19:36:42 +0000 Subject: [PATCH] clean shutdown --- copyparty/__main__.py | 10 +++++-- copyparty/httpsrv.py | 3 ++ copyparty/mpsrv.py | 64 ++++++++++++++++++++++++++++++------------- copyparty/tcpsrv.py | 34 ++++++++++------------- 4 files changed, 70 insertions(+), 41 deletions(-) diff --git a/copyparty/__main__.py b/copyparty/__main__.py index 40bb947a..ea8a4292 100644 --- a/copyparty/__main__.py +++ b/copyparty/__main__.py @@ -82,15 +82,19 @@ def main(): ap.add_argument("-nw", action="store_true", help="DEBUG: disable writing") al = ap.parse_args() - thr = threading.Thread(target=TcpSrv, args=(al,)) + tcpsrv = TcpSrv(al) + thr = threading.Thread(target=tcpsrv.run) thr.daemon = True thr.start() + # winxp/py2.7 support: thr.join() kills signals try: while True: time.sleep(9001) - except: - print("bye") + except KeyboardInterrupt: + print("OPYTHAT") + tcpsrv.shutdown() + print("nailed it") if __name__ == "__main__": diff --git a/copyparty/httpsrv.py b/copyparty/httpsrv.py index b696a10f..4d470e2c 100644 --- a/copyparty/httpsrv.py +++ b/copyparty/httpsrv.py @@ -34,6 +34,9 @@ class HttpSrv(object): with self.mutex: return len(self.clients) + def shutdown(self): + print("ok bye") + def thr_client(self, sck, addr, log): """thread managing one tcp client""" try: diff --git a/copyparty/mpsrv.py b/copyparty/mpsrv.py index cec7eb3e..73aaa17e 100644 --- a/copyparty/mpsrv.py +++ b/copyparty/mpsrv.py @@ -3,18 +3,17 @@ from __future__ import print_function import time +import signal import threading import multiprocessing as mp -from multiprocessing.reduction import ForkingPickler -import pickle from .__init__ import * from .httpsrv import * -if PY2: +if PY2 and not WINDOWS: + from multiprocessing.reduction import ForkingPickler from StringIO import StringIO as MemesIO -else: - from io import BytesIO as MemesIO + import pickle class MpWorker(object): @@ -33,15 +32,20 @@ class MpWorker(object): self.mutex = threading.Lock() self.workload_thr_active = False + # we inherited signal_handler from parent, + # replace it with something harmless + signal.signal(signal.SIGINT, self.signal_handler) + + # on winxp and some other platforms, + # use thr.join() to block all signals thr = threading.Thread(target=self.main) thr.daemon = True thr.start() + thr.join() - try: - while True: - time.sleep(9001) - except: - self.logw("bye") + def signal_handler(self, signal, frame): + # print('k') + pass def log(self, src, msg): self.q_yield.put(["log", src, msg]) @@ -57,16 +61,18 @@ class MpWorker(object): self.httpsrv.disconnect_func = self.disconnect_cb while True: - self.logw("awaiting work") d = self.q_pend.get() - self.logw("work: [{}]".format(d[0])) - if d[0] == "terminate": - self.logw("bye") + # self.logw("work: [{}]".format(d[0])) + if d[0] == "shutdown": + self.logw("ok bye") sys.exit(0) return - sck = pickle.loads(d[1]) + sck = d[1] + if PY2: + sck = pickle.loads(sck) + self.httpsrv.accept(sck, d[2]) with self.mutex: @@ -138,6 +144,23 @@ class MpSrv(object): with self.mutex: return sum(len(x.clients) for x in self.procs) + def shutdown(self): + self.log("mpsrv", "shutting down") + for proc in self.procs: + thr = threading.Thread(target=proc.q_pend.put(["shutdown"])) + thr.start() + + with self.mutex: + procs = self.procs + self.procs = [] + + while procs: + if procs[-1].is_alive(): + time.sleep(0.1) + continue + + procs.pop() + def collector(self, proc): while True: msg = proc.q_yield.get() @@ -164,10 +187,13 @@ class MpSrv(object): def accept(self, sck, addr): proc = sorted(self.procs, key=lambda x: x.workload)[0] - # can't put unpickled sockets <3.4 - buf = MemesIO() - ForkingPickler(buf).dump(sck) - proc.q_pend.put(["socket", buf.getvalue(), addr]) + sck2 = sck + if PY2: + buf = MemesIO() + ForkingPickler(buf).dump(sck) + sck2 = buf.getvalue() + + proc.q_pend.put(["socket", sck2, addr]) with self.mutex: proc.clients[addr] = 50 diff --git a/copyparty/tcpsrv.py b/copyparty/tcpsrv.py index 71089dad..ebcbaacc 100644 --- a/copyparty/tcpsrv.py +++ b/copyparty/tcpsrv.py @@ -9,7 +9,6 @@ from datetime import datetime, timedelta import calendar from .__init__ import * -from .msgsvc import * class TcpSrv(object): @@ -23,11 +22,11 @@ class TcpSrv(object): self.args = args self.log_mutex = threading.Lock() - self.msgsvc = MsgSvc(self.log) self.next_day = 0 - bind_ip = args.i - bind_port = args.p + def run(self): + bind_ip = self.args.i + bind_port = self.args.p ip = "127.0.0.1" if bind_ip != ip: @@ -44,37 +43,34 @@ class TcpSrv(object): srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) srv.bind((bind_ip, bind_port)) - srv.listen(100) + srv.listen(self.args.nc) self.log("root", "listening @ {0}:{1}".format(bind_ip, bind_port)) - httpsrv = self.create_server() + self.httpsrv = self.create_server() while True: - if httpsrv.num_clients() >= args.nc: + if self.httpsrv.num_clients() >= self.args.nc: time.sleep(0.1) continue sck, addr = srv.accept() - httpsrv.accept(sck, addr) + self.httpsrv.accept(sck, addr) + + def shutdown(self): + self.httpsrv.shutdown() def check_mp_support(self): vmin = sys.version_info[1] if WINDOWS: if PY2: - # ForkingPickler doesn't support winsock + # py2 pickler doesn't support winsock return False - elif vmin < 4: + elif vmin < 3: return False else: - if not PY2 and vmin < 4: + if not PY2 and vmin < 3: return False - try: - # fails on py3.3, works on py2.7 - from multiprocessing.reduction import ForkingPickler - except: - return False - return True def create_server(self): @@ -84,9 +80,9 @@ class TcpSrv(object): if not self.check_mp_support(): if WINDOWS: - self.log("root", "need python 3.4 or newer for multiprocessing;") + self.log("root", "need python 3.3 or newer for multiprocessing;") else: - self.log("root", "need python 2.7 or 3.4+ for multiprocessing;") + self.log("root", "need python 2.7 or 3.3+ for multiprocessing;") return self.create_threading_server()