full-duplex broker for up2k-registry

This commit is contained in:
ed 2019-07-01 02:42:29 +02:00
parent bebc49595d
commit 9da6a73044
15 changed files with 168 additions and 78 deletions

2
.vscode/launch.json vendored
View file

@ -11,8 +11,6 @@
"args": [
"-j",
"0",
"-nc",
"4",
//"-nw",
"-a",
"ed:wark",

View file

@ -1,4 +1,3 @@
#!/usr/bin/env python
# coding: utf-8
from __future__ import print_function, unicode_literals

View file

@ -1,4 +1,3 @@
#!/usr/bin/env python
# coding: utf-8
VERSION = (0, 1, 0)

View file

@ -1,4 +1,3 @@
#!/usr/bin/env python
# coding: utf-8
from __future__ import print_function, unicode_literals

View file

@ -1,13 +1,12 @@
#!/usr/bin/env python
# coding: utf-8
from __future__ import print_function, unicode_literals
import time
import threading
import multiprocessing as mp
from .__init__ import PY2, WINDOWS
from .broker_mpw import MpWorker
from .util import mp
if PY2 and not WINDOWS:
@ -23,9 +22,10 @@ class BrokerMp(object):
self.log = hub.log
self.args = hub.args
self.mutex = threading.Lock()
self.procs = []
self.retpend = {}
self.retpend_mutex = threading.Lock()
self.mutex = threading.Lock()
cores = self.args.j
if cores is None:
@ -58,7 +58,7 @@ class BrokerMp(object):
def shutdown(self):
self.log("broker", "shutting down")
for proc in self.procs:
thr = threading.Thread(target=proc.q_pend.put(["shutdown"]))
thr = threading.Thread(target=proc.q_pend.put([0, "shutdown", []]))
thr.start()
with self.mutex:
@ -73,19 +73,20 @@ class BrokerMp(object):
procs.pop()
def collector(self, proc):
"""receive message from hub in other process"""
while True:
msg = proc.q_yield.get()
k = msg[0]
retq_id, dest, args = msg
if k == "log":
self.log(*msg[1:])
if dest == "log":
self.log(*args)
elif k == "workload":
elif dest == "workload":
with self.mutex:
proc.workload = msg[1]
proc.workload = args[0]
elif k == "httpdrop":
addr = msg[1]
elif dest == "httpdrop":
addr = args[0]
with self.mutex:
del proc.clients[addr]
@ -94,8 +95,32 @@ class BrokerMp(object):
self.hub.tcpsrv.num_clients.add(-1)
def put(self, retq, act, *args):
if act == "httpconn":
elif dest == "retq":
# response from previous ipc call
with self.retpend_mutex:
retq = self.retpend.pop(retq_id)
retq.put(args)
else:
# new ipc invoking managed service in hub
obj = self.hub
for node in dest.split("."):
obj = getattr(obj, node)
# TODO will deadlock if dest performs another ipc
rv = obj(*args)
if retq_id:
proc.q_pend.put([retq_id, "retq", rv])
def put(self, want_retval, dest, *args):
"""
send message to non-hub component in other process,
returns a Queue object which eventually contains the response if want_retval
(not-impl here since nothing uses it yet)
"""
if dest == "httpconn":
sck, addr = args
sck2 = sck
if PY2:
@ -104,13 +129,14 @@ class BrokerMp(object):
sck2 = buf.getvalue()
proc = sorted(self.procs, key=lambda x: x.workload)[0]
proc.q_pend.put(["httpconn", sck2, addr])
proc.q_pend.put([0, dest, [sck2, addr]])
with self.mutex:
proc.clients[addr] = 50
proc.workload += 50
else:
raise Exception("what is " + str(act))
raise Exception("what is " + str(dest))
def debug_load_balancer(self):
last = ""

View file

@ -1,4 +1,3 @@
#!/usr/bin/env python
# coding: utf-8
from __future__ import print_function, unicode_literals
@ -9,6 +8,7 @@ import threading
from .__init__ import PY2, WINDOWS
from .httpsrv import HttpSrv
from .util import Queue
if PY2 and not WINDOWS:
import pickle # nosec
@ -23,6 +23,8 @@ class MpWorker(object):
self.args = args
self.n = n
self.retpend = {}
self.retpend_mutex = threading.Lock()
self.mutex = threading.Lock()
self.workload_thr_active = False
@ -30,7 +32,8 @@ class MpWorker(object):
# replace it with something harmless
signal.signal(signal.SIGINT, self.signal_handler)
self.httpsrv = HttpSrv(self.args, self.log)
# instantiate all services here (TODO: inheritance?)
self.httpsrv = HttpSrv(self)
self.httpsrv.disconnect_func = self.httpdrop
# on winxp and some other platforms,
@ -45,31 +48,31 @@ class MpWorker(object):
pass
def log(self, src, msg):
self.q_yield.put(["log", src, msg])
self.q_yield.put([0, "log", [src, msg]])
def logw(self, msg):
self.log("mp{}".format(self.n), msg)
def httpdrop(self, addr):
self.q_yield.put(["httpdrop", addr])
self.q_yield.put([0, "httpdrop", [addr]])
def main(self):
while True:
d = self.q_pend.get()
retq_id, dest, args = self.q_pend.get()
# self.logw("work: [{}]".format(d[0]))
if d[0] == "shutdown":
if dest == "shutdown":
self.logw("ok bye")
sys.exit(0)
return
elif d[0] == "httpconn":
sck = d[1]
elif dest == "httpconn":
sck, addr = args
if PY2:
sck = pickle.loads(sck) # nosec
self.log(str(d[2]), "-" * 4 + "C-qpop")
self.httpsrv.accept(sck, d[2])
self.log(str(addr), "-" * 4 + "C-qpop")
self.httpsrv.accept(sck, addr)
with self.mutex:
if not self.workload_thr_active:
@ -78,8 +81,28 @@ class MpWorker(object):
thr.daemon = True
thr.start()
elif dest == "retq":
# response from previous ipc call
with self.retpend_mutex:
retq = self.retpend.pop(retq_id)
retq.put(args)
else:
raise Exception("what is " + str(d[0]))
raise Exception("what is " + str(dest))
def put(self, want_retval, dest, *args):
if want_retval:
retq = Queue(1)
retq_id = id(retq)
with self.retpend_mutex:
self.retpend[retq_id] = retq
else:
retq = None
retq_id = 0
self.q_yield.put([retq_id, dest, args])
return retq
def thr_workload(self):
"""announce workloads to MpSrv (the mp controller / loadbalancer)"""
@ -92,4 +115,4 @@ class MpWorker(object):
self.workload_thr_alive = False
return
self.q_yield.put(["workload", self.httpsrv.workload])
self.q_yield.put([0, "workload", [self.httpsrv.workload]])

View file

@ -4,6 +4,7 @@ from __future__ import print_function, unicode_literals
import threading
from .util import Queue
from .httpsrv import HttpSrv
@ -17,21 +18,36 @@ class BrokerThr(object):
self.mutex = threading.Lock()
self.httpsrv = HttpSrv(self.args, self.log)
# instantiate all services here (TODO: inheritance?)
self.httpsrv = HttpSrv(self)
self.httpsrv.disconnect_func = self.httpdrop
def shutdown(self):
# self.log("broker", "shutting down")
pass
def put(self, retq, act, *args):
if act == "httpconn":
def put(self, want_retval, dest, *args):
if dest == "httpconn":
sck, addr = args
self.log(str(addr), "-" * 4 + "C-qpop")
self.httpsrv.accept(sck, addr)
else:
raise Exception("what is " + str(act))
# new ipc invoking managed service in hub
obj = self.hub
for node in dest.split("."):
obj = getattr(obj, node)
# TODO will deadlock if dest performs another ipc
rv = obj(*args)
if not want_retval:
return
# pretend we're broker_mp
retq = Queue(1)
retq.put(rv)
return retq
def httpdrop(self, addr):
self.hub.tcpsrv.num_clients.add(-1)

View file

@ -1,4 +1,3 @@
#!/usr/bin/env python
# coding: utf-8
from __future__ import print_function, unicode_literals
@ -248,7 +247,12 @@ class HttpCli(object):
except:
raise Pebkac(422, "you POSTed invalid json")
print(body)
# \suger0r/
x = self.conn.hsrv.broker.put(True, "up2k._get_wark", body)
wark = x.get()
msg = '{{ "wark": "{}" }}'.format(wark)
self.log(msg)
self.reply(msg.encode("utf-8"), headers=["Content-Type: application/json"])
def handle_post_binary(self):
raise Exception("todo")

View file

@ -1,4 +1,3 @@
#!/usr/bin/env python
# coding: utf-8
from __future__ import print_function, unicode_literals
@ -18,15 +17,17 @@ class HttpConn(object):
creates an HttpCli for each request (Connection: Keep-Alive)
"""
def __init__(self, sck, addr, args, auth, log_func, cert_path):
def __init__(self, sck, addr, hsrv):
self.s = sck
self.addr = addr
self.args = args
self.auth = auth
self.cert_path = cert_path
self.hsrv = hsrv
self.args = hsrv.args
self.auth = hsrv.auth
self.cert_path = hsrv.cert_path
self.workload = 0
self.log_func = log_func
self.log_func = hsrv.log
self.log_src = "{} \033[36m{}".format(addr[0], addr[1]).ljust(26)
env = jinja2.Environment()

View file

@ -1,4 +1,3 @@
#!/usr/bin/env python
# coding: utf-8
from __future__ import print_function, unicode_literals
@ -17,9 +16,10 @@ class HttpSrv(object):
relying on MpSrv for performance (HttpSrv is just plain threads)
"""
def __init__(self, args, log_func):
self.log = log_func
self.args = args
def __init__(self, broker):
self.broker = broker
self.args = broker.args
self.log = broker.log
self.disconnect_func = None
self.mutex = threading.Lock()
@ -27,7 +27,7 @@ class HttpSrv(object):
self.clients = {}
self.workload = 0
self.workload_thr_alive = False
self.auth = AuthSrv(args, log_func)
self.auth = AuthSrv(self.args, self.log)
cert_path = os.path.join(E.cfg, "cert.pem")
if os.path.exists(cert_path):
@ -38,7 +38,7 @@ class HttpSrv(object):
def accept(self, sck, addr):
"""takes an incoming tcp connection and creates a thread to handle it"""
self.log(str(addr), "-" * 5 + "C-cthr")
thr = threading.Thread(target=self.thr_client, args=(sck, addr, self.log))
thr = threading.Thread(target=self.thr_client, args=(sck, addr))
thr.daemon = True
thr.start()
@ -49,9 +49,9 @@ class HttpSrv(object):
def shutdown(self):
print("ok bye")
def thr_client(self, sck, addr, log):
def thr_client(self, sck, addr):
"""thread managing one tcp client"""
cli = HttpConn(sck, addr, self.args, self.auth, log, self.cert_path)
cli = HttpConn(sck, addr, self)
with self.mutex:
self.clients[cli] = 0
self.workload += 50

View file

@ -1,26 +1,27 @@
#!/usr/bin/env python
# coding: utf-8
from __future__ import print_function, unicode_literals
import sys
import time
import threading
import multiprocessing as mp
from datetime import datetime, timedelta
import calendar
from .__init__ import PY2, WINDOWS
from .tcpsrv import TcpSrv
from .up2k import Up2k
from .util import mp
class SvcHub(object):
"""
Hosts all services which cannot be parallelized due to reliance on monolithic resources.
Creates a Broker which does most of the heavy stuff; hosted services can use this to perform work:
hub.broker.put(retq, action, arg1, argN).
hub.broker.put(want_reply, destination, args_list).
Either BrokerThr (plain threads) or BrokerMP (multiprocessing) is used depending on configuration.
To receive any output returned by action, provide a queue-object for retq, else None.
Nothing is returned synchronously; if you want any value returned from the call,
put() can return a queue (if want_reply=True) which has a blocking get() with the response.
"""
def __init__(self, args):
@ -31,6 +32,7 @@ class SvcHub(object):
# initiate all services to manage
self.tcpsrv = TcpSrv(self)
self.up2k = Up2k(self)
# decide which worker impl to use
if self.check_mp_enable():

View file

@ -1,4 +1,3 @@
#!/usr/bin/env python
# coding: utf-8
from __future__ import print_function, unicode_literals
@ -68,7 +67,7 @@ class TcpSrv(object):
sck, addr = self.srv.accept()
self.log(str(addr), "-" * 3 + "C-acc2")
self.num_clients.add()
self.hub.broker.put(None, "httpconn", sck, addr)
self.hub.broker.put(False, "httpconn", sck, addr)
def shutdown(self):
self.log("tcpsrv", "ok bye")

37
copyparty/up2k.py Normal file
View file

@ -0,0 +1,37 @@
# coding: utf-8
from __future__ import print_function, unicode_literals
import re
import base64
import hashlib
from .util import Pebkac
class Up2k(object):
def __init__(self, broker):
self.broker = broker
self.args = broker.args
self.log = broker.log
self.salt = "hunter2" # TODO: config
self.r_hash = re.compile("^[0-9a-zA-Z_-]{43}$")
def _get_wark(self, j):
if len(j["name"]) > 4096 or len(j["hash"]) > 256:
raise Pebkac(400, "bad name or numchunks")
for k in j["hash"]:
if not self.r_hash.match(k):
raise Pebkac(400, "at least one bad hash")
plaintext = "\n".join([self.salt, j["name"], str(j["size"]), *j["hash"]])
hasher = hashlib.sha512()
hasher.update(plaintext.encode("utf-8"))
digest = hasher.digest()[:32]
wark = base64.urlsafe_b64encode(digest)
return wark.decode("utf-8").rstrip("=")

View file

@ -1,4 +1,3 @@
#!/usr/bin/env python
# coding: utf-8
from __future__ import print_function, unicode_literals
@ -9,15 +8,20 @@ import hashlib
import threading
import subprocess as sp # nosec
# import multiprocessing.dummy as mp # noqa: F401
import multiprocessing as mp # noqa: F401
from .__init__ import PY2
from .stolen import surrogateescape
if not PY2:
from urllib.parse import unquote_to_bytes as unquote
from urllib.parse import quote_from_bytes as quote
from queue import Queue # noqa: F401
else:
from urllib import unquote # pylint: disable=no-name-in-module
from urllib import quote # pylint: disable=no-name-in-module
from Queue import Queue # pylint: disable=no-name-in-module # noqa: F401
surrogateescape.register_surrogateescape()

View file

@ -1,20 +1,3 @@
##
## current design
main
tcpsrv
httpsrv
mpsrv
mpworker
httpsrv
...
##
## things which need the broker
moving uploaded files into place
creating a thumbnail
##
## thumbnails