diff --git a/copyparty/__main__.py b/copyparty/__main__.py index 812c6009..50ebefda 100644 --- a/copyparty/__main__.py +++ b/copyparty/__main__.py @@ -611,6 +611,7 @@ def run_argparse(argv: list[str], formatter: Any, retry: bool) -> argparse.Names ap2.add_argument("--hist", metavar="PATH", type=u, help="where to store volume data (db, thumbs)") ap2.add_argument("--no-hash", metavar="PTN", type=u, help="regex: disable hashing of matching paths during e2ds folder scans") ap2.add_argument("--no-idx", metavar="PTN", type=u, help="regex: disable indexing of matching paths during e2ds folder scans") + ap2.add_argument("--no-dhash", action="store_true", help="disable rescan acceleration; do full database integrity check (makes the db ~5%% smaller)") ap2.add_argument("--xdev", action="store_true", help="do not descend into other filesystems (symlink or bind-mount to another HDD, ...)") ap2.add_argument("--xvol", action="store_true", help="skip symlinks leaving the volume root") ap2.add_argument("--hash-mt", metavar="CORES", type=int, default=hcores, help="num cpu cores to use for file hashing; set 0 or 1 for single-core hashing") diff --git a/copyparty/up2k.py b/copyparty/up2k.py index c2c33fca..b9043240 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -46,6 +46,7 @@ from .util import ( s3enc, sanitize_fn, statdir, + vjoin, vsplit, w8b64dec, w8b64enc, @@ -733,6 +734,13 @@ class Up2k(object): if db.n: self.log("commit {} new files".format(db.n)) + if self.args.no_dhash: + if db.c.execute("select d from dh").fetchone(): + db.c.execute("delete from dh") + self.log("forgetting dhashes in {}".format(top)) + elif n_add or n_rm: + self._set_tagscan(db.c, True) + db.c.connection.commit() return True, bool(n_add or n_rm or do_vac) @@ -751,7 +759,7 @@ class Up2k(object): xvol: bool, ) -> int: if xvol and not rcdir.startswith(top): - self.log("skip xvol: [{}] -> [{}]".format(top, rcdir), 6) + self.log("skip xvol: [{}] -> [{}]".format(cdir, rcdir), 6) return 0 if rcdir in seen: @@ -759,29 +767,32 @@ class Up2k(object): self.log(t.format(seen[-1], rcdir, cdir), 3) return 0 + ret = 0 seen = seen + [rcdir] + unreg: list[str] = [] + files: list[tuple[int, int, str]] = [] + assert self.pp and self.mem_cur self.pp.msg = "a{} {}".format(self.pp.n, cdir) - ret = 0 - unreg: list[str] = [] - seen_files = {} # != inames; files-only for dropcheck + + rd = cdir[len(top) + 1 :].strip("/") + if WINDOWS: + rd = rd.replace("\\", "/").strip("/") + g = statdir(self.log_func, not self.args.no_scandir, False, cdir) gl = sorted(g) - inames = {x[0]: 1 for x in gl} + partials = set([x[0] for x in gl if "PARTIAL" in x[0]]) for iname, inf in gl: if self.stop: return -1 + rp = vjoin(rd, iname) abspath = os.path.join(cdir, iname) - rp = abspath[len(top) :].lstrip("/") - if WINDOWS: - rp = rp.replace("\\", "/").strip("/") if rei and rei.search(abspath): unreg.append(rp) continue - nohash = reh.search(abspath) if reh else False lmod = int(inf.st_mtime) sz = inf.st_size if stat.S_ISDIR(inf.st_mode): @@ -807,19 +818,53 @@ class Up2k(object): self.log("skip type-{:x} file [{}]".format(inf.st_mode, abspath)) else: # self.log("file: {}".format(abspath)) - seen_files[iname] = 1 if rp.endswith(".PARTIAL") and time.time() - lmod < 60: # rescan during upload continue if not sz and ( - "{}.PARTIAL".format(iname) in inames - or ".{}.PARTIAL".format(iname) in inames + "{}.PARTIAL".format(iname) in partials + or ".{}.PARTIAL".format(iname) in partials ): # placeholder for unfinished upload continue - rd, fn = rp.rsplit("/", 1) if "/" in rp else ["", rp] + files.append((sz, lmod, iname)) + + # folder of 1000 files = ~1 MiB RAM best-case (tiny filenames); + # free up stuff we're done with before dhashing + gl = [] + partials.clear() + if not self.args.no_dhash: + if len(files) < 9000: + zh = hashlib.sha1(str(files).encode("utf-8", "replace")) + else: + zh = hashlib.sha1() + _ = [zh.update(str(x).encode("utf-8", "replace")) for x in files] + + dhash = base64.urlsafe_b64encode(zh.digest()[:12]).decode("ascii") + sql = "select d from dh where d = ? and h = ?" + try: + c = db.c.execute(sql, (rd, dhash)) + drd = rd + except: + drd = "//" + w8b64enc(rd) + c = db.c.execute(sql, (drd, dhash)) + + if c.fetchone(): + return ret + + seen_files = set([x[2] for x in files]) # for dropcheck + for sz, lmod, fn in files: + if self.stop: + return -1 + + rp = vjoin(rd, fn) + abspath = os.path.join(cdir, fn) + nohash = reh.search(abspath) if reh else False + + if fn: # diff-golf + sql = "select w, mt, sz from up where rd = ? and fn = ?" try: c = db.c.execute(sql, (rd, fn)) @@ -879,6 +924,10 @@ class Up2k(object): db.n = 0 db.t = time.time() + if not self.args.no_dhash: + db.c.execute("delete from dh where d = ?", (drd,)) + db.c.execute("insert into dh values (?,?)", (drd, dhash)) + if self.stop: return -1 @@ -897,15 +946,14 @@ class Up2k(object): t = "forgetting {} shadowed autoindexed files in [{}] > [{}]" self.log(t.format(n, top, rd)) + q = "delete from dh where (d = ? or d like ?||'%')" + db.c.execute(q, (erd, erd + "/")) + q = "delete from up where (rd = ? or rd like ?||'%') and at == 0" db.c.execute(q, (erd, erd + "/")) ret += n # drop missing files - rd = cdir[len(top) + 1 :].strip("/") - if WINDOWS: - rd = rd.replace("\\", "/").strip("/") - q = "select fn from up where rd = ?" try: c = db.c.execute(q, (rd,)) @@ -956,6 +1004,7 @@ class Up2k(object): self.log("forgetting {} deleted dirs, {} files".format(len(rm), n_rm)) for rd in rm: + cur.execute("delete from dh where d = ?", (rd,)) cur.execute("delete from up where rd = ?", (rd,)) # then shadowed deleted files @@ -1117,10 +1166,43 @@ class Up2k(object): reg = self.register_vpath(ptop, vol.flags) assert reg and self.pp + cur = self.cur[ptop] + + if not self.args.no_dhash: + with self.mutex: + c = cur.execute("select k from kv where k = 'tagscan'") + if not c.fetchone(): + return 0, 0, bool(self.mtag) + + ret = self._build_tags_index_2(ptop) + + with self.mutex: + self._set_tagscan(cur, False) + cur.connection.commit() + + return ret + + def _set_tagscan(self, cur: "sqlite3.Cursor", need: bool) -> bool: + if self.args.no_dhash: + return False + + c = cur.execute("select k from kv where k = 'tagscan'") + if bool(c.fetchone()) == need: + return False + + if need: + cur.execute("insert into kv values ('tagscan',1)") + else: + cur.execute("delete from kv where k = 'tagscan'") + + return True + + def _build_tags_index_2(self, ptop: str) -> tuple[int, int, bool]: entags = self.entags[ptop] flags = self.flags[ptop] cur = self.cur[ptop] + n_add = 0 n_rm = 0 if "e2tsr" in flags: with self.mutex: @@ -1611,6 +1693,7 @@ class Up2k(object): write_cur.execute(q, (wark[:16], k, v)) ret += 1 + self._set_tagscan(write_cur, True) return ret def _orz(self, db_path: str) -> "sqlite3.Cursor": @@ -1634,6 +1717,11 @@ class Up2k(object): self.log("WARN: failed to upgrade from v4", 3) if ver == DB_VER: + try: + self._add_dhash_tab(cur) + except: + pass + try: nfiles = next(cur.execute("select count(w) from up"))[0] self.log("OK: {} |{}|".format(db_path, nfiles)) @@ -1737,6 +1825,16 @@ class Up2k(object): cur.connection.commit() + def _add_dhash_tab(self, cur: "sqlite3.Cursor") -> None: + # v5 -> v5a + for cmd in [ + r"create table dh (d text, h text)", + r"create index dh_d on dh(d)", + ]: + cur.execute(cmd) + + cur.connection.commit() + def _job_volchk(self, cj: dict[str, Any]) -> None: if not self.register_vpath(cj["ptop"], cj["vcfg"]): if cj["ptop"] not in self.registry: diff --git a/copyparty/util.py b/copyparty/util.py index 61d37a2e..96b65ef9 100644 --- a/copyparty/util.py +++ b/copyparty/util.py @@ -1327,6 +1327,10 @@ def vsplit(vpath: str) -> tuple[str, str]: return vpath.rsplit("/", 1) # type: ignore +def vjoin(rd: str, fn: str) -> str: + return rd + "/" + fn if rd else fn + + def w8dec(txt: bytes) -> str: """decodes filesystem-bytes to wtf8""" if PY2: