diff --git a/copyparty/up2k.py b/copyparty/up2k.py index 707060d7..df1a61df 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -12,6 +12,7 @@ import shutil import signal import stat import subprocess as sp +import tempfile import threading import time import traceback @@ -71,6 +72,8 @@ class Dbw(object): class Mpqe(object): + """pending files to tag-scan""" + def __init__( self, mtp: dict[str, MParser], @@ -129,6 +132,8 @@ class Up2k(object): self.mem_cur = None self.sqlite_ver = None self.no_expr_idx = False + self.timeout = int(max(self.args.srch_time, 5) * 1.2) + self.spools: set[tempfile.SpooledTemporaryFile[bytes]] = set() if HAVE_SQLITE3: # mojibake detector self.mem_cur = self._orz(":memory:") @@ -201,8 +206,9 @@ class Up2k(object): self.log("uploads temporarily blocked due to " + why, 3) def _unblock(self) -> None: - self.blocked = None - self.log("uploads are now possible", 2) + if self.blocked is not None: + self.blocked = None + self.log("uploads are now possible", 2) def get_state(self) -> str: mtpq: Union[int, str] = 0 @@ -458,10 +464,9 @@ class Up2k(object): self.volstate[vol.vpath] = t - # file contents verification - if next((zv for zv in vols if "e2v" in zv.flags), None): - self._block("integrity verification") + self._unblock() + # file contents verification for vol in vols: if self.stop: break @@ -485,9 +490,6 @@ class Up2k(object): self.volstate[vol.vpath] = t - if self.blocked: - self._unblock() - # open the rest + do any e2ts(a) needed_mutagen = False for vol in vols: @@ -665,7 +667,7 @@ class Up2k(object): n_add = n_rm = 0 try: n_add = self._build_dir(db, top, set(excl), top, rtop, rei, reh, []) - n_rm = self._drop_lost(db.c, top) + n_rm = self._drop_lost(db.c, top, excl) except Exception as ex: db_ex_chk(self.log, ex, db_path) t = "failed to index volume [{}]:\n{}" @@ -698,6 +700,7 @@ class Up2k(object): assert self.pp and self.mem_cur self.pp.msg = "a{} {}".format(self.pp.n, cdir) ret = 0 + unreg: list[str] = [] seen_files = {} # != inames; files-only for dropcheck g = statdir(self.log_func, not self.args.no_scandir, False, cdir) gl = sorted(g) @@ -707,7 +710,12 @@ class Up2k(object): return -1 abspath = os.path.join(cdir, iname) + rp = abspath[len(top) :].lstrip("/") + if WINDOWS: + rp = rp.replace("\\", "/").strip("/") + if rei and rei.search(abspath): + unreg.append(rp) continue nohash = reh.search(abspath) if reh else False @@ -716,6 +724,7 @@ class Up2k(object): if stat.S_ISDIR(inf.st_mode): rap = absreal(abspath) if abspath in excl or rap in excl: + unreg.append(rp) continue if iname == ".th" and bos.path.isdir(os.path.join(abspath, "top")): # abandoned or foreign, skip @@ -731,10 +740,6 @@ class Up2k(object): else: # self.log("file: {}".format(abspath)) seen_files[iname] = 1 - rp = abspath[len(top) :].lstrip("/") - if WINDOWS: - rp = rp.replace("\\", "/").strip("/") - if rp.endswith(".PARTIAL") and time.time() - lmod < 60: # rescan during upload continue @@ -806,6 +811,25 @@ class Up2k(object): if self.stop: return -1 + # drop shadowed folders + for rd in unreg: + n = 0 + q = "select count(w) from up where (rd = ? or rd like ?||'%') and at == 0" + for erd in [rd, "//" + w8b64enc(rd)]: + try: + n = db.c.execute(q, (erd, erd + "/")).fetchone()[0] + break + except: + pass + + if n: + t = "forgetting {} shadowed autoindexed files in [{}] > [{}]" + self.log(t.format(n, top, rd)) + + q = "delete from up where (rd = ? or rd like ?||'%') and at == 0" + db.c.execute(q, (erd, erd + "/")) + ret += n + # drop missing files rd = cdir[len(top) + 1 :].strip("/") if WINDOWS: @@ -828,12 +852,13 @@ class Up2k(object): return ret - def _drop_lost(self, cur: "sqlite3.Cursor", top: str) -> int: + def _drop_lost(self, cur: "sqlite3.Cursor", top: str, excl: list[str]) -> int: rm = [] n_rm = 0 nchecked = 0 assert self.pp - # `_build_dir` did all the files, now do dirs + + # `_build_dir` did all unshadowed files; first do dirs: ndirs = next(cur.execute("select count(distinct rd) from up"))[0] c = cur.execute("select distinct rd from up order by rd desc") for (drd,) in c: @@ -853,18 +878,50 @@ class Up2k(object): rm.append(drd) - if not rm: - return 0 + if rm: + q = "select count(w) from up where rd = ?" + for rd in rm: + n_rm += next(cur.execute(q, (rd,)))[0] - q = "select count(w) from up where rd = ?" - for rd in rm: - n_rm += next(cur.execute(q, (rd,)))[0] + self.log("forgetting {} deleted dirs, {} files".format(len(rm), n_rm)) + for rd in rm: + cur.execute("delete from up where rd = ?", (rd,)) - self.log("forgetting {} deleted dirs, {} files".format(len(rm), n_rm)) - for rd in rm: - cur.execute("delete from up where rd = ?", (rd,)) + # then shadowed deleted files + n_rm2 = 0 + c2 = cur.connection.cursor() + excl = [x[len(top) + 1 :] for x in excl if x.startswith(top + "/")] + q = "select rd, fn from up where (rd = ? or rd like ?||'%') order by rd" + for rd in excl: + for erd in [rd, "//" + w8b64enc(rd)]: + try: + c = cur.execute(q, (erd, erd + "/")) + break + except: + pass - return n_rm + crd = "///" + cdc: set[str] = set() + for drd, dfn in c: + rd, fn = s3dec(drd, dfn) + + if crd != rd: + crd = rd + try: + cdc = set(os.listdir(os.path.join(top, rd))) + except: + cdc.clear() + + if fn not in cdc: + q = "delete from up where rd = ? and fn = ?" + c2.execute(q, (drd, dfn)) + n_rm2 += 1 + + if n_rm2: + self.log("forgetting {} shadowed deleted files".format(n_rm2)) + + c2.close() + return n_rm + n_rm2 def _verify_integrity(self, vol: VFS) -> int: """expensive; blocks database access until finished""" @@ -888,7 +945,7 @@ class Up2k(object): qexa.append("up.rd != ? and not up.rd like ?||'%'") pexa.extend([vpath, vpath]) - pex = tuple(pexa) + pex: tuple[Any, ...] = tuple(pexa) qex = " and ".join(qexa) if qex: qex = " where " + qex @@ -903,11 +960,22 @@ class Up2k(object): b_left += sz # sum() can overflow according to docs n_left += 1 - q = "select w, mt, sz, rd, fn from up" + qex - for w, mt, sz, drd, dfn in cur.execute(q, pex): + tf, _ = self._spool_warks(cur, "select w, rd, fn from up" + qex, pex, 0) + + with gzip.GzipFile(mode="rb", fileobj=tf) as gf: + for zb in gf: if self.stop: return -1 + w, drd, dfn = zb[:-1].decode("utf-8").split("\x00") + with self.mutex: + q = "select mt, sz from up where w = ? and rd = ? and fn = ?" + try: + mt, sz = cur.execute(q, (w, drd, dfn)).fetchone() + except: + # file moved/deleted since spooling + continue + n_left -= 1 b_left -= sz if drd.startswith("//") or dfn.startswith("//"): @@ -952,18 +1020,21 @@ class Up2k(object): t = t.format(abspath, w, sz, mt, w2, sz2, mt2) self.log(t, 1) - if e2vp and rewark: - self.hub.retcode = 1 - os.kill(os.getpid(), signal.SIGTERM) - raise Exception("{} files have incorrect hashes".format(len(rewark))) + if e2vp and rewark: + self.hub.retcode = 1 + os.kill(os.getpid(), signal.SIGTERM) + raise Exception("{} files have incorrect hashes".format(len(rewark))) - if not e2vu: - return 0 + if not e2vu or not rewark: + return 0 + with self.mutex: for rd, fn, w, sz, mt in rewark: q = "update up set w = ?, sz = ?, mt = ? where rd = ? and fn = ? limit 1" cur.execute(q, (w, sz, int(mt), rd, fn)) + cur.connection.commit() + return len(rewark) def _build_tags_index(self, vol: VFS) -> tuple[int, int, bool]: @@ -977,11 +1048,7 @@ class Up2k(object): flags = self.flags[ptop] cur = self.cur[ptop] - n_add = 0 n_rm = 0 - n_buf = 0 - last_write = time.time() - if "e2tsr" in flags: with self.mutex: n_rm = cur.execute("select count(w) from mt").fetchone()[0] @@ -992,93 +1059,153 @@ class Up2k(object): # integrity: drop tags for tracks that were deleted if "e2t" in flags: with self.mutex: - drops = [] + n = 0 c2 = cur.connection.cursor() up_q = "select w from up where substr(w,1,16) = ?" + rm_q = "delete from mt where w = ?" for (w,) in cur.execute("select w from mt"): if not c2.execute(up_q, (w,)).fetchone(): - drops.append(w[:16]) - c2.close() + c2.execute(rm_q, (w[:16],)) + n += 1 - if drops: - msg = "discarding media tags for {} deleted files" - self.log(msg.format(len(drops))) - n_rm += len(drops) - for w in drops: - cur.execute("delete from mt where w = ?", (w,)) + c2.close() + if n: + t = "discarded media tags for {} deleted files" + self.log(t.format(n)) + n_rm += n + + with self.mutex: + cur.connection.commit() # bail if a volume flag disables indexing if "d2t" in flags or "d2d" in flags: - return n_add, n_rm, True + return 0, n_rm, True # add tags for new files - gcur = cur - with self.mutex: - gcur.connection.commit() - if "e2ts" in flags: if not self.mtag: - return n_add, n_rm, False + return 0, n_rm, False - mpool: Optional[Queue[Mpqe]] = None + nq = 0 + with self.mutex: + tf, nq = self._spool_warks( + cur, "select w from up order by rd, fn", (), 1 + ) - if self.mtag.prefer_mt and self.args.mtag_mt > 1: - mpool = self._start_mpool() + if not nq: + # self.log("tags ok") + self._unspool(tf) + return 0, n_rm, True - # TODO blocks writes to registry cursor; do chunks instead - conn = sqlite3.connect(db_path, timeout=15) - cur = conn.cursor() - c2 = conn.cursor() - c3 = conn.cursor() - n_left = cur.execute("select count(w) from up").fetchone()[0] - for w, rd, fn in cur.execute("select w, rd, fn from up order by rd, fn"): - if self.stop: - return -1, -1, False + if nq == -1: + return -1, -1, True - n_left -= 1 - q = "select w from mt where w = ?" - if c2.execute(q, (w[:16],)).fetchone(): - continue + with gzip.GzipFile(mode="rb", fileobj=tf) as gf: + n_add = self._e2ts_q(gf, nq, cur, ptop, entags) - if "mtp" in flags: - q = "insert into mt values (?,'t:mtp','a')" - c2.execute(q, (w[:16],)) - - if rd.startswith("//") or fn.startswith("//"): - rd, fn = s3dec(rd, fn) - - abspath = os.path.join(ptop, rd, fn) - self.pp.msg = "c{} {}".format(n_left, abspath) - if not mpool: - n_tags = self._tag_file(c3, entags, w, abspath) - else: - mpool.put(Mpqe({}, entags, w, abspath, {})) - # not registry cursor; do not self.mutex: - n_tags = len(self._flush_mpool(c3)) - - n_add += n_tags - n_buf += n_tags - - td = time.time() - last_write - if n_buf >= 4096 or td >= 60: - self.log("commit {} new tags".format(n_buf)) - cur.connection.commit() - last_write = time.time() - n_buf = 0 - - if mpool: - self._stop_mpool(mpool) - with self.mutex: - n_add += len(self._flush_mpool(c3)) - - conn.commit() - c3.close() - c2.close() - cur.close() - conn.close() + self._unspool(tf) return n_add, n_rm, True + def _e2ts_q( + self, + qf: gzip.GzipFile, + nq: int, + cur: "sqlite3.Cursor", + ptop: str, + entags: set[str], + ) -> int: + assert self.pp and self.mtag + + mpool: Optional[Queue[Mpqe]] = None + if self.mtag.prefer_mt and self.args.mtag_mt > 1: + mpool = self._start_mpool() + + n_add = 0 + n_buf = 0 + last_write = time.time() + for bw in qf: + if self.stop: + return -1 + + w = bw[:-1].decode("ascii") + + q = "select rd, fn from up where w = ?" + try: + rd, fn = cur.execute(q, (w,)).fetchone() + except: + # file modified/deleted since spooling + continue + + if rd.startswith("//") or fn.startswith("//"): + rd, fn = s3dec(rd, fn) + + abspath = os.path.join(ptop, rd, fn) + self.pp.msg = "c{} {}".format(nq, abspath) + if not mpool: + n_tags = self._tagscan_file(cur, entags, w, abspath) + else: + mpool.put(Mpqe({}, entags, w, abspath, {})) + with self.mutex: + n_tags = len(self._flush_mpool(cur)) + + n_add += n_tags + n_buf += n_tags + + td = time.time() - last_write + if n_buf >= 4096 or td >= max(1, self.timeout - 1): + self.log("commit {} new tags".format(n_buf)) + with self.mutex: + cur.connection.commit() + + last_write = time.time() + n_buf = 0 + + if mpool: + self._stop_mpool(mpool) + with self.mutex: + n_add += len(self._flush_mpool(cur)) + + with self.mutex: + cur.connection.commit() + + return n_add + + def _spool_warks( + self, + cur: "sqlite3.Cursor", + q: str, + params: tuple[Any, ...], + flt: int, + ) -> tuple[tempfile.SpooledTemporaryFile[bytes], int]: + n = 0 + c2 = cur.connection.cursor() + tf = tempfile.SpooledTemporaryFile(1024 * 1024 * 8, "w+b", prefix="cpp-tq-") + with gzip.GzipFile(mode="wb", fileobj=tf) as gf: + for row in cur.execute(q, params): + if self.stop: + return tf, -1 + + if flt == 1: + q = "select w from mt where w = ?" + if c2.execute(q, (row[0][:16],)).fetchone(): + continue + + gf.write("{}\n".format("\x00".join(row)).encode("utf-8")) + n += 1 + + c2.close() + tf.seek(0) + self.spools.add(tf) + return tf, n + + def _unspool(self, tf: tempfile.SpooledTemporaryFile[bytes]) -> None: + self.spools.remove(tf) + try: + tf.close() + except Exception as ex: + self.log("failed to delete spool: {}".format(ex), 3) + def _flush_mpool(self, wcur: "sqlite3.Cursor") -> list[str]: ret = [] for x in self.pending_tags: @@ -1338,21 +1465,38 @@ class Up2k(object): msg = "{} failed to read tags from {}:\n{}".format(parser, abspath, ex) self.log(msg.lstrip(), c=1 if " int: + """will mutex""" + assert self.mtag + + if not bos.path.isfile(abspath): + return 0 + + try: + tags = self.mtag.get(abspath) + except Exception as ex: + self._log_tag_err("", abspath, ex) + return 0 + + with self.mutex: + return self._tag_file(write_cur, entags, wark, abspath, tags) + def _tag_file( self, write_cur: "sqlite3.Cursor", entags: set[str], wark: str, abspath: str, - tags: Optional[dict[str, Union[str, float]]] = None, + tags: dict[str, Union[str, float]], ) -> int: + """mutex me""" assert self.mtag - if tags is None: - try: - tags = self.mtag.get(abspath) - except Exception as ex: - self._log_tag_err("", abspath, ex) - return 0 if not bos.path.isfile(abspath): return 0 @@ -1382,8 +1526,7 @@ class Up2k(object): return ret def _orz(self, db_path: str) -> "sqlite3.Cursor": - timeout = int(max(self.args.srch_time, 5) * 1.2) - return sqlite3.connect(db_path, timeout, check_same_thread=False).cursor() + return sqlite3.connect(db_path, self.timeout, check_same_thread=False).cursor() # x.set_trace_callback(trace) def _open_db(self, db_path: str) -> "sqlite3.Cursor": @@ -1953,7 +2096,9 @@ class Up2k(object): self.db_add(cur, wark, rd, fn, lmod, sz, ip, at) cur.connection.commit() except Exception as ex: - db_ex_chk(self.log, ex, self.register_vpath(ptop, {})[1]) + x = self.register_vpath(ptop, {}) + assert x + db_ex_chk(self.log, ex, x[1]) raise if "e2t" in self.flags[ptop]: @@ -2657,6 +2802,10 @@ class Up2k(object): def shutdown(self) -> None: self.stop = True + + for x in list(self.spools): + self._unspool(x) + self.log("writing snapshot") self.do_snapshot() diff --git a/copyparty/util.py b/copyparty/util.py index 0a758067..477166f0 100644 --- a/copyparty/util.py +++ b/copyparty/util.py @@ -1196,15 +1196,10 @@ def s3enc(mem_cur: "sqlite3.Cursor", rd: str, fn: str) -> tuple[str, str]: def s3dec(rd: str, fn: str) -> tuple[str, str]: - ret = [] - for v in [rd, fn]: - if v.startswith("//"): - ret.append(w8b64dec(v[2:])) - # self.log("mojide [{}] {}".format(ret[-1], v[2:])) - else: - ret.append(v) - - return ret[0], ret[1] + return ( + w8b64dec(rd[2:]) if rd.startswith("//") else rd, + w8b64dec(fn[2:]) if fn.startswith("//") else fn, + ) def db_ex_chk(log: "NamedLogger", ex: Exception, db_path: str) -> None: