mirror of
https://github.com/9001/copyparty.git
synced 2025-08-17 09:02:15 -06:00
121 lines
3.4 KiB
Python
121 lines
3.4 KiB
Python
# coding: utf-8
|
|
from __future__ import print_function, unicode_literals
|
|
|
|
import sys
|
|
import time
|
|
import signal
|
|
import threading
|
|
|
|
from .__init__ import PY2, WINDOWS
|
|
from .broker_util import ExceptionalQueue
|
|
from .httpsrv import HttpSrv
|
|
from .util import FAKE_MP
|
|
|
|
if PY2 and not WINDOWS:
|
|
import pickle # nosec
|
|
|
|
|
|
class MpWorker(object):
|
|
"""one single mp instance"""
|
|
|
|
def __init__(self, q_pend, q_yield, args, n):
|
|
self.q_pend = q_pend
|
|
self.q_yield = q_yield
|
|
self.args = args
|
|
self.n = n
|
|
|
|
self.retpend = {}
|
|
self.retpend_mutex = threading.Lock()
|
|
self.mutex = threading.Lock()
|
|
self.workload_thr_active = False
|
|
|
|
# we inherited signal_handler from parent,
|
|
# replace it with something harmless
|
|
if not FAKE_MP:
|
|
signal.signal(signal.SIGINT, self.signal_handler)
|
|
|
|
# instantiate all services here (TODO: inheritance?)
|
|
self.httpsrv = HttpSrv(self)
|
|
self.httpsrv.disconnect_func = self.httpdrop
|
|
|
|
# on winxp and some other platforms,
|
|
# use thr.join() to block all signals
|
|
thr = threading.Thread(target=self.main)
|
|
thr.daemon = True
|
|
thr.start()
|
|
thr.join()
|
|
|
|
def signal_handler(self, signal, frame):
|
|
# print('k')
|
|
pass
|
|
|
|
def log(self, src, msg):
|
|
self.q_yield.put([0, "log", [src, msg]])
|
|
|
|
def logw(self, msg):
|
|
self.log("mp{}".format(self.n), msg)
|
|
|
|
def httpdrop(self, addr):
|
|
self.q_yield.put([0, "httpdrop", [addr]])
|
|
|
|
def main(self):
|
|
while True:
|
|
retq_id, dest, args = self.q_pend.get()
|
|
|
|
# self.logw("work: [{}]".format(d[0]))
|
|
if dest == "shutdown":
|
|
self.logw("ok bye")
|
|
sys.exit(0)
|
|
return
|
|
|
|
elif dest == "httpconn":
|
|
sck, addr = args
|
|
if PY2:
|
|
sck = pickle.loads(sck) # nosec
|
|
|
|
self.log("%s %s" % addr, "-" * 4 + "C-qpop")
|
|
self.httpsrv.accept(sck, addr)
|
|
|
|
with self.mutex:
|
|
if not self.workload_thr_active:
|
|
self.workload_thr_alive = True
|
|
thr = threading.Thread(target=self.thr_workload)
|
|
thr.daemon = True
|
|
thr.start()
|
|
|
|
elif dest == "retq":
|
|
# response from previous ipc call
|
|
with self.retpend_mutex:
|
|
retq = self.retpend.pop(retq_id)
|
|
|
|
retq.put(args)
|
|
|
|
else:
|
|
raise Exception("what is " + str(dest))
|
|
|
|
def put(self, want_retval, dest, *args):
|
|
if want_retval:
|
|
retq = ExceptionalQueue(1)
|
|
retq_id = id(retq)
|
|
with self.retpend_mutex:
|
|
self.retpend[retq_id] = retq
|
|
else:
|
|
retq = None
|
|
retq_id = 0
|
|
|
|
self.q_yield.put([retq_id, dest, args])
|
|
return retq
|
|
|
|
def thr_workload(self):
|
|
"""announce workloads to MpSrv (the mp controller / loadbalancer)"""
|
|
# avoid locking in extract_filedata by tracking difference here
|
|
while True:
|
|
time.sleep(0.2)
|
|
with self.mutex:
|
|
if self.httpsrv.num_clients() == 0:
|
|
# no clients rn, termiante thread
|
|
self.workload_thr_alive = False
|
|
return
|
|
|
|
self.q_yield.put([0, "workload", [self.httpsrv.workload]])
|