rescan volumes on sigusr1

This commit is contained in:
ed 2021-11-06 18:20:31 +01:00
parent 73baebbd16
commit f050647b43
9 changed files with 74 additions and 21 deletions

View file

@ -63,7 +63,7 @@ class BrokerMp(object):
procs.pop()
def reload(self):
self.log("broker", "forwarding reload event")
self.log("broker", "reloading")
for _, proc in enumerate(self.procs):
proc.q_pend.put([0, "reload", []])

View file

@ -70,8 +70,9 @@ class MpWorker(object):
return
elif dest == "reload":
self.logw("mpw reloading")
self.logw("mpw.asrv reloading")
self.asrv.reload()
self.logw("mpw.asrv reloaded")
elif dest == "listen":
self.httpsrv.listen(args[0], args[1])

View file

@ -1714,7 +1714,7 @@ class HttpCli(object):
vn, _ = self.asrv.vfs.get(self.vpath, self.uname, True, True)
args = [self.asrv.vfs.all_vols, [vn.vpath]]
args = [self.asrv.vfs.all_vols, [vn.vpath], False]
x = self.conn.hsrv.broker.put(True, "up2k.rescan", *args)
x = x.get()

View file

@ -39,7 +39,7 @@ class HttpConn(object):
self.u2fh = hsrv.u2fh
enth = HAVE_PIL and not self.args.no_thumb
self.thumbcli = ThumbCli(hsrv.broker) if enth else None
self.thumbcli = ThumbCli(hsrv) if enth else None
self.ico = Ico(self.args)
self.t0 = time.time()

View file

@ -235,6 +235,7 @@ class SvcHub(object):
self.log("root", "reload scheduled")
with self.up2k.mutex:
self.asrv.reload()
self.up2k.reload()
self.broker.reload()
self.reloading = False
@ -244,8 +245,10 @@ class SvcHub(object):
with self.stop_cond:
self.stop_cond.wait(9001)
if self.reload_req and not self.reloading:
self.reload()
if self.reload_req:
self.reload_req = False
if not self.reloading:
self.reload()
self.shutdown()

View file

@ -9,14 +9,18 @@ from .bos import bos
class ThumbCli(object):
def __init__(self, broker):
self.broker = broker
self.args = broker.args
self.asrv = broker.asrv
def __init__(self, hsrv):
self.broker = hsrv.broker
self.log_func = hsrv.log
self.args = hsrv.args
self.asrv = hsrv.asrv
# cache on both sides for less broker spam
self.cooldown = Cooldown(self.args.th_poke)
def log(self, msg, c=0):
self.log_func("thumbcli", msg, c)
def get(self, ptop, rem, mtime, fmt):
ext = rem.rsplit(".")[-1].lower()
if ext not in THUMBABLE:
@ -48,7 +52,11 @@ class ThumbCli(object):
if self.args.th_no_webp or ((is_vid or is_au) and self.args.th_ff_jpg):
fmt = "j"
histpath = self.asrv.vfs.histtab[ptop]
histpath = self.asrv.vfs.histtab.get(ptop)
if not histpath:
self.log("no histpath for [{}]".format(ptop))
return None
tpath = thumb_path(histpath, rem, mtime, fmt)
ret = None
try:

View file

@ -154,7 +154,11 @@ class ThumbSrv(object):
return not self.nthr
def get(self, ptop, rem, mtime, fmt):
histpath = self.asrv.vfs.histtab[ptop]
histpath = self.asrv.vfs.histtab.get(ptop)
if not histpath:
self.log("no histpath for [{}]".format(ptop))
return None
tpath = thumb_path(histpath, rem, mtime, fmt)
abspath = os.path.join(ptop, rem)
cond = threading.Condition(self.mutex)

View file

@ -67,7 +67,11 @@ class U2idx(object):
if cur:
return cur
histpath = self.asrv.vfs.histtab[ptop]
histpath = self.asrv.vfs.histtab.get(ptop)
if not histpath:
self.log("no histpath for [{}]".format(ptop))
return None
db_path = os.path.join(histpath, "up2k.db")
if not bos.path.exists(db_path):
return None

View file

@ -68,6 +68,7 @@ class Up2k(object):
self.tagq = Queue()
self.n_hashq = 0
self.n_tagq = 0
self.gid = 0
self.volstate = {}
self.need_rescan = {}
self.dupesched = {}
@ -114,6 +115,12 @@ class Up2k(object):
t.daemon = True
t.start()
def reload(self):
self.gid += 1
self.log("reload #{} initiated".format(self.gid))
all_vols = self.asrv.vfs.all_vols
self.rescan(all_vols, list(all_vols.keys()), True)
def deferred_init(self):
all_vols = self.asrv.vfs.all_vols
have_e2d = self.init_indexes(all_vols)
@ -168,15 +175,15 @@ class Up2k(object):
}
return json.dumps(ret, indent=4)
def rescan(self, all_vols, scan_vols):
if hasattr(self, "pp"):
def rescan(self, all_vols, scan_vols, wait):
if not wait and hasattr(self, "pp"):
return "cannot initiate; scan is already in progress"
args = (all_vols, scan_vols)
t = threading.Thread(
target=self.init_indexes,
args=args,
name="up2k-rescan-{}".format(scan_vols[0]),
name="up2k-rescan-{}".format(scan_vols[0] if scan_vols else "all"),
)
t.daemon = True
t.start()
@ -196,6 +203,10 @@ class Up2k(object):
if now < cooldown:
continue
if hasattr(self, "pp"):
cooldown = now + 5
continue
timeout = now + 9001
with self.mutex:
for vp, vol in sorted(self.asrv.vfs.all_vols.items()):
@ -217,7 +228,7 @@ class Up2k(object):
if vols:
cooldown = now + 10
err = self.rescan(self.asrv.vfs.all_vols, vols)
err = self.rescan(self.asrv.vfs.all_vols, vols, False)
if err:
for v in vols:
self.need_rescan[v] = True
@ -299,6 +310,16 @@ class Up2k(object):
return True, ret
def init_indexes(self, all_vols, scan_vols=None):
gid = self.gid
while hasattr(self, "pp") and gid == self.gid:
time.sleep(0.1)
if gid != self.gid:
return
if gid:
self.log("reload #{} running".format(self.gid))
self.pp = ProgressPrinter()
vols = all_vols.values()
t0 = time.time()
@ -429,7 +450,11 @@ class Up2k(object):
return have_e2d
def register_vpath(self, ptop, flags):
histpath = self.asrv.vfs.histtab[ptop]
histpath = self.asrv.vfs.histtab.get(ptop)
if not histpath:
self.log("no histpath for [{}]".format(ptop))
return None
db_path = os.path.join(histpath, "up2k.db")
if ptop in self.registry:
try:
@ -797,10 +822,11 @@ class Up2k(object):
return ret
def _run_all_mtp(self):
gid = self.gid
t0 = time.time()
for ptop, flags in self.flags.items():
if "mtp" in flags:
self._run_one_mtp(ptop)
self._run_one_mtp(ptop, gid)
td = time.time() - t0
msg = "mtp finished in {:.2f} sec ({})"
@ -811,7 +837,10 @@ class Up2k(object):
if "OFFLINE" not in self.volstate[k]:
self.volstate[k] = "online, idle"
def _run_one_mtp(self, ptop):
def _run_one_mtp(self, ptop, gid):
if gid != self.gid:
return
entags = self.entags[ptop]
parsers = {}
@ -844,6 +873,9 @@ class Up2k(object):
in_progress = {}
while True:
with self.mutex:
if gid != self.gid:
break
q = "select w from mt where k = 't:mtp' limit ?"
warks = cur.execute(q, (batch_sz,)).fetchall()
warks = [x[0] for x in warks]
@ -1960,7 +1992,8 @@ class Up2k(object):
self.snap_prev = {}
while True:
time.sleep(self.snap_persist_interval)
self.do_snapshot()
if not hasattr(self, "pp"):
self.do_snapshot()
def do_snapshot(self):
with self.mutex: