From 980c6fc81072c04f6ab9e65320d7814e04526f60 Mon Sep 17 00:00:00 2001 From: ed Date: Mon, 26 Jul 2021 02:34:56 +0200 Subject: [PATCH] add scheduled rescans + fix mv bugs --- copyparty/__main__.py | 2 + copyparty/authsrv.py | 4 +- copyparty/up2k.py | 108 +++++++++++++++++++++++++++++++----------- 3 files changed, 84 insertions(+), 30 deletions(-) diff --git a/copyparty/__main__.py b/copyparty/__main__.py index b64515e2..adabb1e2 100644 --- a/copyparty/__main__.py +++ b/copyparty/__main__.py @@ -324,6 +324,8 @@ def run_argparse(argv, formatter): ap2.add_argument("--no-mutagen", action="store_true", help="use FFprobe for tags instead") ap2.add_argument("--no-mtag-mt", action="store_true", help="disable tag-read parallelism") ap2.add_argument("--no-mtag-ff", action="store_true", help="never use FFprobe as tag reader") + ap2.add_argument("--re-int", metavar="SEC", type=int, default=30, help="disk rescan check interval") + ap2.add_argument("--re-maxage", metavar="SEC", type=int, default=0, help="disk rescan volume interval (0=off)") ap2.add_argument("-mtm", metavar="M=t,t,t", type=u, action="append", help="add/replace metadata mapping") ap2.add_argument("-mte", metavar="M,M,M", type=u, help="tags to index/display (comma-sep.)", default="circle,album,.tn,artist,title,.bpm,key,.dur,.q,.vq,.aq,ac,vc,res,.fps") diff --git a/copyparty/authsrv.py b/copyparty/authsrv.py index bebf4e1f..d7d58999 100644 --- a/copyparty/authsrv.py +++ b/copyparty/authsrv.py @@ -180,13 +180,13 @@ class VFS(object): vrem = "/".join([x for x in vrem if x]) return dbv, vrem - def canonical(self, rem): + def canonical(self, rem, resolve=True): """returns the canonical path (fully-resolved absolute fs path)""" rp = self.realpath if rem: rp += "/" + rem - return absreal(rp) + return absreal(rp) if resolve else rp def ls(self, rem, uname, scandir, permsets, lstat=False): # type: (str, str, bool, list[list[bool]], bool) -> tuple[str, str, dict[str, VFS]] diff --git a/copyparty/up2k.py b/copyparty/up2k.py index 786ddb4a..9f739b7b 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -64,6 +64,7 @@ class Up2k(object): self.n_hashq = 0 self.n_tagq = 0 self.volstate = {} + self.need_rescan = {} self.registry = {} self.entags = {} self.flags = {} @@ -121,6 +122,10 @@ class Up2k(object): thr.daemon = True thr.start() + thr = threading.Thread(target=self._sched_rescan, name="up2k-rescan") + thr.daemon = True + thr.start() + if self.mtag: thr = threading.Thread(target=self._tagger, name="up2k-tagger") thr.daemon = True @@ -170,6 +175,38 @@ class Up2k(object): t.start() return None + def _sched_rescan(self): + maxage = self.args.re_maxage + volage = {} + while True: + time.sleep(self.args.re_int) + now = time.time() + vpaths = list(sorted(self.asrv.vfs.all_vols.keys())) + with self.mutex: + if maxage: + for vp in vpaths: + if vp not in volage: + volage[vp] = now + + if now - volage[vp] >= maxage: + self.need_rescan[vp] = 1 + + if not self.need_rescan: + continue + + vols = list(sorted(self.need_rescan.keys())) + self.need_rescan = {} + + err = self.rescan(self.asrv.vfs.all_vols, vols) + if err: + for v in vols: + self.need_rescan[v] = True + + continue + + for v in vols: + volage[v] = now + def _vis_job_progress(self, job): perc = 100 - (len(job["need"]) * 100.0 / len(job["hash"])) path = os.path.join(job["ptop"], job["prel"], job["name"]) @@ -1301,7 +1338,7 @@ class Up2k(object): with self.mutex: try: ptop = dbv.realpath - cur, wark = self._find_from_vpath(ptop, vrem) + cur, wark, _, _ = self._find_from_vpath(ptop, vrem) self._forget_file(ptop, vpath, cur, wark) finally: cur.connection.commit() @@ -1321,7 +1358,7 @@ class Up2k(object): def handle_mv(self, uname, svp, dvp): svn, srem = self.asrv.vfs.get(svp, uname, True, False, True) svn, srem = svn.get_dbv(srem) - sabs = svn.canonical(srem) + sabs = svn.canonical(srem, False) if not srem: raise Pebkac(400, "mv: cannot move a mountpoint") @@ -1334,7 +1371,7 @@ class Up2k(object): permsets = [[True, False, True]] scandir = not self.args.no_scandir - # following symlinks is too scary, TODO schedule rescans as needed + # following symlinks is too scary g = svn.walk("", srem, [], uname, permsets, True, scandir, True) for dbv, vrem, _, atop, files, rd, vd in g: if dbv != jail: @@ -1356,7 +1393,7 @@ class Up2k(object): dvpf = dvp + svpf[len(svp) :] self._mv_file(uname, svpf, dvpf) - + for d in list(dirs.keys()) + [sabs]: try: bos.rmdir(d) @@ -1372,17 +1409,33 @@ class Up2k(object): dvn, drem = self.asrv.vfs.get(dvp, uname, False, True) dvn, drem = dvn.get_dbv(drem) - sabs = svn.canonical(srem) + sabs = svn.canonical(srem, False) dabs = dvn.canonical(drem) drd, dfn = vsplit(drem) - st = bos.stat(sabs) if bos.path.exists(dabs): raise Pebkac(400, "mv2: target file exists") - c1, w = self._find_from_vpath(svn.realpath, srem) + if bos.path.islink(sabs): + # following symlinks is too scary, schedule rescan of both vols + self.need_rescan[svn.vpath] = 1 + self.need_rescan[dvn.vpath] = 1 + bos.makedirs(os.path.dirname(dabs)) + bos.rename(sabs, dabs) + return "k" + + c1, w, ftime, fsize = self._find_from_vpath(svn.realpath, srem) c2 = self.cur.get(dvn.realpath) - if c1 and c2: + + if ftime is None: + st = bos.stat(sabs) + ftime = st.st_mtime + fsize = st.st_size + + if not w: + self.log("not found in src db: [{}]".format(svp)) + + if w and c2: q = "select rd, fn from up where substr(w,1,16)=? and w=?" for rd, fn in c2.execute(q, (w[:16], w)): if rd.startswith("//") or fn.startswith("//"): @@ -1394,15 +1447,13 @@ class Up2k(object): # hit is src continue - if bos.path.exists(slabs): - self.log("mv: quick relink, nice") - self._symlink(slabs, dabs) - st = bos.stat(sabs) - self.db_add(c2, w, drd, dfn, st.st_mtime, st.st_size) - bos.unlink(sabs) - else: - self.log("mv: file in db missing? whatever, fixed") - bos.rename(sabs, slabs) + if not bos.path.exists(slabs): + continue + + self.log("mv: quick relink, nice") + self._symlink(slabs, dabs) + self.db_add(c2, w, drd, dfn, ftime, fsize) + bos.unlink(sabs) self._forget_file(svn.realpath, srem, c1, w) c1.connection.commit() @@ -1412,16 +1463,16 @@ class Up2k(object): # not found in dst db; copy info self.log("mv: plain move") - if c1 and c2: - self._copy_tags(c1, c2, w) + if w: + if c2: + self._copy_tags(c1, c2, w) - if c1: self._forget_file(svn.realpath, srem, c1, w) c1.connection.commit() - if c2: - self.db_add(c2, w, drd, dfn, st.st_mtime, st.st_size) - c2.connection.commit() + if c2: + self.db_add(c2, w, drd, dfn, ftime, fsize) + c2.connection.commit() bos.makedirs(os.path.dirname(dabs)) bos.rename(sabs, dabs) @@ -1443,16 +1494,17 @@ class Up2k(object): return None, None rd, fn = vsplit(vrem) - q = "select w from up where rd=? and fn=? limit 1" + q = "select w, mt, sz from up where rd=? and fn=? limit 1" try: c = cur.execute(q, (rd, fn)) except: c = cur.execute(q, s3enc(self.mem_cur, rd, fn)) - wark = c.fetchone() - if wark: - return cur, wark[0] - return cur, None + hit = c.fetchone() + if hit: + wark, ftime, fsize = hit + return cur, wark, ftime, fsize + return cur, None, None, None def _forget_file(self, ptop, vrem, cur, wark): """forgets file in db, fixes symlinks, does not delete"""