siocoutq-based shutdown

This commit is contained in:
ed 2022-08-31 01:16:09 +02:00
parent 5d19f23372
commit 558bfa4e1e
6 changed files with 59 additions and 15 deletions

View file

@ -20,7 +20,7 @@ import time
import traceback
from textwrap import dedent
from .__init__ import ANYWIN, PY2, VT100, WINDOWS, E, unicode, CORES
from .__init__ import ANYWIN, CORES, PY2, VT100, WINDOWS, E, unicode
from .__version__ import CODENAME, S_BUILD_DT, S_VERSION
from .authsrv import re_vol
from .svchub import SvcHub

View file

@ -6,7 +6,7 @@ import time
import queue
from .__init__ import TYPE_CHECKING, CORES
from .__init__ import CORES, TYPE_CHECKING
from .broker_mpw import MpWorker
from .broker_util import try_exec
from .util import mp

View file

@ -23,6 +23,7 @@ from .mtag import HAVE_FFMPEG
from .th_cli import ThumbCli
from .th_srv import HAVE_PIL, HAVE_VIPS
from .u2idx import U2idx
from .util import shut_socket
try:
from typing import Optional, Pattern, Union
@ -72,8 +73,7 @@ class HttpConn(object):
def shutdown(self) -> None:
self.stopping = True
try:
self.s.shutdown(socket.SHUT_RDWR)
self.s.close()
shut_socket(self.log, self.s, 1)
except:
pass

View file

@ -31,7 +31,7 @@ except ImportError:
from .__init__ import MACOS, TYPE_CHECKING, E
from .bos import bos
from .httpconn import HttpConn
from .util import FHC, min_ex, spack, start_log_thrs, start_stackmon
from .util import FHC, min_ex, shut_socket, spack, start_log_thrs, start_stackmon
if TYPE_CHECKING:
from .broker_util import BrokerCli
@ -55,6 +55,9 @@ class HttpSrv(object):
self.log = broker.log
self.asrv = broker.asrv
# redefine in case of multiprocessing
socket.setdefaulttimeout(120)
nsuf = "-n{}-i{:x}".format(nid, os.getpid()) if nid else ""
self.name = "hsrv" + nsuf
@ -293,8 +296,6 @@ class HttpSrv(object):
def thr_client(self, sck: socket.socket, addr: tuple[str, int]) -> None:
"""thread managing one tcp client"""
sck.settimeout(120)
cli = HttpConn(sck, addr, self)
with self.mutex:
self.clients.add(cli)
@ -321,8 +322,7 @@ class HttpSrv(object):
try:
fno = sck.fileno()
sck.shutdown(socket.SHUT_RDWR)
sck.close()
shut_socket(cli.log, sck)
except (OSError, socket.error) as ex:
if not MACOS:
self.log(

View file

@ -24,8 +24,10 @@ class TcpSrv(object):
self.args = hub.args
self.log = hub.log
self.stopping = False
# mp-safe since issue6056
socket.setdefaulttimeout(120)
self.stopping = False
self.srv: list[socket.socket] = []
self.nsrv = 0
ok: dict[str, list[int]] = {}
@ -112,6 +114,7 @@ class TcpSrv(object):
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
srv.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
srv.settimeout(None) # < does not inherit, ^ does
try:
srv.bind((ip, port))
self.srv.append(srv)

View file

@ -24,12 +24,14 @@ from datetime import datetime
from queue import Queue
from .__init__ import ANYWIN, PY2, TYPE_CHECKING, VT100, WINDOWS
from .__init__ import ANYWIN, MACOS, PY2, TYPE_CHECKING, VT100, WINDOWS
from .__version__ import S_BUILD_DT, S_VERSION
from .stolen import surrogateescape
try:
import ctypes
import fcntl
import termios
except:
pass
@ -1440,6 +1442,48 @@ def get_df(abspath: str) -> tuple[Optional[int], Optional[int]]:
return (None, None)
if not ANYWIN and not MACOS:
def siocoutq(sck: socket.socket) -> int:
# SIOCOUTQ^sockios.h == TIOCOUTQ^ioctl.h
try:
zb = fcntl.ioctl(sck.fileno(), termios.TIOCOUTQ, b"AAAA")
return sunpack(b"I", zb)[0] # type: ignore
except:
return 1
else:
# macos: getsockopt(fd, SOL_SOCKET, SO_NWRITE, ...)
# windows: TcpConnectionEstatsSendBuff
def siocoutq(sck: socket.socket) -> int:
return 1
def shut_socket(log: "NamedLogger", sck: socket.socket, timeout: int = 3) -> None:
t0 = time.time()
try:
sck.settimeout(timeout)
sck.shutdown(socket.SHUT_WR)
try:
while time.time() - t0 < timeout:
if not siocoutq(sck):
# kernel says tx queue empty, we good
break
# on windows in particular, drain rx until client shuts
if not sck.recv(32 * 1024):
break
except:
pass
finally:
td = time.time() - t0
if td >= 1:
log("shut() in {:.3f} sec".format(td), "1;30")
sck.close()
def read_socket(sr: Unrecv, total_size: int) -> Generator[bytes, None, None]:
remains = total_size
while remains > 0:
@ -2030,10 +2074,7 @@ def termsize() -> tuple[int, int]:
def ioctl_GWINSZ(fd: int) -> Optional[tuple[int, int]]:
try:
import fcntl
import termios
cr = struct.unpack("hh", fcntl.ioctl(fd, termios.TIOCGWINSZ, b"1234"))
cr = sunpack(b"hh", fcntl.ioctl(fd, termios.TIOCGWINSZ, b"AAAA"))
return int(cr[1]), int(cr[0])
except:
return None