From 9da6a7304490225d78f2c441554efd2b38e26796 Mon Sep 17 00:00:00 2001 From: ed Date: Mon, 1 Jul 2019 02:42:29 +0200 Subject: [PATCH] full-duplex broker for up2k-registry --- .vscode/launch.json | 2 -- copyparty/__init__.py | 1 - copyparty/__version__.py | 1 - copyparty/authsrv.py | 1 - copyparty/broker_mp.py | 58 +++++++++++++++++++++++++++++----------- copyparty/broker_mpw.py | 47 +++++++++++++++++++++++--------- copyparty/broker_thr.py | 24 ++++++++++++++--- copyparty/httpcli.py | 8 ++++-- copyparty/httpconn.py | 13 ++++----- copyparty/httpsrv.py | 16 +++++------ copyparty/svchub.py | 12 +++++---- copyparty/tcpsrv.py | 3 +-- copyparty/up2k.py | 37 +++++++++++++++++++++++++ copyparty/util.py | 6 ++++- docs/design.txt | 17 ------------ 15 files changed, 168 insertions(+), 78 deletions(-) create mode 100644 copyparty/up2k.py diff --git a/.vscode/launch.json b/.vscode/launch.json index f4104c1f..f5ec519b 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -11,8 +11,6 @@ "args": [ "-j", "0", - "-nc", - "4", //"-nw", "-a", "ed:wark", diff --git a/copyparty/__init__.py b/copyparty/__init__.py index c351fad3..ed021649 100644 --- a/copyparty/__init__.py +++ b/copyparty/__init__.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # coding: utf-8 from __future__ import print_function, unicode_literals diff --git a/copyparty/__version__.py b/copyparty/__version__.py index f4fa394e..2364f94a 100644 --- a/copyparty/__version__.py +++ b/copyparty/__version__.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # coding: utf-8 VERSION = (0, 1, 0) diff --git a/copyparty/authsrv.py b/copyparty/authsrv.py index c3408fbb..28f9d2b8 100644 --- a/copyparty/authsrv.py +++ b/copyparty/authsrv.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # coding: utf-8 from __future__ import print_function, unicode_literals diff --git a/copyparty/broker_mp.py b/copyparty/broker_mp.py index d8278b10..5b73f3d5 100644 --- a/copyparty/broker_mp.py +++ b/copyparty/broker_mp.py @@ -1,13 +1,12 @@ -#!/usr/bin/env python # coding: utf-8 from __future__ import print_function, unicode_literals import time import threading -import multiprocessing as mp from .__init__ import PY2, WINDOWS from .broker_mpw import MpWorker +from .util import mp if PY2 and not WINDOWS: @@ -23,9 +22,10 @@ class BrokerMp(object): self.log = hub.log self.args = hub.args - self.mutex = threading.Lock() - self.procs = [] + self.retpend = {} + self.retpend_mutex = threading.Lock() + self.mutex = threading.Lock() cores = self.args.j if cores is None: @@ -58,7 +58,7 @@ class BrokerMp(object): def shutdown(self): self.log("broker", "shutting down") for proc in self.procs: - thr = threading.Thread(target=proc.q_pend.put(["shutdown"])) + thr = threading.Thread(target=proc.q_pend.put([0, "shutdown", []])) thr.start() with self.mutex: @@ -73,19 +73,20 @@ class BrokerMp(object): procs.pop() def collector(self, proc): + """receive message from hub in other process""" while True: msg = proc.q_yield.get() - k = msg[0] + retq_id, dest, args = msg - if k == "log": - self.log(*msg[1:]) + if dest == "log": + self.log(*args) - elif k == "workload": + elif dest == "workload": with self.mutex: - proc.workload = msg[1] + proc.workload = args[0] - elif k == "httpdrop": - addr = msg[1] + elif dest == "httpdrop": + addr = args[0] with self.mutex: del proc.clients[addr] @@ -94,8 +95,32 @@ class BrokerMp(object): self.hub.tcpsrv.num_clients.add(-1) - def put(self, retq, act, *args): - if act == "httpconn": + elif dest == "retq": + # response from previous ipc call + with self.retpend_mutex: + retq = self.retpend.pop(retq_id) + + retq.put(args) + + else: + # new ipc invoking managed service in hub + obj = self.hub + for node in dest.split("."): + obj = getattr(obj, node) + + # TODO will deadlock if dest performs another ipc + rv = obj(*args) + + if retq_id: + proc.q_pend.put([retq_id, "retq", rv]) + + def put(self, want_retval, dest, *args): + """ + send message to non-hub component in other process, + returns a Queue object which eventually contains the response if want_retval + (not-impl here since nothing uses it yet) + """ + if dest == "httpconn": sck, addr = args sck2 = sck if PY2: @@ -104,13 +129,14 @@ class BrokerMp(object): sck2 = buf.getvalue() proc = sorted(self.procs, key=lambda x: x.workload)[0] - proc.q_pend.put(["httpconn", sck2, addr]) + proc.q_pend.put([0, dest, [sck2, addr]]) with self.mutex: proc.clients[addr] = 50 proc.workload += 50 + else: - raise Exception("what is " + str(act)) + raise Exception("what is " + str(dest)) def debug_load_balancer(self): last = "" diff --git a/copyparty/broker_mpw.py b/copyparty/broker_mpw.py index b69b579f..821b65e4 100644 --- a/copyparty/broker_mpw.py +++ b/copyparty/broker_mpw.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # coding: utf-8 from __future__ import print_function, unicode_literals @@ -9,6 +8,7 @@ import threading from .__init__ import PY2, WINDOWS from .httpsrv import HttpSrv +from .util import Queue if PY2 and not WINDOWS: import pickle # nosec @@ -23,6 +23,8 @@ class MpWorker(object): self.args = args self.n = n + self.retpend = {} + self.retpend_mutex = threading.Lock() self.mutex = threading.Lock() self.workload_thr_active = False @@ -30,7 +32,8 @@ class MpWorker(object): # replace it with something harmless signal.signal(signal.SIGINT, self.signal_handler) - self.httpsrv = HttpSrv(self.args, self.log) + # instantiate all services here (TODO: inheritance?) + self.httpsrv = HttpSrv(self) self.httpsrv.disconnect_func = self.httpdrop # on winxp and some other platforms, @@ -45,31 +48,31 @@ class MpWorker(object): pass def log(self, src, msg): - self.q_yield.put(["log", src, msg]) + self.q_yield.put([0, "log", [src, msg]]) def logw(self, msg): self.log("mp{}".format(self.n), msg) def httpdrop(self, addr): - self.q_yield.put(["httpdrop", addr]) + self.q_yield.put([0, "httpdrop", [addr]]) def main(self): while True: - d = self.q_pend.get() + retq_id, dest, args = self.q_pend.get() # self.logw("work: [{}]".format(d[0])) - if d[0] == "shutdown": + if dest == "shutdown": self.logw("ok bye") sys.exit(0) return - elif d[0] == "httpconn": - sck = d[1] + elif dest == "httpconn": + sck, addr = args if PY2: sck = pickle.loads(sck) # nosec - self.log(str(d[2]), "-" * 4 + "C-qpop") - self.httpsrv.accept(sck, d[2]) + self.log(str(addr), "-" * 4 + "C-qpop") + self.httpsrv.accept(sck, addr) with self.mutex: if not self.workload_thr_active: @@ -78,8 +81,28 @@ class MpWorker(object): thr.daemon = True thr.start() + elif dest == "retq": + # response from previous ipc call + with self.retpend_mutex: + retq = self.retpend.pop(retq_id) + + retq.put(args) + else: - raise Exception("what is " + str(d[0])) + raise Exception("what is " + str(dest)) + + def put(self, want_retval, dest, *args): + if want_retval: + retq = Queue(1) + retq_id = id(retq) + with self.retpend_mutex: + self.retpend[retq_id] = retq + else: + retq = None + retq_id = 0 + + self.q_yield.put([retq_id, dest, args]) + return retq def thr_workload(self): """announce workloads to MpSrv (the mp controller / loadbalancer)""" @@ -92,4 +115,4 @@ class MpWorker(object): self.workload_thr_alive = False return - self.q_yield.put(["workload", self.httpsrv.workload]) + self.q_yield.put([0, "workload", [self.httpsrv.workload]]) diff --git a/copyparty/broker_thr.py b/copyparty/broker_thr.py index b6528929..665ed5cb 100644 --- a/copyparty/broker_thr.py +++ b/copyparty/broker_thr.py @@ -4,6 +4,7 @@ from __future__ import print_function, unicode_literals import threading +from .util import Queue from .httpsrv import HttpSrv @@ -17,21 +18,36 @@ class BrokerThr(object): self.mutex = threading.Lock() - self.httpsrv = HttpSrv(self.args, self.log) + # instantiate all services here (TODO: inheritance?) + self.httpsrv = HttpSrv(self) self.httpsrv.disconnect_func = self.httpdrop def shutdown(self): # self.log("broker", "shutting down") pass - def put(self, retq, act, *args): - if act == "httpconn": + def put(self, want_retval, dest, *args): + if dest == "httpconn": sck, addr = args self.log(str(addr), "-" * 4 + "C-qpop") self.httpsrv.accept(sck, addr) else: - raise Exception("what is " + str(act)) + # new ipc invoking managed service in hub + obj = self.hub + for node in dest.split("."): + obj = getattr(obj, node) + + # TODO will deadlock if dest performs another ipc + rv = obj(*args) + + if not want_retval: + return + + # pretend we're broker_mp + retq = Queue(1) + retq.put(rv) + return retq def httpdrop(self, addr): self.hub.tcpsrv.num_clients.add(-1) diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index 32e0705d..40aafe25 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # coding: utf-8 from __future__ import print_function, unicode_literals @@ -248,7 +247,12 @@ class HttpCli(object): except: raise Pebkac(422, "you POSTed invalid json") - print(body) + # \suger0r/ + x = self.conn.hsrv.broker.put(True, "up2k._get_wark", body) + wark = x.get() + msg = '{{ "wark": "{}" }}'.format(wark) + self.log(msg) + self.reply(msg.encode("utf-8"), headers=["Content-Type: application/json"]) def handle_post_binary(self): raise Exception("todo") diff --git a/copyparty/httpconn.py b/copyparty/httpconn.py index 790aaa35..03c9745a 100644 --- a/copyparty/httpconn.py +++ b/copyparty/httpconn.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # coding: utf-8 from __future__ import print_function, unicode_literals @@ -18,15 +17,17 @@ class HttpConn(object): creates an HttpCli for each request (Connection: Keep-Alive) """ - def __init__(self, sck, addr, args, auth, log_func, cert_path): + def __init__(self, sck, addr, hsrv): self.s = sck self.addr = addr - self.args = args - self.auth = auth - self.cert_path = cert_path + self.hsrv = hsrv + + self.args = hsrv.args + self.auth = hsrv.auth + self.cert_path = hsrv.cert_path self.workload = 0 - self.log_func = log_func + self.log_func = hsrv.log self.log_src = "{} \033[36m{}".format(addr[0], addr[1]).ljust(26) env = jinja2.Environment() diff --git a/copyparty/httpsrv.py b/copyparty/httpsrv.py index 667e4b95..1cab3ede 100644 --- a/copyparty/httpsrv.py +++ b/copyparty/httpsrv.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # coding: utf-8 from __future__ import print_function, unicode_literals @@ -17,9 +16,10 @@ class HttpSrv(object): relying on MpSrv for performance (HttpSrv is just plain threads) """ - def __init__(self, args, log_func): - self.log = log_func - self.args = args + def __init__(self, broker): + self.broker = broker + self.args = broker.args + self.log = broker.log self.disconnect_func = None self.mutex = threading.Lock() @@ -27,7 +27,7 @@ class HttpSrv(object): self.clients = {} self.workload = 0 self.workload_thr_alive = False - self.auth = AuthSrv(args, log_func) + self.auth = AuthSrv(self.args, self.log) cert_path = os.path.join(E.cfg, "cert.pem") if os.path.exists(cert_path): @@ -38,7 +38,7 @@ class HttpSrv(object): def accept(self, sck, addr): """takes an incoming tcp connection and creates a thread to handle it""" self.log(str(addr), "-" * 5 + "C-cthr") - thr = threading.Thread(target=self.thr_client, args=(sck, addr, self.log)) + thr = threading.Thread(target=self.thr_client, args=(sck, addr)) thr.daemon = True thr.start() @@ -49,9 +49,9 @@ class HttpSrv(object): def shutdown(self): print("ok bye") - def thr_client(self, sck, addr, log): + def thr_client(self, sck, addr): """thread managing one tcp client""" - cli = HttpConn(sck, addr, self.args, self.auth, log, self.cert_path) + cli = HttpConn(sck, addr, self) with self.mutex: self.clients[cli] = 0 self.workload += 50 diff --git a/copyparty/svchub.py b/copyparty/svchub.py index af3c7a30..c2a83b49 100644 --- a/copyparty/svchub.py +++ b/copyparty/svchub.py @@ -1,26 +1,27 @@ -#!/usr/bin/env python # coding: utf-8 from __future__ import print_function, unicode_literals import sys import time import threading -import multiprocessing as mp from datetime import datetime, timedelta import calendar from .__init__ import PY2, WINDOWS from .tcpsrv import TcpSrv +from .up2k import Up2k +from .util import mp class SvcHub(object): """ Hosts all services which cannot be parallelized due to reliance on monolithic resources. Creates a Broker which does most of the heavy stuff; hosted services can use this to perform work: - hub.broker.put(retq, action, arg1, argN). - + hub.broker.put(want_reply, destination, args_list). + Either BrokerThr (plain threads) or BrokerMP (multiprocessing) is used depending on configuration. - To receive any output returned by action, provide a queue-object for retq, else None. + Nothing is returned synchronously; if you want any value returned from the call, + put() can return a queue (if want_reply=True) which has a blocking get() with the response. """ def __init__(self, args): @@ -31,6 +32,7 @@ class SvcHub(object): # initiate all services to manage self.tcpsrv = TcpSrv(self) + self.up2k = Up2k(self) # decide which worker impl to use if self.check_mp_enable(): diff --git a/copyparty/tcpsrv.py b/copyparty/tcpsrv.py index 69198242..3a23ff48 100644 --- a/copyparty/tcpsrv.py +++ b/copyparty/tcpsrv.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # coding: utf-8 from __future__ import print_function, unicode_literals @@ -68,7 +67,7 @@ class TcpSrv(object): sck, addr = self.srv.accept() self.log(str(addr), "-" * 3 + "C-acc2") self.num_clients.add() - self.hub.broker.put(None, "httpconn", sck, addr) + self.hub.broker.put(False, "httpconn", sck, addr) def shutdown(self): self.log("tcpsrv", "ok bye") diff --git a/copyparty/up2k.py b/copyparty/up2k.py new file mode 100644 index 00000000..3aac7a5b --- /dev/null +++ b/copyparty/up2k.py @@ -0,0 +1,37 @@ +# coding: utf-8 +from __future__ import print_function, unicode_literals + + +import re +import base64 +import hashlib + +from .util import Pebkac + + +class Up2k(object): + def __init__(self, broker): + self.broker = broker + self.args = broker.args + self.log = broker.log + + self.salt = "hunter2" # TODO: config + + self.r_hash = re.compile("^[0-9a-zA-Z_-]{43}$") + + def _get_wark(self, j): + if len(j["name"]) > 4096 or len(j["hash"]) > 256: + raise Pebkac(400, "bad name or numchunks") + + for k in j["hash"]: + if not self.r_hash.match(k): + raise Pebkac(400, "at least one bad hash") + + plaintext = "\n".join([self.salt, j["name"], str(j["size"]), *j["hash"]]) + + hasher = hashlib.sha512() + hasher.update(plaintext.encode("utf-8")) + digest = hasher.digest()[:32] + + wark = base64.urlsafe_b64encode(digest) + return wark.decode("utf-8").rstrip("=") diff --git a/copyparty/util.py b/copyparty/util.py index f0773ebe..6ad1f984 100644 --- a/copyparty/util.py +++ b/copyparty/util.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # coding: utf-8 from __future__ import print_function, unicode_literals @@ -9,15 +8,20 @@ import hashlib import threading import subprocess as sp # nosec +# import multiprocessing.dummy as mp # noqa: F401 +import multiprocessing as mp # noqa: F401 + from .__init__ import PY2 from .stolen import surrogateescape if not PY2: from urllib.parse import unquote_to_bytes as unquote from urllib.parse import quote_from_bytes as quote + from queue import Queue # noqa: F401 else: from urllib import unquote # pylint: disable=no-name-in-module from urllib import quote # pylint: disable=no-name-in-module + from Queue import Queue # pylint: disable=no-name-in-module # noqa: F401 surrogateescape.register_surrogateescape() diff --git a/docs/design.txt b/docs/design.txt index b6492b8b..e4606c10 100644 --- a/docs/design.txt +++ b/docs/design.txt @@ -1,20 +1,3 @@ -## -## current design - -main - tcpsrv - httpsrv - mpsrv - mpworker - httpsrv - ... - -## -## things which need the broker - -moving uploaded files into place -creating a thumbnail - ## ## thumbnails