diff --git a/contrib/nginx/copyparty.conf b/contrib/nginx/copyparty.conf index d39d8453..9430d105 100644 --- a/contrib/nginx/copyparty.conf +++ b/contrib/nginx/copyparty.conf @@ -1,7 +1,15 @@ -# when running copyparty behind a reverse-proxy, -# make sure that copyparty allows at least as many clients as the proxy does, -# so run copyparty with -nc 512 if your nginx has the default limits -# (worker_processes 1, worker_connections 512) +# when running copyparty behind a reverse proxy, +# the following arguments are recommended: +# +# -nc 512 important, see next paragraph +# --http-only lower latency on initial connection +# -i 127.0.0.1 only accept connections from nginx +# +# -nc must match or exceed the webserver's max number of concurrent clients; +# nginx default is 512 (worker_processes 1, worker_connections 512) +# +# you may also consider adding -j0 for CPU-intensive configurations +# (not that i can really think of any good examples) upstream cpp { server 127.0.0.1:3923; diff --git a/copyparty/__main__.py b/copyparty/__main__.py index 72137b7d..770f9a80 100644 --- a/copyparty/__main__.py +++ b/copyparty/__main__.py @@ -298,6 +298,7 @@ def run_argparse(argv, formatter): ap2.add_argument("-q", action="store_true", help="quiet") ap2.add_argument("-lo", metavar="PATH", type=str, help="logfile, example: cpp-%%Y-%%m%%d-%%H%%M%%S.txt.xz") ap2.add_argument("--log-conn", action="store_true", help="print tcp-server msgs") + ap2.add_argument("--log-htp", action="store_true", help="print http-server threadpool scaling") ap2.add_argument("--ihead", metavar="HEADER", action='append', help="dump incoming header") ap2.add_argument("--lf-url", metavar="RE", type=str, default=r"^/\.cpr/|\?th=[wj]$", help="dont log URLs matching") @@ -342,6 +343,7 @@ def run_argparse(argv, formatter): ap2.add_argument("--no-sendfile", action="store_true", help="disable sendfile") ap2.add_argument("--no-scandir", action="store_true", help="disable scandir") ap2.add_argument("--no-fastboot", action="store_true", help="wait for up2k indexing") + ap2.add_argument("--no-htp", action="store_true", help="disable httpserver threadpool, create threads as-needed instead") ap2.add_argument("--stackmon", metavar="P,S", help="write stacktrace to Path every S second") return ap.parse_args(args=argv[1:]) diff --git a/copyparty/httpsrv.py b/copyparty/httpsrv.py index 619de298..2ed9a6a0 100644 --- a/copyparty/httpsrv.py +++ b/copyparty/httpsrv.py @@ -25,10 +25,15 @@ except ImportError: ) sys.exit(1) -from .__init__ import E, MACOS -from .util import spack +from .__init__ import E, PY2, MACOS +from .util import spack, min_ex from .httpconn import HttpConn +if PY2: + import Queue as queue +else: + import queue + class HttpSrv(object): """ @@ -46,6 +51,11 @@ class HttpSrv(object): self.disconnect_func = None self.mutex = threading.Lock() + self.tp_nthr = 0 # actual + self.tp_ncli = 0 # fading + self.tp_time = None # latest worker collect + self.tp_q = None if self.args.no_htp else queue.LifoQueue() + self.clients = {} self.workload = 0 self.workload_thr_alive = False @@ -65,11 +75,65 @@ class HttpSrv(object): else: self.cert_path = None + if self.tp_q: + self.start_threads(4) + + t = threading.Thread(target=self.thr_scaler) + t.daemon = True + t.start() + + def start_threads(self, n): + self.tp_nthr += n + if self.args.log_htp: + self.log("httpsrv", "workers += {} = {}".format(n, self.tp_nthr), 6) + + for _ in range(n): + thr = threading.Thread( + target=self.thr_poolw, + name="httpsrv-poolw", + ) + thr.daemon = True + thr.start() + + def stop_threads(self, n): + self.tp_nthr -= n + if self.args.log_htp: + self.log("httpsrv", "workers -= {} = {}".format(n, self.tp_nthr), 6) + + for _ in range(n): + self.tp_q.put(None) + + def thr_scaler(self): + while True: + time.sleep(2 if self.tp_ncli else 30) + with self.mutex: + self.tp_ncli = max(len(self.clients), self.tp_ncli - 2) + if self.tp_nthr > self.tp_ncli + 8: + self.stop_threads(4) + def accept(self, sck, addr): """takes an incoming tcp connection and creates a thread to handle it""" if self.args.log_conn: self.log("%s %s" % addr, "|%sC-cthr" % ("-" * 5,), c="1;30") + now = time.time() + + if self.tp_time and now - self.tp_time > 300: + self.tp_q = None + + if self.tp_q: + self.tp_q.put((sck, addr)) + with self.mutex: + self.tp_time = self.tp_time or now + self.tp_ncli = max(self.tp_ncli, len(self.clients) + 1) + if self.tp_nthr < len(self.clients) + 4: + self.start_threads(8) + return + + if not self.args.no_htp: + m = "looks like the httpserver threadpool died; please make an issue on github and tell me the story of how you pulled that off, thanks and dog bless\n" + self.log("httpsrv", m, 1) + thr = threading.Thread( target=self.thr_client, args=(sck, addr), @@ -78,6 +142,26 @@ class HttpSrv(object): thr.daemon = True thr.start() + def thr_poolw(self): + while True: + task = self.tp_q.get() + if not task: + break + + with self.mutex: + self.tp_time = None + + try: + sck, addr = task + me = threading.current_thread() + me.name = ( + "httpsrv-{}-{}".format(addr[0].split(".", 2)[-1][-6:], addr[1]), + ) + self.thr_client(sck, addr) + me.name = "httpsrv-poolw" + except: + self.log("httpsrv", "thr_client: " + min_ex(), 3) + def num_clients(self): with self.mutex: return len(self.clients)