From 4bcd30da6b88eb52e69fea7e3c60d974c9bbc2c5 Mon Sep 17 00:00:00 2001 From: ed Date: Sun, 23 Oct 2022 12:05:44 +0200 Subject: [PATCH] cleaner daemon instancing --- bin/up2k.py | 21 +++++++-------- copyparty/broker_mp.py | 10 ++------ copyparty/broker_mpw.py | 7 ++--- copyparty/fsutil.py | 2 +- copyparty/ftpd.py | 6 ++--- copyparty/httpsrv.py | 37 +++++++++----------------- copyparty/star.py | 6 ++--- copyparty/svchub.py | 17 ++++-------- copyparty/th_srv.py | 22 +++++++++------- copyparty/u2idx.py | 22 ++++++++-------- copyparty/up2k.py | 57 +++++++++++++---------------------------- copyparty/util.py | 39 +++++++++++++--------------- 12 files changed, 95 insertions(+), 151 deletions(-) diff --git a/bin/up2k.py b/bin/up2k.py index 0318a228..56f72be7 100755 --- a/bin/up2k.py +++ b/bin/up2k.py @@ -154,10 +154,7 @@ class MTHash(object): self.done_q = Queue() self.thrs = [] for _ in range(cores): - t = threading.Thread(target=self.worker) - t.daemon = True - t.start() - self.thrs.append(t) + self.thrs.append(Daemon(self.worker)) def hash(self, f, fsz, chunksz, pcb=None, pcb_opaque=None): with self.omutex: @@ -290,9 +287,7 @@ class CTermsize(object): except: return - thr = threading.Thread(target=self.worker) - thr.daemon = True - thr.start() + Daemon(self.worker) def worker(self): while True: @@ -549,9 +544,11 @@ def upload(req_ses, file, cid, pw): class Daemon(threading.Thread): - def __init__(self, *a, **ka): - threading.Thread.__init__(self, *a, **ka) + def __init__(self, target, name=None, a=None): + # type: (Any, Any, Any) -> None + threading.Thread.__init__(self, target=target, args=a or (), name=name) self.daemon = True + self.start() class Ctl(object): @@ -678,10 +675,10 @@ class Ctl(object): atexit.register(self.cleanup_vt100) ss.scroll_region(3) - Daemon(target=self.hasher).start() + Daemon(self.hasher) for _ in range(self.ar.j): - Daemon(target=self.handshaker).start() - Daemon(target=self.uploader).start() + Daemon(self.handshaker) + Daemon(self.uploader) idles = 0 while idles < 3: diff --git a/copyparty/broker_mp.py b/copyparty/broker_mp.py index 1727e9c1..6afe634d 100644 --- a/copyparty/broker_mp.py +++ b/copyparty/broker_mp.py @@ -9,7 +9,7 @@ import queue from .__init__ import CORES, TYPE_CHECKING from .broker_mpw import MpWorker from .broker_util import try_exec -from .util import mp +from .util import Daemon, mp if TYPE_CHECKING: from .svchub import SvcHub @@ -51,13 +51,7 @@ class BrokerMp(object): q_yield: queue.Queue[tuple[int, str, list[Any]]] = mp.Queue(64) proc = MProcess(q_pend, q_yield, MpWorker, (q_pend, q_yield, self.args, n)) - - thr = threading.Thread( - target=self.collector, args=(proc,), name="mp-sink-{}".format(n) - ) - thr.daemon = True - thr.start() - + Daemon(self.collector, "mp-sink-{}".format(n), (proc,)) self.procs.append(proc) proc.start() diff --git a/copyparty/broker_mpw.py b/copyparty/broker_mpw.py index 79952989..d119b914 100644 --- a/copyparty/broker_mpw.py +++ b/copyparty/broker_mpw.py @@ -13,7 +13,7 @@ from .__init__ import ANYWIN from .authsrv import AuthSrv from .broker_util import BrokerCli, ExceptionalQueue from .httpsrv import HttpSrv -from .util import FAKE_MP, HMaccas +from .util import FAKE_MP, Daemon, HMaccas try: from types import FrameType @@ -65,10 +65,7 @@ class MpWorker(BrokerCli): # on winxp and some other platforms, # use thr.join() to block all signals - thr = threading.Thread(target=self.main, name="mpw-main") - thr.daemon = True - thr.start() - thr.join() + Daemon(self.main, "mpw-main").join() def signal_handler(self, sig: Optional[int], frame: Optional[FrameType]) -> None: # print('k') diff --git a/copyparty/fsutil.py b/copyparty/fsutil.py index e115d76f..cfba6161 100644 --- a/copyparty/fsutil.py +++ b/copyparty/fsutil.py @@ -28,7 +28,7 @@ class Fstab(object): self.age = 0.0 def log(self, msg: str, c: Union[int, str] = 0) -> None: - self.log_func("fstab", msg + "\033[K", c) + self.log_func("fstab", msg, c) def get(self, path: str) -> str: if len(self.cache) > 9000: diff --git a/copyparty/ftpd.py b/copyparty/ftpd.py index a30196cd..942edd87 100644 --- a/copyparty/ftpd.py +++ b/copyparty/ftpd.py @@ -17,7 +17,7 @@ from pyftpdlib.servers import FTPServer from .__init__ import PY2, TYPE_CHECKING, E from .bos import bos -from .util import Pebkac, exclude_dotfiles, fsenc +from .util import Daemon, Pebkac, exclude_dotfiles, fsenc try: from pyftpdlib.ioloop import IOLoop @@ -402,9 +402,7 @@ class Ftpd(object): for h, lp in hs: FTPServer((ip, int(lp)), h, ioloop) - thr = threading.Thread(target=ioloop.loop, name="ftp") - thr.daemon = True - thr.start() + Daemon(ioloop.loop, "ftp") def join(p1: str, p2: str) -> str: diff --git a/copyparty/httpsrv.py b/copyparty/httpsrv.py index b14d6ad2..7d22ba78 100644 --- a/copyparty/httpsrv.py +++ b/copyparty/httpsrv.py @@ -33,6 +33,7 @@ from .bos import bos from .httpconn import HttpConn from .util import ( FHC, + Daemon, Garda, Magician, E_SCK, @@ -121,9 +122,7 @@ class HttpSrv(object): start_log_thrs(self.log, self.args.log_thrs, nid) self.th_cfg: dict[str, Any] = {} - t = threading.Thread(target=self.post_init, name="hsrv-init2") - t.daemon = True - t.start() + Daemon(self.post_init, "hsrv-init2") def post_init(self) -> None: try: @@ -138,12 +137,7 @@ class HttpSrv(object): self.log(self.name, "workers += {} = {}".format(n, self.tp_nthr), 6) for _ in range(n): - thr = threading.Thread( - target=self.thr_poolw, - name=self.name + "-poolw", - ) - thr.daemon = True - thr.start() + Daemon(self.thr_poolw, self.name + "-poolw") def stop_threads(self, n: int) -> None: self.tp_nthr -= n @@ -178,13 +172,11 @@ class HttpSrv(object): ip, port = sck.getsockname() self.srvs.append(sck) self.nclimax = math.ceil(self.args.nc * 1.0 / nlisteners) - t = threading.Thread( - target=self.thr_listen, - args=(sck,), - name="httpsrv-n{}-listen-{}-{}".format(self.nid or "0", ip, port), + Daemon( + self.thr_listen, + "httpsrv-n{}-listen-{}-{}".format(self.nid or "0", ip, port), + (sck,), ) - t.daemon = True - t.start() def thr_listen(self, srv_sck: socket.socket) -> None: """listens on a shared tcp server""" @@ -242,10 +234,7 @@ class HttpSrv(object): if self.nid: name += "-{}".format(self.nid) - thr = threading.Thread(target=self.periodic, name=name) - self.t_periodic = thr - thr.daemon = True - thr.start() + self.t_periodic = Daemon(self.periodic, name) if self.tp_q: self.tp_time = self.tp_time or now @@ -260,13 +249,11 @@ class HttpSrv(object): t = "looks like the httpserver threadpool died; please make an issue on github and tell me the story of how you pulled that off, thanks and dog bless\n" self.log(self.name, t, 1) - thr = threading.Thread( - target=self.thr_client, - args=(sck, addr), - name="httpconn-{}-{}".format(addr[0].split(".", 2)[-1][-6:], addr[1]), + Daemon( + self.thr_client, + "httpconn-{}-{}".format(addr[0].split(".", 2)[-1][-6:], addr[1]), + (sck, addr), ) - thr.daemon = True - thr.start() def thr_poolw(self) -> None: assert self.tp_q diff --git a/copyparty/star.py b/copyparty/star.py index f63685c2..c7731b2b 100644 --- a/copyparty/star.py +++ b/copyparty/star.py @@ -8,7 +8,7 @@ from queue import Queue from .bos import bos from .sutil import StreamArc, errdesc -from .util import fsenc, min_ex +from .util import Daemon, fsenc, min_ex try: from typing import Any, Generator, Optional @@ -60,9 +60,7 @@ class StreamTar(StreamArc): fmt = tarfile.GNU_FORMAT self.tar = tarfile.open(fileobj=self.qfile, mode="w|", format=fmt) # type: ignore - w = threading.Thread(target=self._gen, name="star-gen") - w.daemon = True - w.start() + Daemon(self._gen, "star-gen") def gen(self) -> Generator[Optional[bytes], None, None]: try: diff --git a/copyparty/svchub.py b/copyparty/svchub.py index 1187e4b2..3bc89227 100644 --- a/copyparty/svchub.py +++ b/copyparty/svchub.py @@ -35,6 +35,7 @@ from .th_srv import HAVE_PIL, HAVE_VIPS, HAVE_WEBP, ThumbSrv from .up2k import Up2k from .util import ( VERSIONS, + Daemon, HMaccas, alltrace, ansi_re, @@ -232,9 +233,7 @@ class SvcHub(object): self.up2k.init_vols() - thr = threading.Thread(target=self.sd_notify, name="sd-notify") - thr.daemon = True - thr.start() + Daemon(self.sd_notify, "sd-notify") def _logname(self) -> str: dt = datetime.utcnow() @@ -285,9 +284,7 @@ class SvcHub(object): def run(self) -> None: self.tcpsrv.run() - thr = threading.Thread(target=self.thr_httpsrv_up, name="sig-hsrv-up2") - thr.daemon = True - thr.start() + Daemon(self.thr_httpsrv_up, "sig-hsrv-up2") sigs = [signal.SIGINT, signal.SIGTERM] if not ANYWIN: @@ -302,9 +299,7 @@ class SvcHub(object): # never lucky if ANYWIN: # msys-python probably fine but >msys-python - thr = threading.Thread(target=self.stop_thr, name="svchub-sig") - thr.daemon = True - thr.start() + Daemon(self.stop_thr, "svchub-sig") try: while not self.stop_req: @@ -324,9 +319,7 @@ class SvcHub(object): return "cannot reload; already in progress" self.reloading = True - t = threading.Thread(target=self._reload, name="reloading") - t.daemon = True - t.start() + Daemon(self._reload, "reloading") return "reload initiated" def _reload(self) -> None: diff --git a/copyparty/th_srv.py b/copyparty/th_srv.py index d92b186e..29d07745 100644 --- a/copyparty/th_srv.py +++ b/copyparty/th_srv.py @@ -14,7 +14,17 @@ from queue import Queue from .__init__ import TYPE_CHECKING from .bos import bos from .mtag import HAVE_FFMPEG, HAVE_FFPROBE, ffprobe -from .util import BytesIO, Cooldown, Pebkac, fsenc, min_ex, runcmd, statdir, vsplit +from .util import ( + BytesIO, + Cooldown, + Daemon, + Pebkac, + fsenc, + min_ex, + runcmd, + statdir, + vsplit, +) try: from typing import Optional, Union @@ -106,11 +116,7 @@ class ThumbSrv(object): self.q: Queue[Optional[tuple[str, str]]] = Queue(self.nthr * 4) for n in range(self.nthr): - thr = threading.Thread( - target=self.worker, name="thumb-{}-{}".format(n, self.nthr) - ) - thr.daemon = True - thr.start() + Daemon(self.worker, "thumb-{}-{}".format(n, self.nthr)) want_ff = not self.args.no_vthumb or not self.args.no_athumb if want_ff and (not HAVE_FFMPEG or not HAVE_FFPROBE): @@ -126,9 +132,7 @@ class ThumbSrv(object): self.log(msg, c=3) if self.args.th_clean: - t = threading.Thread(target=self.cleaner, name="thumb.cln") - t.daemon = True - t.start() + Daemon(self.cleaner, "thumb.cln") self.fmt_pil, self.fmt_vips, self.fmt_ffi, self.fmt_ffv, self.fmt_ffa = [ set(y.split(",")) diff --git a/copyparty/u2idx.py b/copyparty/u2idx.py index 8a342042..6d8ae7ef 100644 --- a/copyparty/u2idx.py +++ b/copyparty/u2idx.py @@ -11,7 +11,16 @@ from operator import itemgetter from .__init__ import ANYWIN, TYPE_CHECKING, unicode from .bos import bos from .up2k import up2k_wark_from_hashlist -from .util import HAVE_SQLITE3, Pebkac, absreal, gen_filekey, min_ex, quotep, s3dec +from .util import ( + HAVE_SQLITE3, + Daemon, + Pebkac, + absreal, + gen_filekey, + min_ex, + quotep, + s3dec, +) if HAVE_SQLITE3: import sqlite3 @@ -270,16 +279,7 @@ class U2idx(object): self.active_id = "{:.6f}_{}".format( time.time(), threading.current_thread().ident ) - thr = threading.Thread( - target=self.terminator, - args=( - self.active_id, - done_flag, - ), - name="u2idx-terminator", - ) - thr.daemon = True - thr.start() + Daemon(self.terminator, "u2idx-terminator", (self.active_id, done_flag)) if not uq or not uv: uq = "select * from up" diff --git a/copyparty/up2k.py b/copyparty/up2k.py index 3160d316..e91947a0 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -29,6 +29,7 @@ from .mtag import MParser, MTag from .util import ( HAVE_SQLITE3, SYMTIME, + Daemon, MTHash, Pebkac, ProgressPrinter, @@ -153,9 +154,7 @@ class Up2k(object): if ANYWIN: # usually fails to set lastmod too quickly self.lastmod_q: list[tuple[str, int, tuple[int, int], bool]] = [] - thr = threading.Thread(target=self._lastmodder, name="up2k-lastmod") - thr.daemon = True - thr.start() + Daemon(self._lastmodder, "up2k-lastmod") self.fstab = Fstab(self.log_func) @@ -171,9 +170,7 @@ class Up2k(object): if self.args.no_fastboot: return - t = threading.Thread(target=self.deferred_init, name="up2k-deferred-init") - t.daemon = True - t.start() + Daemon(self.deferred_init, "up2k-deferred-init") def reload(self) -> None: self.gid += 1 @@ -188,32 +185,21 @@ class Up2k(object): if not self.pp and self.args.exit == "idx": return self.hub.sigterm() - thr = threading.Thread(target=self._snapshot, name="up2k-snapshot") - thr.daemon = True - thr.start() - + Daemon(self._snapshot, "up2k-snapshot") if have_e2d: - thr = threading.Thread(target=self._hasher, name="up2k-hasher") - thr.daemon = True - thr.start() - - thr = threading.Thread(target=self._sched_rescan, name="up2k-rescan") - thr.daemon = True - thr.start() - + Daemon(self._hasher, "up2k-hasher") + Daemon(self._sched_rescan, "up2k-rescan") if self.mtag: for n in range(max(1, self.args.mtag_mt)): - name = "tagger-{}".format(n) - thr = threading.Thread(target=self._tagger, name=name) - thr.daemon = True - thr.start() + Daemon(self._tagger, "tagger-{}".format(n)) - thr = threading.Thread(target=self._run_all_mtp, name="up2k-mtp-init") - thr.daemon = True - thr.start() + Daemon(self._run_all_mtp, "up2k-mtp-init") def log(self, msg: str, c: Union[int, str] = 0) -> None: - self.log_func("up2k", msg + "\033[K", c) + if self.pp: + msg += "\033[K" + + self.log_func("up2k", msg, c) def _block(self, why: str) -> None: self.blocked = why @@ -256,13 +242,11 @@ class Up2k(object): return "cannot initiate; scan is already in progress" args = (all_vols, scan_vols) - t = threading.Thread( - target=self.init_indexes, - args=args, - name="up2k-rescan-{}".format(scan_vols[0] if scan_vols else "all"), + Daemon( + self.init_indexes, + "up2k-rescan-{}".format(scan_vols[0] if scan_vols else "all"), + args, ) - t.daemon = True - t.start() return "" def _sched_rescan(self) -> None: @@ -581,7 +565,7 @@ class Up2k(object): if self.mtag: t = "online (running mtp)" if scan_vols: - thr = threading.Thread(target=self._run_all_mtp, name="up2k-mtp-scan") + thr = Daemon(self._run_all_mtp, "up2k-mtp-scan", r=False) else: self.pp = None t = "online, idle" @@ -590,7 +574,6 @@ class Up2k(object): self.volstate[vol.vpath] = t if thr: - thr.daemon = True thr.start() return have_e2d @@ -1622,11 +1605,7 @@ class Up2k(object): mpool: Queue[Mpqe] = Queue(nw) for _ in range(nw): - thr = threading.Thread( - target=self._tag_thr, args=(mpool,), name="up2k-mpool" - ) - thr.daemon = True - thr.start() + Daemon(self._tag_thr, "up2k-mpool", (mpool,)) return mpool diff --git a/copyparty/util.py b/copyparty/util.py index 95e3c456..1568f90d 100644 --- a/copyparty/util.py +++ b/copyparty/util.py @@ -340,6 +340,20 @@ _: Any = (mp, BytesIO, quote, unquote, SQLITE_VER, JINJA_VER, PYFTPD_VER) __all__ = ["mp", "BytesIO", "quote", "unquote", "SQLITE_VER", "JINJA_VER", "PYFTPD_VER"] +class Daemon(threading.Thread): + def __init__( + self, + target: Any, + name: Optional[str] = None, + a: Iterable[Any] = None, + r=True, + ) -> None: + threading.Thread.__init__(self, target=target, name=name, args=a or ()) + self.daemon = True + if r: + self.start() + + class Cooldown(object): def __init__(self, maxage: float) -> None: self.maxage = maxage @@ -569,9 +583,7 @@ class MTHash(object): self.done_q: Queue[tuple[int, str, int, int]] = Queue() self.thrs = [] for n in range(cores): - t = threading.Thread(target=self.worker, name="mth-" + str(n)) - t.daemon = True - t.start() + t = Daemon(self.worker, "mth-" + str(n)) self.thrs.append(t) def hash( @@ -884,13 +896,7 @@ def start_stackmon(arg_str: str, nid: int) -> None: suffix = "-{}".format(nid) if nid else "" fp, f = arg_str.rsplit(",", 1) zi = int(f) - t = threading.Thread( - target=stackmon, - args=(fp, zi, suffix), - name="stackmon" + suffix, - ) - t.daemon = True - t.start() + Daemon(stackmon, "stackmon" + suffix, (fp, zi, suffix)) def stackmon(fp: str, ival: float, suffix: str) -> None: @@ -943,13 +949,7 @@ def start_log_thrs( tname = "logthr-n{}-i{:x}".format(nid, os.getpid()) lname = tname[3:] - t = threading.Thread( - target=log_thrs, - args=(logger, ival, lname), - name=tname, - ) - t.daemon = True - t.start() + Daemon(log_thrs, tname, (logger, ival, lname)) def log_thrs(log: Callable[[str, str, int], None], ival: float, name: str) -> None: @@ -1673,10 +1673,7 @@ def db_ex_chk(log: "NamedLogger", ex: Exception, db_path: str) -> bool: if str(ex) != "database is locked": return False - thr = threading.Thread(target=lsof, args=(log, db_path), name="dbex") - thr.daemon = True - thr.start() - + Daemon(lsof, "dbex", (log, db_path)) return True