diff --git a/copyparty/svchub.py b/copyparty/svchub.py index 8bf3c645..008cf5eb 100644 --- a/copyparty/svchub.py +++ b/copyparty/svchub.py @@ -94,7 +94,7 @@ class SvcHub(object): self.stopping = False self.stopped = False self.reload_req = False - self.reloading = False + self.reloading = 0 self.stop_cond = threading.Condition() self.nsigs = 3 self.retcode = 0 @@ -662,21 +662,24 @@ class SvcHub(object): self.log("root", "ssdp startup failed;\n" + min_ex(), 3) def reload(self) -> str: - if self.reloading: - return "cannot reload; already in progress" + with self.up2k.mutex: + if self.reloading: + return "cannot reload; already in progress" + self.reloading = 1 - self.reloading = True Daemon(self._reload, "reloading") return "reload initiated" def _reload(self) -> None: - self.log("root", "reload scheduled") with self.up2k.mutex: + if self.reloading != 1: + return + self.reloading = 2 + self.log("root", "reloading config") self.asrv.reload() self.up2k.reload() self.broker.reload() - - self.reloading = False + self.reloading = 0 def stop_thr(self) -> None: while not self.stop_req: diff --git a/copyparty/up2k.py b/copyparty/up2k.py index af834394..9abcfd7a 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -200,10 +200,10 @@ class Up2k(object): Daemon(self.deferred_init, "up2k-deferred-init") def reload(self) -> None: - self.gid += 1 - self.log("reload #{} initiated".format(self.gid)) + """mutex me""" + self.log("reload #{} scheduled".format(self.gid + 1)) all_vols = self.asrv.vfs.all_vols - self.rescan(all_vols, list(all_vols.keys()), True, False) + self._rescan(all_vols, list(all_vols.keys()), True, False) def deferred_init(self) -> None: all_vols = self.asrv.vfs.all_vols @@ -232,7 +232,7 @@ class Up2k(object): for n in range(max(1, self.args.mtag_mt)): Daemon(self._tagger, "tagger-{}".format(n)) - Daemon(self._run_all_mtp, "up2k-mtp-init") + Daemon(self._run_all_mtp, "up2k-mtp-init", (self.gid,)) def log(self, msg: str, c: Union[int, str] = 0) -> None: if self.pp: @@ -337,14 +337,21 @@ class Up2k(object): def rescan( self, all_vols: dict[str, VFS], scan_vols: list[str], wait: bool, fscan: bool ) -> str: + with self.mutex: + return self._rescan(all_vols, scan_vols, wait, fscan) + + def _rescan( + self, all_vols: dict[str, VFS], scan_vols: list[str], wait: bool, fscan: bool + ) -> str: + """mutex me""" if not wait and self.pp: return "cannot initiate; scan is already in progress" - args = (all_vols, scan_vols, fscan) + self.gid += 1 Daemon( self.init_indexes, "up2k-rescan-{}".format(scan_vols[0] if scan_vols else "all"), - args, + (all_vols, scan_vols, fscan, self.gid), ) return "" @@ -575,19 +582,32 @@ class Up2k(object): return True, ret def init_indexes( - self, all_vols: dict[str, VFS], scan_vols: list[str], fscan: bool + self, all_vols: dict[str, VFS], scan_vols: list[str], fscan: bool, gid: int = 0 ) -> bool: - gid = self.gid - while self.pp and gid == self.gid: - time.sleep(0.1) + if not gid: + with self.mutex: + gid = self.gid - if gid != self.gid: - return False + nspin = 0 + while True: + nspin += 1 + if nspin > 1: + time.sleep(0.1) + + with self.mutex: + if gid != self.gid: + return False + + if self.pp: + continue + + self.pp = ProgressPrinter(self.log, self.args) + + break if gid: - self.log("reload #{} running".format(self.gid)) + self.log("reload #%d running" % (gid,)) - self.pp = ProgressPrinter(self.log, self.args) vols = list(all_vols.values()) t0 = time.time() have_e2d = False @@ -775,7 +795,7 @@ class Up2k(object): if self.mtag: t = "online (running mtp)" if scan_vols: - thr = Daemon(self._run_all_mtp, "up2k-mtp-scan", r=False) + thr = Daemon(self._run_all_mtp, "up2k-mtp-scan", (gid,), r=False) else: self.pp = None t = "online, idle" @@ -1809,8 +1829,7 @@ class Up2k(object): self.pending_tags = [] return ret - def _run_all_mtp(self) -> None: - gid = self.gid + def _run_all_mtp(self, gid: int) -> None: t0 = time.time() for ptop, flags in self.flags.items(): if "mtp" in flags: