add directory hashing (boots ~3x faster)

ed 2022-08-12 23:17:18 +02:00
parent 11df36f3cf
commit 9ccd327298
3 changed files with 120 additions and 17 deletions
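
The gist of the change: during an e2ds folder scan, each directory's listing of (size, mtime, name) tuples gets hashed, and the digest is stored in a new dh table keyed by the directory's volume-relative path; on the next boot, a matching digest lets the whole folder be skipped. A rough standalone sketch of that idea, with illustrative names rather than the actual copyparty code:

import base64
import hashlib
import os

def dir_digest(path: str) -> str:
    # same ingredients as the commit: (size, mtime, name) per regular file
    files = []
    for de in sorted(os.scandir(path), key=lambda x: x.name):
        st = de.stat(follow_symlinks=False)
        if not de.is_dir(follow_symlinks=False):
            files.append((st.st_size, int(st.st_mtime), de.name))
    zh = hashlib.sha1(str(files).encode("utf-8", "replace"))
    return base64.urlsafe_b64encode(zh.digest()[:12]).decode("ascii")

def need_rescan(path: str, cache: dict[str, str]) -> bool:
    # cache stands in for the dh table; a hit means the folder is unchanged
    dh = dir_digest(path)
    if cache.get(path) == dh:
        return False
    cache[path] = dh
    return True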

View file

@@ -611,6 +611,7 @@ def run_argparse(argv: list[str], formatter: Any, retry: bool) -> argparse.Names
ap2.add_argument("--hist", metavar="PATH", type=u, help="where to store volume data (db, thumbs)")
ap2.add_argument("--no-hash", metavar="PTN", type=u, help="regex: disable hashing of matching paths during e2ds folder scans")
ap2.add_argument("--no-idx", metavar="PTN", type=u, help="regex: disable indexing of matching paths during e2ds folder scans")
ap2.add_argument("--no-dhash", action="store_true", help="disable rescan acceleration; do full database integrity check (makes the db ~5%% smaller)")
ap2.add_argument("--xdev", action="store_true", help="do not descend into other filesystems (symlink or bind-mount to another HDD, ...)")
ap2.add_argument("--xvol", action="store_true", help="skip symlinks leaving the volume root")
ap2.add_argument("--hash-mt", metavar="CORES", type=int, default=hcores, help="num cpu cores to use for file hashing; set 0 or 1 for single-core hashing")

View file

@@ -46,6 +46,7 @@ from .util import (
s3enc,
sanitize_fn,
statdir,
vjoin,
vsplit,
w8b64dec,
w8b64enc,
@@ -733,6 +734,13 @@ class Up2k(object):
if db.n:
self.log("commit {} new files".format(db.n))
if self.args.no_dhash:
if db.c.execute("select d from dh").fetchone():
db.c.execute("delete from dh")
self.log("forgetting dhashes in {}".format(top))
elif n_add or n_rm:
self._set_tagscan(db.c, True)
db.c.connection.commit()
return True, bool(n_add or n_rm or do_vac)
@@ -751,7 +759,7 @@ class Up2k(object):
xvol: bool,
) -> int:
if xvol and not rcdir.startswith(top):
self.log("skip xvol: [{}] -> [{}]".format(top, rcdir), 6)
self.log("skip xvol: [{}] -> [{}]".format(cdir, rcdir), 6)
return 0
if rcdir in seen:
@@ -759,29 +767,32 @@ class Up2k(object):
self.log(t.format(seen[-1], rcdir, cdir), 3)
return 0
ret = 0
seen = seen + [rcdir]
unreg: list[str] = []
files: list[tuple[int, int, str]] = []
assert self.pp and self.mem_cur
self.pp.msg = "a{} {}".format(self.pp.n, cdir)
ret = 0
unreg: list[str] = []
seen_files = {} # != inames; files-only for dropcheck
rd = cdir[len(top) + 1 :].strip("/")
if WINDOWS:
rd = rd.replace("\\", "/").strip("/")
g = statdir(self.log_func, not self.args.no_scandir, False, cdir)
gl = sorted(g)
inames = {x[0]: 1 for x in gl}
partials = set([x[0] for x in gl if "PARTIAL" in x[0]])
for iname, inf in gl:
if self.stop:
return -1
rp = vjoin(rd, iname)
abspath = os.path.join(cdir, iname)
rp = abspath[len(top) :].lstrip("/")
if WINDOWS:
rp = rp.replace("\\", "/").strip("/")
if rei and rei.search(abspath):
unreg.append(rp)
continue
nohash = reh.search(abspath) if reh else False
lmod = int(inf.st_mtime)
sz = inf.st_size
if stat.S_ISDIR(inf.st_mode):
@@ -807,19 +818,53 @@ class Up2k(object):
self.log("skip type-{:x} file [{}]".format(inf.st_mode, abspath))
else:
# self.log("file: {}".format(abspath))
seen_files[iname] = 1
if rp.endswith(".PARTIAL") and time.time() - lmod < 60:
# rescan during upload
continue
if not sz and (
"{}.PARTIAL".format(iname) in inames
or ".{}.PARTIAL".format(iname) in inames
"{}.PARTIAL".format(iname) in partials
or ".{}.PARTIAL".format(iname) in partials
):
# placeholder for unfinished upload
continue
rd, fn = rp.rsplit("/", 1) if "/" in rp else ["", rp]
files.append((sz, lmod, iname))
# folder of 1000 files = ~1 MiB RAM best-case (tiny filenames);
# free up stuff we're done with before dhashing
gl = []
partials.clear()
if not self.args.no_dhash:
if len(files) < 9000:
zh = hashlib.sha1(str(files).encode("utf-8", "replace"))
else:
zh = hashlib.sha1()
_ = [zh.update(str(x).encode("utf-8", "replace")) for x in files]
dhash = base64.urlsafe_b64encode(zh.digest()[:12]).decode("ascii")
sql = "select d from dh where d = ? and h = ?"
try:
c = db.c.execute(sql, (rd, dhash))
drd = rd
except:
drd = "//" + w8b64enc(rd)
c = db.c.execute(sql, (drd, dhash))
if c.fetchone():
return ret
seen_files = set([x[2] for x in files]) # for dropcheck
for sz, lmod, fn in files:
if self.stop:
return -1
rp = vjoin(rd, fn)
abspath = os.path.join(cdir, fn)
nohash = reh.search(abspath) if reh else False
if fn: # diff-golf
sql = "select w, mt, sz from up where rd = ? and fn = ?"
try:
c = db.c.execute(sql, (rd, fn))
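
Two things to note in the hunk above: a digest hit returns before the per-file loop and the dropcheck below ever run, which is where the boot speedup comes from, and directory names that sqlite cannot bind as text fall back to a "//" + w8b64enc(rd) key. For folders with 9000 or more entries the digest is fed entry-by-entry instead of str()-ing the whole list, keeping memory flat. The dh round-trip in isolation, as a standalone sqlite3 sketch (table and queries taken from this diff, the rest illustrative):

import sqlite3

db = sqlite3.connect(":memory:")
db.execute("create table dh (d text, h text)")

def folder_unchanged(rd: str, dhash: str) -> bool:
    q = "select d from dh where d = ? and h = ?"
    if db.execute(q, (rd, dhash)).fetchone():
        return True  # digest matches; skip the folder entirely
    # changed (or first scan); in the commit the refresh below only
    # happens after the folder has been re-indexed
    db.execute("delete from dh where d = ?", (rd,))
    db.execute("insert into dh values (?,?)", (rd, dhash))
    return False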
@@ -879,6 +924,10 @@ class Up2k(object):
db.n = 0
db.t = time.time()
if not self.args.no_dhash:
db.c.execute("delete from dh where d = ?", (drd,))
db.c.execute("insert into dh values (?,?)", (drd, dhash))
if self.stop:
return -1
@@ -897,15 +946,14 @@ class Up2k(object):
t = "forgetting {} shadowed autoindexed files in [{}] > [{}]"
self.log(t.format(n, top, rd))
q = "delete from dh where (d = ? or d like ?||'%')"
db.c.execute(q, (erd, erd + "/"))
q = "delete from up where (rd = ? or rd like ?||'%') and at == 0"
db.c.execute(q, (erd, erd + "/"))
ret += n
# drop missing files
rd = cdir[len(top) + 1 :].strip("/")
if WINDOWS:
rd = rd.replace("\\", "/").strip("/")
q = "select fn from up where rd = ?"
try:
c = db.c.execute(q, (rd,))
@@ -956,6 +1004,7 @@ class Up2k(object):
self.log("forgetting {} deleted dirs, {} files".format(len(rm), n_rm))
for rd in rm:
cur.execute("delete from dh where d = ?", (rd,))
cur.execute("delete from up where rd = ?", (rd,))
# then shadowed deleted files
@@ -1117,10 +1166,43 @@ class Up2k(object):
reg = self.register_vpath(ptop, vol.flags)
assert reg and self.pp
cur = self.cur[ptop]
if not self.args.no_dhash:
with self.mutex:
c = cur.execute("select k from kv where k = 'tagscan'")
if not c.fetchone():
return 0, 0, bool(self.mtag)
ret = self._build_tags_index_2(ptop)
with self.mutex:
self._set_tagscan(cur, False)
cur.connection.commit()
return ret
def _set_tagscan(self, cur: "sqlite3.Cursor", need: bool) -> bool:
if self.args.no_dhash:
return False
c = cur.execute("select k from kv where k = 'tagscan'")
if bool(c.fetchone()) == need:
return False
if need:
cur.execute("insert into kv values ('tagscan',1)")
else:
cur.execute("delete from kv where k = 'tagscan'")
return True
def _build_tags_index_2(self, ptop: str) -> tuple[int, int, bool]:
entags = self.entags[ptop]
flags = self.flags[ptop]
cur = self.cur[ptop]
n_add = 0
n_rm = 0
if "e2tsr" in flags:
with self.mutex:
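
Since unchanged folders are no longer walked, the tag scanner gets its own dirty flag: _set_tagscan records a 'tagscan' row in the kv table whenever a scan adds or removes files (and again in the tag-writing path further down), and the gate at the top of this hunk returns early when that row is absent instead of always running _build_tags_index_2. With --no-dhash the flag is never touched and the previous always-rescan behaviour applies.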
@@ -1611,6 +1693,7 @@ class Up2k(object):
write_cur.execute(q, (wark[:16], k, v))
ret += 1
self._set_tagscan(write_cur, True)
return ret
def _orz(self, db_path: str) -> "sqlite3.Cursor":
@@ -1634,6 +1717,11 @@ class Up2k(object):
self.log("WARN: failed to upgrade from v4", 3)
if ver == DB_VER:
try:
self._add_dhash_tab(cur)
except:
pass
try:
nfiles = next(cur.execute("select count(w) from up"))[0]
self.log("OK: {} |{}|".format(db_path, nfiles))
@@ -1737,6 +1825,16 @@ class Up2k(object):
cur.connection.commit()
def _add_dhash_tab(self, cur: "sqlite3.Cursor") -> None:
# v5 -> v5a
for cmd in [
r"create table dh (d text, h text)",
r"create index dh_d on dh(d)",
]:
cur.execute(cmd)
cur.connection.commit()
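
_add_dhash_tab runs on every open of an existing v5 database; the bare try/except at its call-site earlier in this diff swallows the "table dh already exists" error, which is why the schema bump is just a comment (v5 -> v5a) and no version check is needed. An equivalent, more explicit idempotent form (an alternative, not what the commit does):

cur.execute("create table if not exists dh (d text, h text)")
cur.execute("create index if not exists dh_d on dh(d)")
cur.connection.commit()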
def _job_volchk(self, cj: dict[str, Any]) -> None:
if not self.register_vpath(cj["ptop"], cj["vcfg"]):
if cj["ptop"] not in self.registry:

View file

@@ -1327,6 +1327,10 @@ def vsplit(vpath: str) -> tuple[str, str]:
return vpath.rsplit("/", 1) # type: ignore
def vjoin(rd: str, fn: str) -> str:
return rd + "/" + fn if rd else fn
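
vjoin is the counterpart to vsplit above: it glues a volume-relative directory back onto a filename and leaves the name untouched when the directory part is empty. For illustration:

vjoin("foo/bar", "a.txt")  # -> "foo/bar/a.txt"
vjoin("", "a.txt")         # -> "a.txt"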
def w8dec(txt: bytes) -> str:
"""decodes filesystem-bytes to wtf8"""
if PY2: