From 07b2bf1104c36299fe6fcd7f65dbbe8efe9a1dcf Mon Sep 17 00:00:00 2001 From: ed Date: Fri, 31 May 2024 23:31:32 +0000 Subject: [PATCH] better support for 700+ connections when there was more than ~700 active connections, * sendfile (non-https downloads) could fail * mdns and ssdp could fail to reinitialize on network changes ...because `select` can't handle FDs higher than 512 on windows (1024 on linux/macos), so prefer `poll` where possible (linux/macos) but apple keeps breaking and unbreaking `poll` in macos, so use `--no-poll` if necessary to force `select` instead --- copyparty/__main__.py | 8 +++++++- copyparty/httpcli.py | 9 ++++++++- copyparty/mdns.py | 25 +++++++++++++++++++++++-- copyparty/ssdp.py | 24 ++++++++++++++++++++++-- copyparty/util.py | 12 +++++++++++- 5 files changed, 71 insertions(+), 7 deletions(-) diff --git a/copyparty/__main__.py b/copyparty/__main__.py index 21cba5bb..e4f96aa3 100644 --- a/copyparty/__main__.py +++ b/copyparty/__main__.py @@ -13,6 +13,7 @@ import base64 import locale import os import re +import select import socket import sys import threading @@ -1335,6 +1336,8 @@ def add_debug(ap): ap2 = ap.add_argument_group('debug options') ap2.add_argument("--vc", action="store_true", help="verbose config file parser (explain config)") ap2.add_argument("--cgen", action="store_true", help="generate config file from current config (best-effort; probably buggy)") + if hasattr(select, "poll"): + ap2.add_argument("--no-poll", action="store_true", help="kernel-bug workaround: disable poll; use select instead (limits max num clients to ~700)") ap2.add_argument("--no-sendfile", action="store_true", help="kernel-bug workaround: disable sendfile; do a safe and slow read-send-loop instead") ap2.add_argument("--no-scandir", action="store_true", help="kernel-bug workaround: disable scandir; do a listdir + stat on each file instead") ap2.add_argument("--no-fastboot", action="store_true", help="wait for initial filesystem indexing before accepting client requests") @@ -1545,7 +1548,7 @@ def main(argv: Optional[list[str]] = None, rsrc: Optional[str] = None) -> None: if hard > 0: # -1 == infinite nc = min(nc, int(hard / 4)) except: - nc = 512 + nc = 486 # mdns/ssdp restart headroom; select() maxfd is 512 on windows retry = False for fmtr in [RiceFormatter, RiceFormatter, Dodge11874, BasicDodge11874]: @@ -1638,6 +1641,9 @@ def main(argv: Optional[list[str]] = None, rsrc: Optional[str] = None) -> None: if not hasattr(os, "sendfile"): al.no_sendfile = True + if not hasattr(select, "poll"): + al.no_poll = True + # signal.signal(signal.SIGINT, sighandler) SvcHub(al, dal, argv, "".join(printed)).run() diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index c6ea483b..9d2f74b7 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -3193,7 +3193,14 @@ class HttpCli(object): sendfun = sendfile_kern if use_sendfile else sendfile_py remains = sendfun( - self.log, lower, upper, f, self.s, self.args.s_wr_sz, self.args.s_wr_slp + self.log, + lower, + upper, + f, + self.s, + self.args.s_wr_sz, + self.args.s_wr_slp, + not self.args.no_poll, ) if remains > 0: diff --git a/copyparty/mdns.py b/copyparty/mdns.py index c225002e..3a4a7f2d 100644 --- a/copyparty/mdns.py +++ b/copyparty/mdns.py @@ -292,6 +292,22 @@ class MDNS(MCast): def run2(self) -> None: last_hop = time.time() ihop = self.args.mc_hop + + try: + if self.args.no_poll: + raise Exception() + fd2sck = {} + srvpoll = select.poll() + for sck in self.srv: + fd = sck.fileno() + fd2sck[fd] = sck + srvpoll.register(fd, select.POLLIN) + except Exception as ex: + srvpoll = None + if not self.args.no_poll: + t = "WARNING: failed to poll(), will use select() instead: %r" + self.log(t % (ex,), 3) + while self.running: timeout = ( 0.02 + random.random() * 0.07 @@ -300,8 +316,13 @@ class MDNS(MCast): if self.unsolicited else (last_hop + ihop if ihop else 180) ) - rdy = select.select(self.srv, [], [], timeout) - rx: list[socket.socket] = rdy[0] # type: ignore + if srvpoll: + pr = srvpoll.poll(timeout * 1000) + rx = [fd2sck[x[0]] for x in pr if x[1] & select.POLLIN] + else: + rdy = select.select(self.srv, [], [], timeout) + rx: list[socket.socket] = rdy[0] # type: ignore + self.rx4.cln() self.rx6.cln() buf = b"" diff --git a/copyparty/ssdp.py b/copyparty/ssdp.py index 3464a50a..81958b05 100644 --- a/copyparty/ssdp.py +++ b/copyparty/ssdp.py @@ -141,9 +141,29 @@ class SSDPd(MCast): self.log("stopped", 2) def run2(self) -> None: + try: + if self.args.no_poll: + raise Exception() + fd2sck = {} + srvpoll = select.poll() + for sck in self.srv: + fd = sck.fileno() + fd2sck[fd] = sck + srvpoll.register(fd, select.POLLIN) + except Exception as ex: + srvpoll = None + if not self.args.no_poll: + t = "WARNING: failed to poll(), will use select() instead: %r" + self.log(t % (ex,), 3) + while self.running: - rdy = select.select(self.srv, [], [], self.args.z_chk or 180) - rx: list[socket.socket] = rdy[0] # type: ignore + if srvpoll: + pr = srvpoll.poll((self.args.z_chk or 180) * 1000) + rx = [fd2sck[x[0]] for x in pr if x[1] & select.POLLIN] + else: + rdy = select.select(self.srv, [], [], self.args.z_chk or 180) + rx: list[socket.socket] = rdy[0] # type: ignore + self.rxc.cln() buf = b"" addr = ("0", 0) diff --git a/copyparty/util.py b/copyparty/util.py index 4d33608a..050bf718 100644 --- a/copyparty/util.py +++ b/copyparty/util.py @@ -2517,6 +2517,7 @@ def sendfile_py( s: socket.socket, bufsz: int, slp: int, + use_poll: bool, ) -> int: remains = upper - lower f.seek(lower) @@ -2545,22 +2546,31 @@ def sendfile_kern( s: socket.socket, bufsz: int, slp: int, + use_poll: bool, ) -> int: out_fd = s.fileno() in_fd = f.fileno() ofs = lower stuck = 0.0 + if use_poll: + poll = select.poll() + poll.register(out_fd, select.POLLOUT) + while ofs < upper: stuck = stuck or time.time() try: req = min(2 ** 30, upper - ofs) - select.select([], [out_fd], [], 10) + if use_poll: + poll.poll(10000) + else: + select.select([], [out_fd], [], 10) n = os.sendfile(out_fd, in_fd, ofs, req) stuck = 0 except OSError as ex: # client stopped reading; do another select d = time.time() - stuck if d < 3600 and ex.errno == errno.EWOULDBLOCK: + time.sleep(0.02) continue n = 0