async e2ts / e2v + forget deleted shadowed

This commit is contained in:
ed 2022-07-26 12:47:40 +02:00
parent d686ce12b6
commit 5dc75ebb67
2 changed files with 267 additions and 123 deletions

View file

@ -12,6 +12,7 @@ import shutil
import signal
import stat
import subprocess as sp
import tempfile
import threading
import time
import traceback
@ -71,6 +72,8 @@ class Dbw(object):
class Mpqe(object):
"""pending files to tag-scan"""
def __init__(
self,
mtp: dict[str, MParser],
@ -129,6 +132,8 @@ class Up2k(object):
self.mem_cur = None
self.sqlite_ver = None
self.no_expr_idx = False
self.timeout = int(max(self.args.srch_time, 5) * 1.2)
self.spools: set[tempfile.SpooledTemporaryFile[bytes]] = set()
if HAVE_SQLITE3:
# mojibake detector
self.mem_cur = self._orz(":memory:")
@ -201,8 +206,9 @@ class Up2k(object):
self.log("uploads temporarily blocked due to " + why, 3)
def _unblock(self) -> None:
self.blocked = None
self.log("uploads are now possible", 2)
if self.blocked is not None:
self.blocked = None
self.log("uploads are now possible", 2)
def get_state(self) -> str:
mtpq: Union[int, str] = 0
@ -458,10 +464,9 @@ class Up2k(object):
self.volstate[vol.vpath] = t
# file contents verification
if next((zv for zv in vols if "e2v" in zv.flags), None):
self._block("integrity verification")
self._unblock()
# file contents verification
for vol in vols:
if self.stop:
break
@ -485,9 +490,6 @@ class Up2k(object):
self.volstate[vol.vpath] = t
if self.blocked:
self._unblock()
# open the rest + do any e2ts(a)
needed_mutagen = False
for vol in vols:
@ -665,7 +667,7 @@ class Up2k(object):
n_add = n_rm = 0
try:
n_add = self._build_dir(db, top, set(excl), top, rtop, rei, reh, [])
n_rm = self._drop_lost(db.c, top)
n_rm = self._drop_lost(db.c, top, excl)
except Exception as ex:
db_ex_chk(self.log, ex, db_path)
t = "failed to index volume [{}]:\n{}"
@ -698,6 +700,7 @@ class Up2k(object):
assert self.pp and self.mem_cur
self.pp.msg = "a{} {}".format(self.pp.n, cdir)
ret = 0
unreg: list[str] = []
seen_files = {} # != inames; files-only for dropcheck
g = statdir(self.log_func, not self.args.no_scandir, False, cdir)
gl = sorted(g)
@ -707,7 +710,12 @@ class Up2k(object):
return -1
abspath = os.path.join(cdir, iname)
rp = abspath[len(top) :].lstrip("/")
if WINDOWS:
rp = rp.replace("\\", "/").strip("/")
if rei and rei.search(abspath):
unreg.append(rp)
continue
nohash = reh.search(abspath) if reh else False
@ -716,6 +724,7 @@ class Up2k(object):
if stat.S_ISDIR(inf.st_mode):
rap = absreal(abspath)
if abspath in excl or rap in excl:
unreg.append(rp)
continue
if iname == ".th" and bos.path.isdir(os.path.join(abspath, "top")):
# abandoned or foreign, skip
@ -731,10 +740,6 @@ class Up2k(object):
else:
# self.log("file: {}".format(abspath))
seen_files[iname] = 1
rp = abspath[len(top) :].lstrip("/")
if WINDOWS:
rp = rp.replace("\\", "/").strip("/")
if rp.endswith(".PARTIAL") and time.time() - lmod < 60:
# rescan during upload
continue
@ -806,6 +811,25 @@ class Up2k(object):
if self.stop:
return -1
# drop shadowed folders
for rd in unreg:
n = 0
q = "select count(w) from up where (rd = ? or rd like ?||'%') and at == 0"
for erd in [rd, "//" + w8b64enc(rd)]:
try:
n = db.c.execute(q, (erd, erd + "/")).fetchone()[0]
break
except:
pass
if n:
t = "forgetting {} shadowed autoindexed files in [{}] > [{}]"
self.log(t.format(n, top, rd))
q = "delete from up where (rd = ? or rd like ?||'%') and at == 0"
db.c.execute(q, (erd, erd + "/"))
ret += n
# drop missing files
rd = cdir[len(top) + 1 :].strip("/")
if WINDOWS:
@ -828,12 +852,13 @@ class Up2k(object):
return ret
def _drop_lost(self, cur: "sqlite3.Cursor", top: str) -> int:
def _drop_lost(self, cur: "sqlite3.Cursor", top: str, excl: list[str]) -> int:
rm = []
n_rm = 0
nchecked = 0
assert self.pp
# `_build_dir` did all the files, now do dirs
# `_build_dir` did all unshadowed files; first do dirs:
ndirs = next(cur.execute("select count(distinct rd) from up"))[0]
c = cur.execute("select distinct rd from up order by rd desc")
for (drd,) in c:
@ -853,18 +878,50 @@ class Up2k(object):
rm.append(drd)
if not rm:
return 0
if rm:
q = "select count(w) from up where rd = ?"
for rd in rm:
n_rm += next(cur.execute(q, (rd,)))[0]
q = "select count(w) from up where rd = ?"
for rd in rm:
n_rm += next(cur.execute(q, (rd,)))[0]
self.log("forgetting {} deleted dirs, {} files".format(len(rm), n_rm))
for rd in rm:
cur.execute("delete from up where rd = ?", (rd,))
self.log("forgetting {} deleted dirs, {} files".format(len(rm), n_rm))
for rd in rm:
cur.execute("delete from up where rd = ?", (rd,))
# then shadowed deleted files
n_rm2 = 0
c2 = cur.connection.cursor()
excl = [x[len(top) + 1 :] for x in excl if x.startswith(top + "/")]
q = "select rd, fn from up where (rd = ? or rd like ?||'%') order by rd"
for rd in excl:
for erd in [rd, "//" + w8b64enc(rd)]:
try:
c = cur.execute(q, (erd, erd + "/"))
break
except:
pass
return n_rm
crd = "///"
cdc: set[str] = set()
for drd, dfn in c:
rd, fn = s3dec(drd, dfn)
if crd != rd:
crd = rd
try:
cdc = set(os.listdir(os.path.join(top, rd)))
except:
cdc.clear()
if fn not in cdc:
q = "delete from up where rd = ? and fn = ?"
c2.execute(q, (drd, dfn))
n_rm2 += 1
if n_rm2:
self.log("forgetting {} shadowed deleted files".format(n_rm2))
c2.close()
return n_rm + n_rm2
def _verify_integrity(self, vol: VFS) -> int:
"""expensive; blocks database access until finished"""
@ -888,7 +945,7 @@ class Up2k(object):
qexa.append("up.rd != ? and not up.rd like ?||'%'")
pexa.extend([vpath, vpath])
pex = tuple(pexa)
pex: tuple[Any, ...] = tuple(pexa)
qex = " and ".join(qexa)
if qex:
qex = " where " + qex
@ -903,11 +960,22 @@ class Up2k(object):
b_left += sz # sum() can overflow according to docs
n_left += 1
q = "select w, mt, sz, rd, fn from up" + qex
for w, mt, sz, drd, dfn in cur.execute(q, pex):
tf, _ = self._spool_warks(cur, "select w, rd, fn from up" + qex, pex, 0)
with gzip.GzipFile(mode="rb", fileobj=tf) as gf:
for zb in gf:
if self.stop:
return -1
w, drd, dfn = zb[:-1].decode("utf-8").split("\x00")
with self.mutex:
q = "select mt, sz from up where w = ? and rd = ? and fn = ?"
try:
mt, sz = cur.execute(q, (w, drd, dfn)).fetchone()
except:
# file moved/deleted since spooling
continue
n_left -= 1
b_left -= sz
if drd.startswith("//") or dfn.startswith("//"):
@ -952,18 +1020,21 @@ class Up2k(object):
t = t.format(abspath, w, sz, mt, w2, sz2, mt2)
self.log(t, 1)
if e2vp and rewark:
self.hub.retcode = 1
os.kill(os.getpid(), signal.SIGTERM)
raise Exception("{} files have incorrect hashes".format(len(rewark)))
if e2vp and rewark:
self.hub.retcode = 1
os.kill(os.getpid(), signal.SIGTERM)
raise Exception("{} files have incorrect hashes".format(len(rewark)))
if not e2vu:
return 0
if not e2vu or not rewark:
return 0
with self.mutex:
for rd, fn, w, sz, mt in rewark:
q = "update up set w = ?, sz = ?, mt = ? where rd = ? and fn = ? limit 1"
cur.execute(q, (w, sz, int(mt), rd, fn))
cur.connection.commit()
return len(rewark)
def _build_tags_index(self, vol: VFS) -> tuple[int, int, bool]:
@ -977,11 +1048,7 @@ class Up2k(object):
flags = self.flags[ptop]
cur = self.cur[ptop]
n_add = 0
n_rm = 0
n_buf = 0
last_write = time.time()
if "e2tsr" in flags:
with self.mutex:
n_rm = cur.execute("select count(w) from mt").fetchone()[0]
@ -992,93 +1059,153 @@ class Up2k(object):
# integrity: drop tags for tracks that were deleted
if "e2t" in flags:
with self.mutex:
drops = []
n = 0
c2 = cur.connection.cursor()
up_q = "select w from up where substr(w,1,16) = ?"
rm_q = "delete from mt where w = ?"
for (w,) in cur.execute("select w from mt"):
if not c2.execute(up_q, (w,)).fetchone():
drops.append(w[:16])
c2.close()
c2.execute(rm_q, (w[:16],))
n += 1
if drops:
msg = "discarding media tags for {} deleted files"
self.log(msg.format(len(drops)))
n_rm += len(drops)
for w in drops:
cur.execute("delete from mt where w = ?", (w,))
c2.close()
if n:
t = "discarded media tags for {} deleted files"
self.log(t.format(n))
n_rm += n
with self.mutex:
cur.connection.commit()
# bail if a volume flag disables indexing
if "d2t" in flags or "d2d" in flags:
return n_add, n_rm, True
return 0, n_rm, True
# add tags for new files
gcur = cur
with self.mutex:
gcur.connection.commit()
if "e2ts" in flags:
if not self.mtag:
return n_add, n_rm, False
return 0, n_rm, False
mpool: Optional[Queue[Mpqe]] = None
nq = 0
with self.mutex:
tf, nq = self._spool_warks(
cur, "select w from up order by rd, fn", (), 1
)
if self.mtag.prefer_mt and self.args.mtag_mt > 1:
mpool = self._start_mpool()
if not nq:
# self.log("tags ok")
self._unspool(tf)
return 0, n_rm, True
# TODO blocks writes to registry cursor; do chunks instead
conn = sqlite3.connect(db_path, timeout=15)
cur = conn.cursor()
c2 = conn.cursor()
c3 = conn.cursor()
n_left = cur.execute("select count(w) from up").fetchone()[0]
for w, rd, fn in cur.execute("select w, rd, fn from up order by rd, fn"):
if self.stop:
return -1, -1, False
if nq == -1:
return -1, -1, True
n_left -= 1
q = "select w from mt where w = ?"
if c2.execute(q, (w[:16],)).fetchone():
continue
with gzip.GzipFile(mode="rb", fileobj=tf) as gf:
n_add = self._e2ts_q(gf, nq, cur, ptop, entags)
if "mtp" in flags:
q = "insert into mt values (?,'t:mtp','a')"
c2.execute(q, (w[:16],))
if rd.startswith("//") or fn.startswith("//"):
rd, fn = s3dec(rd, fn)
abspath = os.path.join(ptop, rd, fn)
self.pp.msg = "c{} {}".format(n_left, abspath)
if not mpool:
n_tags = self._tag_file(c3, entags, w, abspath)
else:
mpool.put(Mpqe({}, entags, w, abspath, {}))
# not registry cursor; do not self.mutex:
n_tags = len(self._flush_mpool(c3))
n_add += n_tags
n_buf += n_tags
td = time.time() - last_write
if n_buf >= 4096 or td >= 60:
self.log("commit {} new tags".format(n_buf))
cur.connection.commit()
last_write = time.time()
n_buf = 0
if mpool:
self._stop_mpool(mpool)
with self.mutex:
n_add += len(self._flush_mpool(c3))
conn.commit()
c3.close()
c2.close()
cur.close()
conn.close()
self._unspool(tf)
return n_add, n_rm, True
def _e2ts_q(
self,
qf: gzip.GzipFile,
nq: int,
cur: "sqlite3.Cursor",
ptop: str,
entags: set[str],
) -> int:
assert self.pp and self.mtag
mpool: Optional[Queue[Mpqe]] = None
if self.mtag.prefer_mt and self.args.mtag_mt > 1:
mpool = self._start_mpool()
n_add = 0
n_buf = 0
last_write = time.time()
for bw in qf:
if self.stop:
return -1
w = bw[:-1].decode("ascii")
q = "select rd, fn from up where w = ?"
try:
rd, fn = cur.execute(q, (w,)).fetchone()
except:
# file modified/deleted since spooling
continue
if rd.startswith("//") or fn.startswith("//"):
rd, fn = s3dec(rd, fn)
abspath = os.path.join(ptop, rd, fn)
self.pp.msg = "c{} {}".format(nq, abspath)
if not mpool:
n_tags = self._tagscan_file(cur, entags, w, abspath)
else:
mpool.put(Mpqe({}, entags, w, abspath, {}))
with self.mutex:
n_tags = len(self._flush_mpool(cur))
n_add += n_tags
n_buf += n_tags
td = time.time() - last_write
if n_buf >= 4096 or td >= max(1, self.timeout - 1):
self.log("commit {} new tags".format(n_buf))
with self.mutex:
cur.connection.commit()
last_write = time.time()
n_buf = 0
if mpool:
self._stop_mpool(mpool)
with self.mutex:
n_add += len(self._flush_mpool(cur))
with self.mutex:
cur.connection.commit()
return n_add
def _spool_warks(
self,
cur: "sqlite3.Cursor",
q: str,
params: tuple[Any, ...],
flt: int,
) -> tuple[tempfile.SpooledTemporaryFile[bytes], int]:
n = 0
c2 = cur.connection.cursor()
tf = tempfile.SpooledTemporaryFile(1024 * 1024 * 8, "w+b", prefix="cpp-tq-")
with gzip.GzipFile(mode="wb", fileobj=tf) as gf:
for row in cur.execute(q, params):
if self.stop:
return tf, -1
if flt == 1:
q = "select w from mt where w = ?"
if c2.execute(q, (row[0][:16],)).fetchone():
continue
gf.write("{}\n".format("\x00".join(row)).encode("utf-8"))
n += 1
c2.close()
tf.seek(0)
self.spools.add(tf)
return tf, n
def _unspool(self, tf: tempfile.SpooledTemporaryFile[bytes]) -> None:
self.spools.remove(tf)
try:
tf.close()
except Exception as ex:
self.log("failed to delete spool: {}".format(ex), 3)
def _flush_mpool(self, wcur: "sqlite3.Cursor") -> list[str]:
ret = []
for x in self.pending_tags:
@ -1338,21 +1465,38 @@ class Up2k(object):
msg = "{} failed to read tags from {}:\n{}".format(parser, abspath, ex)
self.log(msg.lstrip(), c=1 if "<Signals.SIG" in msg else 3)
def _tagscan_file(
self,
write_cur: "sqlite3.Cursor",
entags: set[str],
wark: str,
abspath: str,
) -> int:
"""will mutex"""
assert self.mtag
if not bos.path.isfile(abspath):
return 0
try:
tags = self.mtag.get(abspath)
except Exception as ex:
self._log_tag_err("", abspath, ex)
return 0
with self.mutex:
return self._tag_file(write_cur, entags, wark, abspath, tags)
def _tag_file(
self,
write_cur: "sqlite3.Cursor",
entags: set[str],
wark: str,
abspath: str,
tags: Optional[dict[str, Union[str, float]]] = None,
tags: dict[str, Union[str, float]],
) -> int:
"""mutex me"""
assert self.mtag
if tags is None:
try:
tags = self.mtag.get(abspath)
except Exception as ex:
self._log_tag_err("", abspath, ex)
return 0
if not bos.path.isfile(abspath):
return 0
@ -1382,8 +1526,7 @@ class Up2k(object):
return ret
def _orz(self, db_path: str) -> "sqlite3.Cursor":
timeout = int(max(self.args.srch_time, 5) * 1.2)
return sqlite3.connect(db_path, timeout, check_same_thread=False).cursor()
return sqlite3.connect(db_path, self.timeout, check_same_thread=False).cursor()
# x.set_trace_callback(trace)
def _open_db(self, db_path: str) -> "sqlite3.Cursor":
@ -1953,7 +2096,9 @@ class Up2k(object):
self.db_add(cur, wark, rd, fn, lmod, sz, ip, at)
cur.connection.commit()
except Exception as ex:
db_ex_chk(self.log, ex, self.register_vpath(ptop, {})[1])
x = self.register_vpath(ptop, {})
assert x
db_ex_chk(self.log, ex, x[1])
raise
if "e2t" in self.flags[ptop]:
@ -2657,6 +2802,10 @@ class Up2k(object):
def shutdown(self) -> None:
self.stop = True
for x in list(self.spools):
self._unspool(x)
self.log("writing snapshot")
self.do_snapshot()

View file

@ -1196,15 +1196,10 @@ def s3enc(mem_cur: "sqlite3.Cursor", rd: str, fn: str) -> tuple[str, str]:
def s3dec(rd: str, fn: str) -> tuple[str, str]:
ret = []
for v in [rd, fn]:
if v.startswith("//"):
ret.append(w8b64dec(v[2:]))
# self.log("mojide [{}] {}".format(ret[-1], v[2:]))
else:
ret.append(v)
return ret[0], ret[1]
return (
w8b64dec(rd[2:]) if rd.startswith("//") else rd,
w8b64dec(fn[2:]) if fn.startswith("//") else fn,
)
def db_ex_chk(log: "NamedLogger", ex: Exception, db_path: str) -> None: