cleaner daemon instancing

This commit is contained in:
ed 2022-10-23 12:05:44 +02:00
parent 947dbb6f8a
commit 4bcd30da6b
12 changed files with 95 additions and 151 deletions

View file

@ -154,10 +154,7 @@ class MTHash(object):
self.done_q = Queue()
self.thrs = []
for _ in range(cores):
t = threading.Thread(target=self.worker)
t.daemon = True
t.start()
self.thrs.append(t)
self.thrs.append(Daemon(self.worker))
def hash(self, f, fsz, chunksz, pcb=None, pcb_opaque=None):
with self.omutex:
@ -290,9 +287,7 @@ class CTermsize(object):
except:
return
thr = threading.Thread(target=self.worker)
thr.daemon = True
thr.start()
Daemon(self.worker)
def worker(self):
while True:
@ -549,9 +544,11 @@ def upload(req_ses, file, cid, pw):
class Daemon(threading.Thread):
def __init__(self, *a, **ka):
threading.Thread.__init__(self, *a, **ka)
def __init__(self, target, name=None, a=None):
# type: (Any, Any, Any) -> None
threading.Thread.__init__(self, target=target, args=a or (), name=name)
self.daemon = True
self.start()
class Ctl(object):
@ -678,10 +675,10 @@ class Ctl(object):
atexit.register(self.cleanup_vt100)
ss.scroll_region(3)
Daemon(target=self.hasher).start()
Daemon(self.hasher)
for _ in range(self.ar.j):
Daemon(target=self.handshaker).start()
Daemon(target=self.uploader).start()
Daemon(self.handshaker)
Daemon(self.uploader)
idles = 0
while idles < 3:

View file

@ -9,7 +9,7 @@ import queue
from .__init__ import CORES, TYPE_CHECKING
from .broker_mpw import MpWorker
from .broker_util import try_exec
from .util import mp
from .util import Daemon, mp
if TYPE_CHECKING:
from .svchub import SvcHub
@ -51,13 +51,7 @@ class BrokerMp(object):
q_yield: queue.Queue[tuple[int, str, list[Any]]] = mp.Queue(64)
proc = MProcess(q_pend, q_yield, MpWorker, (q_pend, q_yield, self.args, n))
thr = threading.Thread(
target=self.collector, args=(proc,), name="mp-sink-{}".format(n)
)
thr.daemon = True
thr.start()
Daemon(self.collector, "mp-sink-{}".format(n), (proc,))
self.procs.append(proc)
proc.start()

View file

@ -13,7 +13,7 @@ from .__init__ import ANYWIN
from .authsrv import AuthSrv
from .broker_util import BrokerCli, ExceptionalQueue
from .httpsrv import HttpSrv
from .util import FAKE_MP, HMaccas
from .util import FAKE_MP, Daemon, HMaccas
try:
from types import FrameType
@ -65,10 +65,7 @@ class MpWorker(BrokerCli):
# on winxp and some other platforms,
# use thr.join() to block all signals
thr = threading.Thread(target=self.main, name="mpw-main")
thr.daemon = True
thr.start()
thr.join()
Daemon(self.main, "mpw-main").join()
def signal_handler(self, sig: Optional[int], frame: Optional[FrameType]) -> None:
# print('k')

View file

@ -28,7 +28,7 @@ class Fstab(object):
self.age = 0.0
def log(self, msg: str, c: Union[int, str] = 0) -> None:
self.log_func("fstab", msg + "\033[K", c)
self.log_func("fstab", msg, c)
def get(self, path: str) -> str:
if len(self.cache) > 9000:

View file

@ -17,7 +17,7 @@ from pyftpdlib.servers import FTPServer
from .__init__ import PY2, TYPE_CHECKING, E
from .bos import bos
from .util import Pebkac, exclude_dotfiles, fsenc
from .util import Daemon, Pebkac, exclude_dotfiles, fsenc
try:
from pyftpdlib.ioloop import IOLoop
@ -402,9 +402,7 @@ class Ftpd(object):
for h, lp in hs:
FTPServer((ip, int(lp)), h, ioloop)
thr = threading.Thread(target=ioloop.loop, name="ftp")
thr.daemon = True
thr.start()
Daemon(ioloop.loop, "ftp")
def join(p1: str, p2: str) -> str:

View file

@ -33,6 +33,7 @@ from .bos import bos
from .httpconn import HttpConn
from .util import (
FHC,
Daemon,
Garda,
Magician,
E_SCK,
@ -121,9 +122,7 @@ class HttpSrv(object):
start_log_thrs(self.log, self.args.log_thrs, nid)
self.th_cfg: dict[str, Any] = {}
t = threading.Thread(target=self.post_init, name="hsrv-init2")
t.daemon = True
t.start()
Daemon(self.post_init, "hsrv-init2")
def post_init(self) -> None:
try:
@ -138,12 +137,7 @@ class HttpSrv(object):
self.log(self.name, "workers += {} = {}".format(n, self.tp_nthr), 6)
for _ in range(n):
thr = threading.Thread(
target=self.thr_poolw,
name=self.name + "-poolw",
)
thr.daemon = True
thr.start()
Daemon(self.thr_poolw, self.name + "-poolw")
def stop_threads(self, n: int) -> None:
self.tp_nthr -= n
@ -178,13 +172,11 @@ class HttpSrv(object):
ip, port = sck.getsockname()
self.srvs.append(sck)
self.nclimax = math.ceil(self.args.nc * 1.0 / nlisteners)
t = threading.Thread(
target=self.thr_listen,
args=(sck,),
name="httpsrv-n{}-listen-{}-{}".format(self.nid or "0", ip, port),
Daemon(
self.thr_listen,
"httpsrv-n{}-listen-{}-{}".format(self.nid or "0", ip, port),
(sck,),
)
t.daemon = True
t.start()
def thr_listen(self, srv_sck: socket.socket) -> None:
"""listens on a shared tcp server"""
@ -242,10 +234,7 @@ class HttpSrv(object):
if self.nid:
name += "-{}".format(self.nid)
thr = threading.Thread(target=self.periodic, name=name)
self.t_periodic = thr
thr.daemon = True
thr.start()
self.t_periodic = Daemon(self.periodic, name)
if self.tp_q:
self.tp_time = self.tp_time or now
@ -260,13 +249,11 @@ class HttpSrv(object):
t = "looks like the httpserver threadpool died; please make an issue on github and tell me the story of how you pulled that off, thanks and dog bless\n"
self.log(self.name, t, 1)
thr = threading.Thread(
target=self.thr_client,
args=(sck, addr),
name="httpconn-{}-{}".format(addr[0].split(".", 2)[-1][-6:], addr[1]),
Daemon(
self.thr_client,
"httpconn-{}-{}".format(addr[0].split(".", 2)[-1][-6:], addr[1]),
(sck, addr),
)
thr.daemon = True
thr.start()
def thr_poolw(self) -> None:
assert self.tp_q

View file

@ -8,7 +8,7 @@ from queue import Queue
from .bos import bos
from .sutil import StreamArc, errdesc
from .util import fsenc, min_ex
from .util import Daemon, fsenc, min_ex
try:
from typing import Any, Generator, Optional
@ -60,9 +60,7 @@ class StreamTar(StreamArc):
fmt = tarfile.GNU_FORMAT
self.tar = tarfile.open(fileobj=self.qfile, mode="w|", format=fmt) # type: ignore
w = threading.Thread(target=self._gen, name="star-gen")
w.daemon = True
w.start()
Daemon(self._gen, "star-gen")
def gen(self) -> Generator[Optional[bytes], None, None]:
try:

View file

@ -35,6 +35,7 @@ from .th_srv import HAVE_PIL, HAVE_VIPS, HAVE_WEBP, ThumbSrv
from .up2k import Up2k
from .util import (
VERSIONS,
Daemon,
HMaccas,
alltrace,
ansi_re,
@ -232,9 +233,7 @@ class SvcHub(object):
self.up2k.init_vols()
thr = threading.Thread(target=self.sd_notify, name="sd-notify")
thr.daemon = True
thr.start()
Daemon(self.sd_notify, "sd-notify")
def _logname(self) -> str:
dt = datetime.utcnow()
@ -285,9 +284,7 @@ class SvcHub(object):
def run(self) -> None:
self.tcpsrv.run()
thr = threading.Thread(target=self.thr_httpsrv_up, name="sig-hsrv-up2")
thr.daemon = True
thr.start()
Daemon(self.thr_httpsrv_up, "sig-hsrv-up2")
sigs = [signal.SIGINT, signal.SIGTERM]
if not ANYWIN:
@ -302,9 +299,7 @@ class SvcHub(object):
# never lucky
if ANYWIN:
# msys-python probably fine but >msys-python
thr = threading.Thread(target=self.stop_thr, name="svchub-sig")
thr.daemon = True
thr.start()
Daemon(self.stop_thr, "svchub-sig")
try:
while not self.stop_req:
@ -324,9 +319,7 @@ class SvcHub(object):
return "cannot reload; already in progress"
self.reloading = True
t = threading.Thread(target=self._reload, name="reloading")
t.daemon = True
t.start()
Daemon(self._reload, "reloading")
return "reload initiated"
def _reload(self) -> None:

View file

@ -14,7 +14,17 @@ from queue import Queue
from .__init__ import TYPE_CHECKING
from .bos import bos
from .mtag import HAVE_FFMPEG, HAVE_FFPROBE, ffprobe
from .util import BytesIO, Cooldown, Pebkac, fsenc, min_ex, runcmd, statdir, vsplit
from .util import (
BytesIO,
Cooldown,
Daemon,
Pebkac,
fsenc,
min_ex,
runcmd,
statdir,
vsplit,
)
try:
from typing import Optional, Union
@ -106,11 +116,7 @@ class ThumbSrv(object):
self.q: Queue[Optional[tuple[str, str]]] = Queue(self.nthr * 4)
for n in range(self.nthr):
thr = threading.Thread(
target=self.worker, name="thumb-{}-{}".format(n, self.nthr)
)
thr.daemon = True
thr.start()
Daemon(self.worker, "thumb-{}-{}".format(n, self.nthr))
want_ff = not self.args.no_vthumb or not self.args.no_athumb
if want_ff and (not HAVE_FFMPEG or not HAVE_FFPROBE):
@ -126,9 +132,7 @@ class ThumbSrv(object):
self.log(msg, c=3)
if self.args.th_clean:
t = threading.Thread(target=self.cleaner, name="thumb.cln")
t.daemon = True
t.start()
Daemon(self.cleaner, "thumb.cln")
self.fmt_pil, self.fmt_vips, self.fmt_ffi, self.fmt_ffv, self.fmt_ffa = [
set(y.split(","))

View file

@ -11,7 +11,16 @@ from operator import itemgetter
from .__init__ import ANYWIN, TYPE_CHECKING, unicode
from .bos import bos
from .up2k import up2k_wark_from_hashlist
from .util import HAVE_SQLITE3, Pebkac, absreal, gen_filekey, min_ex, quotep, s3dec
from .util import (
HAVE_SQLITE3,
Daemon,
Pebkac,
absreal,
gen_filekey,
min_ex,
quotep,
s3dec,
)
if HAVE_SQLITE3:
import sqlite3
@ -270,16 +279,7 @@ class U2idx(object):
self.active_id = "{:.6f}_{}".format(
time.time(), threading.current_thread().ident
)
thr = threading.Thread(
target=self.terminator,
args=(
self.active_id,
done_flag,
),
name="u2idx-terminator",
)
thr.daemon = True
thr.start()
Daemon(self.terminator, "u2idx-terminator", (self.active_id, done_flag))
if not uq or not uv:
uq = "select * from up"

View file

@ -29,6 +29,7 @@ from .mtag import MParser, MTag
from .util import (
HAVE_SQLITE3,
SYMTIME,
Daemon,
MTHash,
Pebkac,
ProgressPrinter,
@ -153,9 +154,7 @@ class Up2k(object):
if ANYWIN:
# usually fails to set lastmod too quickly
self.lastmod_q: list[tuple[str, int, tuple[int, int], bool]] = []
thr = threading.Thread(target=self._lastmodder, name="up2k-lastmod")
thr.daemon = True
thr.start()
Daemon(self._lastmodder, "up2k-lastmod")
self.fstab = Fstab(self.log_func)
@ -171,9 +170,7 @@ class Up2k(object):
if self.args.no_fastboot:
return
t = threading.Thread(target=self.deferred_init, name="up2k-deferred-init")
t.daemon = True
t.start()
Daemon(self.deferred_init, "up2k-deferred-init")
def reload(self) -> None:
self.gid += 1
@ -188,32 +185,21 @@ class Up2k(object):
if not self.pp and self.args.exit == "idx":
return self.hub.sigterm()
thr = threading.Thread(target=self._snapshot, name="up2k-snapshot")
thr.daemon = True
thr.start()
Daemon(self._snapshot, "up2k-snapshot")
if have_e2d:
thr = threading.Thread(target=self._hasher, name="up2k-hasher")
thr.daemon = True
thr.start()
thr = threading.Thread(target=self._sched_rescan, name="up2k-rescan")
thr.daemon = True
thr.start()
Daemon(self._hasher, "up2k-hasher")
Daemon(self._sched_rescan, "up2k-rescan")
if self.mtag:
for n in range(max(1, self.args.mtag_mt)):
name = "tagger-{}".format(n)
thr = threading.Thread(target=self._tagger, name=name)
thr.daemon = True
thr.start()
Daemon(self._tagger, "tagger-{}".format(n))
thr = threading.Thread(target=self._run_all_mtp, name="up2k-mtp-init")
thr.daemon = True
thr.start()
Daemon(self._run_all_mtp, "up2k-mtp-init")
def log(self, msg: str, c: Union[int, str] = 0) -> None:
self.log_func("up2k", msg + "\033[K", c)
if self.pp:
msg += "\033[K"
self.log_func("up2k", msg, c)
def _block(self, why: str) -> None:
self.blocked = why
@ -256,13 +242,11 @@ class Up2k(object):
return "cannot initiate; scan is already in progress"
args = (all_vols, scan_vols)
t = threading.Thread(
target=self.init_indexes,
args=args,
name="up2k-rescan-{}".format(scan_vols[0] if scan_vols else "all"),
Daemon(
self.init_indexes,
"up2k-rescan-{}".format(scan_vols[0] if scan_vols else "all"),
args,
)
t.daemon = True
t.start()
return ""
def _sched_rescan(self) -> None:
@ -581,7 +565,7 @@ class Up2k(object):
if self.mtag:
t = "online (running mtp)"
if scan_vols:
thr = threading.Thread(target=self._run_all_mtp, name="up2k-mtp-scan")
thr = Daemon(self._run_all_mtp, "up2k-mtp-scan", r=False)
else:
self.pp = None
t = "online, idle"
@ -590,7 +574,6 @@ class Up2k(object):
self.volstate[vol.vpath] = t
if thr:
thr.daemon = True
thr.start()
return have_e2d
@ -1622,11 +1605,7 @@ class Up2k(object):
mpool: Queue[Mpqe] = Queue(nw)
for _ in range(nw):
thr = threading.Thread(
target=self._tag_thr, args=(mpool,), name="up2k-mpool"
)
thr.daemon = True
thr.start()
Daemon(self._tag_thr, "up2k-mpool", (mpool,))
return mpool

View file

@ -340,6 +340,20 @@ _: Any = (mp, BytesIO, quote, unquote, SQLITE_VER, JINJA_VER, PYFTPD_VER)
__all__ = ["mp", "BytesIO", "quote", "unquote", "SQLITE_VER", "JINJA_VER", "PYFTPD_VER"]
class Daemon(threading.Thread):
def __init__(
self,
target: Any,
name: Optional[str] = None,
a: Iterable[Any] = None,
r=True,
) -> None:
threading.Thread.__init__(self, target=target, name=name, args=a or ())
self.daemon = True
if r:
self.start()
class Cooldown(object):
def __init__(self, maxage: float) -> None:
self.maxage = maxage
@ -569,9 +583,7 @@ class MTHash(object):
self.done_q: Queue[tuple[int, str, int, int]] = Queue()
self.thrs = []
for n in range(cores):
t = threading.Thread(target=self.worker, name="mth-" + str(n))
t.daemon = True
t.start()
t = Daemon(self.worker, "mth-" + str(n))
self.thrs.append(t)
def hash(
@ -884,13 +896,7 @@ def start_stackmon(arg_str: str, nid: int) -> None:
suffix = "-{}".format(nid) if nid else ""
fp, f = arg_str.rsplit(",", 1)
zi = int(f)
t = threading.Thread(
target=stackmon,
args=(fp, zi, suffix),
name="stackmon" + suffix,
)
t.daemon = True
t.start()
Daemon(stackmon, "stackmon" + suffix, (fp, zi, suffix))
def stackmon(fp: str, ival: float, suffix: str) -> None:
@ -943,13 +949,7 @@ def start_log_thrs(
tname = "logthr-n{}-i{:x}".format(nid, os.getpid())
lname = tname[3:]
t = threading.Thread(
target=log_thrs,
args=(logger, ival, lname),
name=tname,
)
t.daemon = True
t.start()
Daemon(log_thrs, tname, (logger, ival, lname))
def log_thrs(log: Callable[[str, str, int], None], ival: float, name: str) -> None:
@ -1673,10 +1673,7 @@ def db_ex_chk(log: "NamedLogger", ex: Exception, db_path: str) -> bool:
if str(ex) != "database is locked":
return False
thr = threading.Thread(target=lsof, args=(log, db_path), name="dbex")
thr.daemon = True
thr.start()
Daemon(lsof, "dbex", (log, db_path))
return True