diff --git a/copyparty/__main__.py b/copyparty/__main__.py index 111a6c32..f0f49872 100644 --- a/copyparty/__main__.py +++ b/copyparty/__main__.py @@ -20,7 +20,7 @@ import time import traceback from textwrap import dedent -from .__init__ import ANYWIN, PY2, VT100, WINDOWS, E, unicode, CORES +from .__init__ import ANYWIN, CORES, PY2, VT100, WINDOWS, E, unicode from .__version__ import CODENAME, S_BUILD_DT, S_VERSION from .authsrv import re_vol from .svchub import SvcHub diff --git a/copyparty/broker_mp.py b/copyparty/broker_mp.py index df4f8c26..1727e9c1 100644 --- a/copyparty/broker_mp.py +++ b/copyparty/broker_mp.py @@ -6,7 +6,7 @@ import time import queue -from .__init__ import TYPE_CHECKING, CORES +from .__init__ import CORES, TYPE_CHECKING from .broker_mpw import MpWorker from .broker_util import try_exec from .util import mp diff --git a/copyparty/httpconn.py b/copyparty/httpconn.py index 7c6c168b..448ee203 100644 --- a/copyparty/httpconn.py +++ b/copyparty/httpconn.py @@ -23,6 +23,7 @@ from .mtag import HAVE_FFMPEG from .th_cli import ThumbCli from .th_srv import HAVE_PIL, HAVE_VIPS from .u2idx import U2idx +from .util import shut_socket try: from typing import Optional, Pattern, Union @@ -72,8 +73,7 @@ class HttpConn(object): def shutdown(self) -> None: self.stopping = True try: - self.s.shutdown(socket.SHUT_RDWR) - self.s.close() + shut_socket(self.log, self.s, 1) except: pass diff --git a/copyparty/httpsrv.py b/copyparty/httpsrv.py index 3a35099a..cbd2defd 100644 --- a/copyparty/httpsrv.py +++ b/copyparty/httpsrv.py @@ -31,7 +31,7 @@ except ImportError: from .__init__ import MACOS, TYPE_CHECKING, E from .bos import bos from .httpconn import HttpConn -from .util import FHC, min_ex, spack, start_log_thrs, start_stackmon +from .util import FHC, min_ex, shut_socket, spack, start_log_thrs, start_stackmon if TYPE_CHECKING: from .broker_util import BrokerCli @@ -55,6 +55,9 @@ class HttpSrv(object): self.log = broker.log self.asrv = broker.asrv + # redefine in case of multiprocessing + socket.setdefaulttimeout(120) + nsuf = "-n{}-i{:x}".format(nid, os.getpid()) if nid else "" self.name = "hsrv" + nsuf @@ -293,8 +296,6 @@ class HttpSrv(object): def thr_client(self, sck: socket.socket, addr: tuple[str, int]) -> None: """thread managing one tcp client""" - sck.settimeout(120) - cli = HttpConn(sck, addr, self) with self.mutex: self.clients.add(cli) @@ -321,8 +322,7 @@ class HttpSrv(object): try: fno = sck.fileno() - sck.shutdown(socket.SHUT_RDWR) - sck.close() + shut_socket(cli.log, sck) except (OSError, socket.error) as ex: if not MACOS: self.log( diff --git a/copyparty/tcpsrv.py b/copyparty/tcpsrv.py index 4badc02e..28a87538 100644 --- a/copyparty/tcpsrv.py +++ b/copyparty/tcpsrv.py @@ -24,8 +24,10 @@ class TcpSrv(object): self.args = hub.args self.log = hub.log - self.stopping = False + # mp-safe since issue6056 + socket.setdefaulttimeout(120) + self.stopping = False self.srv: list[socket.socket] = [] self.nsrv = 0 ok: dict[str, list[int]] = {} @@ -112,6 +114,7 @@ class TcpSrv(object): srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) srv.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + srv.settimeout(None) # < does not inherit, ^ does try: srv.bind((ip, port)) self.srv.append(srv) diff --git a/copyparty/util.py b/copyparty/util.py index 96b65ef9..4eaf9cd1 100644 --- a/copyparty/util.py +++ b/copyparty/util.py @@ -24,12 +24,14 @@ from datetime import datetime from queue import Queue -from .__init__ import ANYWIN, PY2, TYPE_CHECKING, VT100, WINDOWS +from .__init__ import ANYWIN, MACOS, PY2, TYPE_CHECKING, VT100, WINDOWS from .__version__ import S_BUILD_DT, S_VERSION from .stolen import surrogateescape try: import ctypes + import fcntl + import termios except: pass @@ -1440,6 +1442,48 @@ def get_df(abspath: str) -> tuple[Optional[int], Optional[int]]: return (None, None) +if not ANYWIN and not MACOS: + + def siocoutq(sck: socket.socket) -> int: + # SIOCOUTQ^sockios.h == TIOCOUTQ^ioctl.h + try: + zb = fcntl.ioctl(sck.fileno(), termios.TIOCOUTQ, b"AAAA") + return sunpack(b"I", zb)[0] # type: ignore + except: + return 1 + +else: + # macos: getsockopt(fd, SOL_SOCKET, SO_NWRITE, ...) + # windows: TcpConnectionEstatsSendBuff + + def siocoutq(sck: socket.socket) -> int: + return 1 + + +def shut_socket(log: "NamedLogger", sck: socket.socket, timeout: int = 3) -> None: + t0 = time.time() + try: + sck.settimeout(timeout) + sck.shutdown(socket.SHUT_WR) + try: + while time.time() - t0 < timeout: + if not siocoutq(sck): + # kernel says tx queue empty, we good + break + + # on windows in particular, drain rx until client shuts + if not sck.recv(32 * 1024): + break + except: + pass + finally: + td = time.time() - t0 + if td >= 1: + log("shut() in {:.3f} sec".format(td), "1;30") + + sck.close() + + def read_socket(sr: Unrecv, total_size: int) -> Generator[bytes, None, None]: remains = total_size while remains > 0: @@ -2030,10 +2074,7 @@ def termsize() -> tuple[int, int]: def ioctl_GWINSZ(fd: int) -> Optional[tuple[int, int]]: try: - import fcntl - import termios - - cr = struct.unpack("hh", fcntl.ioctl(fd, termios.TIOCGWINSZ, b"1234")) + cr = sunpack(b"hh", fcntl.ioctl(fd, termios.TIOCGWINSZ, b"AAAA")) return int(cr[1]), int(cr[0]) except: return None