fix race in config reloader

nothing dangerous, just confusing log messages if an
admin hammers the reload button 100+ times per second,
or another linux process rapidly sends SIGUSR1
This commit is contained in:
ed 2024-02-28 20:08:20 +00:00
parent 8413ed6d1f
commit 096de50889
2 changed files with 46 additions and 24 deletions

View file

@ -94,7 +94,7 @@ class SvcHub(object):
self.stopping = False self.stopping = False
self.stopped = False self.stopped = False
self.reload_req = False self.reload_req = False
self.reloading = False self.reloading = 0
self.stop_cond = threading.Condition() self.stop_cond = threading.Condition()
self.nsigs = 3 self.nsigs = 3
self.retcode = 0 self.retcode = 0
@ -662,21 +662,24 @@ class SvcHub(object):
self.log("root", "ssdp startup failed;\n" + min_ex(), 3) self.log("root", "ssdp startup failed;\n" + min_ex(), 3)
def reload(self) -> str: def reload(self) -> str:
if self.reloading: with self.up2k.mutex:
return "cannot reload; already in progress" if self.reloading:
return "cannot reload; already in progress"
self.reloading = 1
self.reloading = True
Daemon(self._reload, "reloading") Daemon(self._reload, "reloading")
return "reload initiated" return "reload initiated"
def _reload(self) -> None: def _reload(self) -> None:
self.log("root", "reload scheduled")
with self.up2k.mutex: with self.up2k.mutex:
if self.reloading != 1:
return
self.reloading = 2
self.log("root", "reloading config")
self.asrv.reload() self.asrv.reload()
self.up2k.reload() self.up2k.reload()
self.broker.reload() self.broker.reload()
self.reloading = 0
self.reloading = False
def stop_thr(self) -> None: def stop_thr(self) -> None:
while not self.stop_req: while not self.stop_req:

View file

@ -200,10 +200,10 @@ class Up2k(object):
Daemon(self.deferred_init, "up2k-deferred-init") Daemon(self.deferred_init, "up2k-deferred-init")
def reload(self) -> None: def reload(self) -> None:
self.gid += 1 """mutex me"""
self.log("reload #{} initiated".format(self.gid)) self.log("reload #{} scheduled".format(self.gid + 1))
all_vols = self.asrv.vfs.all_vols all_vols = self.asrv.vfs.all_vols
self.rescan(all_vols, list(all_vols.keys()), True, False) self._rescan(all_vols, list(all_vols.keys()), True, False)
def deferred_init(self) -> None: def deferred_init(self) -> None:
all_vols = self.asrv.vfs.all_vols all_vols = self.asrv.vfs.all_vols
@ -232,7 +232,7 @@ class Up2k(object):
for n in range(max(1, self.args.mtag_mt)): for n in range(max(1, self.args.mtag_mt)):
Daemon(self._tagger, "tagger-{}".format(n)) Daemon(self._tagger, "tagger-{}".format(n))
Daemon(self._run_all_mtp, "up2k-mtp-init") Daemon(self._run_all_mtp, "up2k-mtp-init", (self.gid,))
def log(self, msg: str, c: Union[int, str] = 0) -> None: def log(self, msg: str, c: Union[int, str] = 0) -> None:
if self.pp: if self.pp:
@ -337,14 +337,21 @@ class Up2k(object):
def rescan( def rescan(
self, all_vols: dict[str, VFS], scan_vols: list[str], wait: bool, fscan: bool self, all_vols: dict[str, VFS], scan_vols: list[str], wait: bool, fscan: bool
) -> str: ) -> str:
with self.mutex:
return self._rescan(all_vols, scan_vols, wait, fscan)
def _rescan(
self, all_vols: dict[str, VFS], scan_vols: list[str], wait: bool, fscan: bool
) -> str:
"""mutex me"""
if not wait and self.pp: if not wait and self.pp:
return "cannot initiate; scan is already in progress" return "cannot initiate; scan is already in progress"
args = (all_vols, scan_vols, fscan) self.gid += 1
Daemon( Daemon(
self.init_indexes, self.init_indexes,
"up2k-rescan-{}".format(scan_vols[0] if scan_vols else "all"), "up2k-rescan-{}".format(scan_vols[0] if scan_vols else "all"),
args, (all_vols, scan_vols, fscan, self.gid),
) )
return "" return ""
@ -575,19 +582,32 @@ class Up2k(object):
return True, ret return True, ret
def init_indexes( def init_indexes(
self, all_vols: dict[str, VFS], scan_vols: list[str], fscan: bool self, all_vols: dict[str, VFS], scan_vols: list[str], fscan: bool, gid: int = 0
) -> bool: ) -> bool:
gid = self.gid if not gid:
while self.pp and gid == self.gid: with self.mutex:
time.sleep(0.1) gid = self.gid
if gid != self.gid: nspin = 0
return False while True:
nspin += 1
if nspin > 1:
time.sleep(0.1)
with self.mutex:
if gid != self.gid:
return False
if self.pp:
continue
self.pp = ProgressPrinter(self.log, self.args)
break
if gid: if gid:
self.log("reload #{} running".format(self.gid)) self.log("reload #%d running" % (gid,))
self.pp = ProgressPrinter(self.log, self.args)
vols = list(all_vols.values()) vols = list(all_vols.values())
t0 = time.time() t0 = time.time()
have_e2d = False have_e2d = False
@ -775,7 +795,7 @@ class Up2k(object):
if self.mtag: if self.mtag:
t = "online (running mtp)" t = "online (running mtp)"
if scan_vols: if scan_vols:
thr = Daemon(self._run_all_mtp, "up2k-mtp-scan", r=False) thr = Daemon(self._run_all_mtp, "up2k-mtp-scan", (gid,), r=False)
else: else:
self.pp = None self.pp = None
t = "online, idle" t = "online, idle"
@ -1809,8 +1829,7 @@ class Up2k(object):
self.pending_tags = [] self.pending_tags = []
return ret return ret
def _run_all_mtp(self) -> None: def _run_all_mtp(self, gid: int) -> None:
gid = self.gid
t0 = time.time() t0 = time.time()
for ptop, flags in self.flags.items(): for ptop, flags in self.flags.items():
if "mtp" in flags: if "mtp" in flags: