clean shutdown

This commit is contained in:
ed 2019-05-28 19:36:42 +00:00
parent cbbb809c98
commit 78c99e524b
4 changed files with 70 additions and 41 deletions

View file

@ -82,15 +82,19 @@ def main():
ap.add_argument("-nw", action="store_true", help="DEBUG: disable writing") ap.add_argument("-nw", action="store_true", help="DEBUG: disable writing")
al = ap.parse_args() al = ap.parse_args()
thr = threading.Thread(target=TcpSrv, args=(al,)) tcpsrv = TcpSrv(al)
thr = threading.Thread(target=tcpsrv.run)
thr.daemon = True thr.daemon = True
thr.start() thr.start()
# winxp/py2.7 support: thr.join() kills signals
try: try:
while True: while True:
time.sleep(9001) time.sleep(9001)
except: except KeyboardInterrupt:
print("bye") print("OPYTHAT")
tcpsrv.shutdown()
print("nailed it")
if __name__ == "__main__": if __name__ == "__main__":

View file

@ -34,6 +34,9 @@ class HttpSrv(object):
with self.mutex: with self.mutex:
return len(self.clients) return len(self.clients)
def shutdown(self):
print("ok bye")
def thr_client(self, sck, addr, log): def thr_client(self, sck, addr, log):
"""thread managing one tcp client""" """thread managing one tcp client"""
try: try:

View file

@ -3,18 +3,17 @@
from __future__ import print_function from __future__ import print_function
import time import time
import signal
import threading import threading
import multiprocessing as mp import multiprocessing as mp
from multiprocessing.reduction import ForkingPickler
import pickle
from .__init__ import * from .__init__ import *
from .httpsrv import * from .httpsrv import *
if PY2: if PY2 and not WINDOWS:
from multiprocessing.reduction import ForkingPickler
from StringIO import StringIO as MemesIO from StringIO import StringIO as MemesIO
else: import pickle
from io import BytesIO as MemesIO
class MpWorker(object): class MpWorker(object):
@ -33,15 +32,20 @@ class MpWorker(object):
self.mutex = threading.Lock() self.mutex = threading.Lock()
self.workload_thr_active = False self.workload_thr_active = False
# we inherited signal_handler from parent,
# replace it with something harmless
signal.signal(signal.SIGINT, self.signal_handler)
# on winxp and some other platforms,
# use thr.join() to block all signals
thr = threading.Thread(target=self.main) thr = threading.Thread(target=self.main)
thr.daemon = True thr.daemon = True
thr.start() thr.start()
thr.join()
try: def signal_handler(self, signal, frame):
while True: # print('k')
time.sleep(9001) pass
except:
self.logw("bye")
def log(self, src, msg): def log(self, src, msg):
self.q_yield.put(["log", src, msg]) self.q_yield.put(["log", src, msg])
@ -57,16 +61,18 @@ class MpWorker(object):
self.httpsrv.disconnect_func = self.disconnect_cb self.httpsrv.disconnect_func = self.disconnect_cb
while True: while True:
self.logw("awaiting work")
d = self.q_pend.get() d = self.q_pend.get()
self.logw("work: [{}]".format(d[0])) # self.logw("work: [{}]".format(d[0]))
if d[0] == "terminate": if d[0] == "shutdown":
self.logw("bye") self.logw("ok bye")
sys.exit(0) sys.exit(0)
return return
sck = pickle.loads(d[1]) sck = d[1]
if PY2:
sck = pickle.loads(sck)
self.httpsrv.accept(sck, d[2]) self.httpsrv.accept(sck, d[2])
with self.mutex: with self.mutex:
@ -138,6 +144,23 @@ class MpSrv(object):
with self.mutex: with self.mutex:
return sum(len(x.clients) for x in self.procs) return sum(len(x.clients) for x in self.procs)
def shutdown(self):
self.log("mpsrv", "shutting down")
for proc in self.procs:
thr = threading.Thread(target=proc.q_pend.put(["shutdown"]))
thr.start()
with self.mutex:
procs = self.procs
self.procs = []
while procs:
if procs[-1].is_alive():
time.sleep(0.1)
continue
procs.pop()
def collector(self, proc): def collector(self, proc):
while True: while True:
msg = proc.q_yield.get() msg = proc.q_yield.get()
@ -164,10 +187,13 @@ class MpSrv(object):
def accept(self, sck, addr): def accept(self, sck, addr):
proc = sorted(self.procs, key=lambda x: x.workload)[0] proc = sorted(self.procs, key=lambda x: x.workload)[0]
# can't put unpickled sockets <3.4 sck2 = sck
buf = MemesIO() if PY2:
ForkingPickler(buf).dump(sck) buf = MemesIO()
proc.q_pend.put(["socket", buf.getvalue(), addr]) ForkingPickler(buf).dump(sck)
sck2 = buf.getvalue()
proc.q_pend.put(["socket", sck2, addr])
with self.mutex: with self.mutex:
proc.clients[addr] = 50 proc.clients[addr] = 50

View file

@ -9,7 +9,6 @@ from datetime import datetime, timedelta
import calendar import calendar
from .__init__ import * from .__init__ import *
from .msgsvc import *
class TcpSrv(object): class TcpSrv(object):
@ -23,11 +22,11 @@ class TcpSrv(object):
self.args = args self.args = args
self.log_mutex = threading.Lock() self.log_mutex = threading.Lock()
self.msgsvc = MsgSvc(self.log)
self.next_day = 0 self.next_day = 0
bind_ip = args.i def run(self):
bind_port = args.p bind_ip = self.args.i
bind_port = self.args.p
ip = "127.0.0.1" ip = "127.0.0.1"
if bind_ip != ip: if bind_ip != ip:
@ -44,37 +43,34 @@ class TcpSrv(object):
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
srv.bind((bind_ip, bind_port)) srv.bind((bind_ip, bind_port))
srv.listen(100) srv.listen(self.args.nc)
self.log("root", "listening @ {0}:{1}".format(bind_ip, bind_port)) self.log("root", "listening @ {0}:{1}".format(bind_ip, bind_port))
httpsrv = self.create_server() self.httpsrv = self.create_server()
while True: while True:
if httpsrv.num_clients() >= args.nc: if self.httpsrv.num_clients() >= self.args.nc:
time.sleep(0.1) time.sleep(0.1)
continue continue
sck, addr = srv.accept() sck, addr = srv.accept()
httpsrv.accept(sck, addr) self.httpsrv.accept(sck, addr)
def shutdown(self):
self.httpsrv.shutdown()
def check_mp_support(self): def check_mp_support(self):
vmin = sys.version_info[1] vmin = sys.version_info[1]
if WINDOWS: if WINDOWS:
if PY2: if PY2:
# ForkingPickler doesn't support winsock # py2 pickler doesn't support winsock
return False return False
elif vmin < 4: elif vmin < 3:
return False return False
else: else:
if not PY2 and vmin < 4: if not PY2 and vmin < 3:
return False return False
try:
# fails on py3.3, works on py2.7
from multiprocessing.reduction import ForkingPickler
except:
return False
return True return True
def create_server(self): def create_server(self):
@ -84,9 +80,9 @@ class TcpSrv(object):
if not self.check_mp_support(): if not self.check_mp_support():
if WINDOWS: if WINDOWS:
self.log("root", "need python 3.4 or newer for multiprocessing;") self.log("root", "need python 3.3 or newer for multiprocessing;")
else: else:
self.log("root", "need python 2.7 or 3.4+ for multiprocessing;") self.log("root", "need python 2.7 or 3.3+ for multiprocessing;")
return self.create_threading_server() return self.create_threading_server()