diff --git a/copyparty/broker_mp.py b/copyparty/broker_mp.py index ebbbeca3..df73658d 100644 --- a/copyparty/broker_mp.py +++ b/copyparty/broker_mp.py @@ -63,7 +63,7 @@ class BrokerMp(object): procs.pop() def reload(self): - self.log("broker", "forwarding reload event") + self.log("broker", "reloading") for _, proc in enumerate(self.procs): proc.q_pend.put([0, "reload", []]) diff --git a/copyparty/broker_mpw.py b/copyparty/broker_mpw.py index 5571a5ba..88e957ef 100644 --- a/copyparty/broker_mpw.py +++ b/copyparty/broker_mpw.py @@ -70,8 +70,9 @@ class MpWorker(object): return elif dest == "reload": - self.logw("mpw reloading") + self.logw("mpw.asrv reloading") self.asrv.reload() + self.logw("mpw.asrv reloaded") elif dest == "listen": self.httpsrv.listen(args[0], args[1]) diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index bc23d647..80f4674e 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -1714,7 +1714,7 @@ class HttpCli(object): vn, _ = self.asrv.vfs.get(self.vpath, self.uname, True, True) - args = [self.asrv.vfs.all_vols, [vn.vpath]] + args = [self.asrv.vfs.all_vols, [vn.vpath], False] x = self.conn.hsrv.broker.put(True, "up2k.rescan", *args) x = x.get() diff --git a/copyparty/httpconn.py b/copyparty/httpconn.py index 7967e481..bc44011d 100644 --- a/copyparty/httpconn.py +++ b/copyparty/httpconn.py @@ -39,7 +39,7 @@ class HttpConn(object): self.u2fh = hsrv.u2fh enth = HAVE_PIL and not self.args.no_thumb - self.thumbcli = ThumbCli(hsrv.broker) if enth else None + self.thumbcli = ThumbCli(hsrv) if enth else None self.ico = Ico(self.args) self.t0 = time.time() diff --git a/copyparty/svchub.py b/copyparty/svchub.py index e79dbb6b..2c5620a3 100644 --- a/copyparty/svchub.py +++ b/copyparty/svchub.py @@ -235,6 +235,7 @@ class SvcHub(object): self.log("root", "reload scheduled") with self.up2k.mutex: self.asrv.reload() + self.up2k.reload() self.broker.reload() self.reloading = False @@ -244,8 +245,10 @@ class SvcHub(object): with self.stop_cond: self.stop_cond.wait(9001) - if self.reload_req and not self.reloading: - self.reload() + if self.reload_req: + self.reload_req = False + if not self.reloading: + self.reload() self.shutdown() diff --git a/copyparty/th_cli.py b/copyparty/th_cli.py index 8bb395d5..4b9b6880 100644 --- a/copyparty/th_cli.py +++ b/copyparty/th_cli.py @@ -9,14 +9,18 @@ from .bos import bos class ThumbCli(object): - def __init__(self, broker): - self.broker = broker - self.args = broker.args - self.asrv = broker.asrv + def __init__(self, hsrv): + self.broker = hsrv.broker + self.log_func = hsrv.log + self.args = hsrv.args + self.asrv = hsrv.asrv # cache on both sides for less broker spam self.cooldown = Cooldown(self.args.th_poke) + def log(self, msg, c=0): + self.log_func("thumbcli", msg, c) + def get(self, ptop, rem, mtime, fmt): ext = rem.rsplit(".")[-1].lower() if ext not in THUMBABLE: @@ -48,7 +52,11 @@ class ThumbCli(object): if self.args.th_no_webp or ((is_vid or is_au) and self.args.th_ff_jpg): fmt = "j" - histpath = self.asrv.vfs.histtab[ptop] + histpath = self.asrv.vfs.histtab.get(ptop) + if not histpath: + self.log("no histpath for [{}]".format(ptop)) + return None + tpath = thumb_path(histpath, rem, mtime, fmt) ret = None try: diff --git a/copyparty/th_srv.py b/copyparty/th_srv.py index ce22aae5..7fc4c19a 100644 --- a/copyparty/th_srv.py +++ b/copyparty/th_srv.py @@ -154,7 +154,11 @@ class ThumbSrv(object): return not self.nthr def get(self, ptop, rem, mtime, fmt): - histpath = self.asrv.vfs.histtab[ptop] + histpath = self.asrv.vfs.histtab.get(ptop) + if not histpath: + self.log("no histpath for [{}]".format(ptop)) + return None + tpath = thumb_path(histpath, rem, mtime, fmt) abspath = os.path.join(ptop, rem) cond = threading.Condition(self.mutex) diff --git a/copyparty/u2idx.py b/copyparty/u2idx.py index e535aaa2..4d0c6f8e 100644 --- a/copyparty/u2idx.py +++ b/copyparty/u2idx.py @@ -67,7 +67,11 @@ class U2idx(object): if cur: return cur - histpath = self.asrv.vfs.histtab[ptop] + histpath = self.asrv.vfs.histtab.get(ptop) + if not histpath: + self.log("no histpath for [{}]".format(ptop)) + return None + db_path = os.path.join(histpath, "up2k.db") if not bos.path.exists(db_path): return None diff --git a/copyparty/up2k.py b/copyparty/up2k.py index 5df6addd..729e1713 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -68,6 +68,7 @@ class Up2k(object): self.tagq = Queue() self.n_hashq = 0 self.n_tagq = 0 + self.gid = 0 self.volstate = {} self.need_rescan = {} self.dupesched = {} @@ -114,6 +115,12 @@ class Up2k(object): t.daemon = True t.start() + def reload(self): + self.gid += 1 + self.log("reload #{} initiated".format(self.gid)) + all_vols = self.asrv.vfs.all_vols + self.rescan(all_vols, list(all_vols.keys()), True) + def deferred_init(self): all_vols = self.asrv.vfs.all_vols have_e2d = self.init_indexes(all_vols) @@ -168,15 +175,15 @@ class Up2k(object): } return json.dumps(ret, indent=4) - def rescan(self, all_vols, scan_vols): - if hasattr(self, "pp"): + def rescan(self, all_vols, scan_vols, wait): + if not wait and hasattr(self, "pp"): return "cannot initiate; scan is already in progress" args = (all_vols, scan_vols) t = threading.Thread( target=self.init_indexes, args=args, - name="up2k-rescan-{}".format(scan_vols[0]), + name="up2k-rescan-{}".format(scan_vols[0] if scan_vols else "all"), ) t.daemon = True t.start() @@ -196,6 +203,10 @@ class Up2k(object): if now < cooldown: continue + if hasattr(self, "pp"): + cooldown = now + 5 + continue + timeout = now + 9001 with self.mutex: for vp, vol in sorted(self.asrv.vfs.all_vols.items()): @@ -217,7 +228,7 @@ class Up2k(object): if vols: cooldown = now + 10 - err = self.rescan(self.asrv.vfs.all_vols, vols) + err = self.rescan(self.asrv.vfs.all_vols, vols, False) if err: for v in vols: self.need_rescan[v] = True @@ -299,6 +310,16 @@ class Up2k(object): return True, ret def init_indexes(self, all_vols, scan_vols=None): + gid = self.gid + while hasattr(self, "pp") and gid == self.gid: + time.sleep(0.1) + + if gid != self.gid: + return + + if gid: + self.log("reload #{} running".format(self.gid)) + self.pp = ProgressPrinter() vols = all_vols.values() t0 = time.time() @@ -429,7 +450,11 @@ class Up2k(object): return have_e2d def register_vpath(self, ptop, flags): - histpath = self.asrv.vfs.histtab[ptop] + histpath = self.asrv.vfs.histtab.get(ptop) + if not histpath: + self.log("no histpath for [{}]".format(ptop)) + return None + db_path = os.path.join(histpath, "up2k.db") if ptop in self.registry: try: @@ -797,10 +822,11 @@ class Up2k(object): return ret def _run_all_mtp(self): + gid = self.gid t0 = time.time() for ptop, flags in self.flags.items(): if "mtp" in flags: - self._run_one_mtp(ptop) + self._run_one_mtp(ptop, gid) td = time.time() - t0 msg = "mtp finished in {:.2f} sec ({})" @@ -811,7 +837,10 @@ class Up2k(object): if "OFFLINE" not in self.volstate[k]: self.volstate[k] = "online, idle" - def _run_one_mtp(self, ptop): + def _run_one_mtp(self, ptop, gid): + if gid != self.gid: + return + entags = self.entags[ptop] parsers = {} @@ -844,6 +873,9 @@ class Up2k(object): in_progress = {} while True: with self.mutex: + if gid != self.gid: + break + q = "select w from mt where k = 't:mtp' limit ?" warks = cur.execute(q, (batch_sz,)).fetchall() warks = [x[0] for x in warks] @@ -1960,7 +1992,8 @@ class Up2k(object): self.snap_prev = {} while True: time.sleep(self.snap_persist_interval) - self.do_snapshot() + if not hasattr(self, "pp"): + self.do_snapshot() def do_snapshot(self): with self.mutex: