live db/tags rescan

This commit is contained in:
ed 2021-05-29 23:35:07 +02:00
parent eb5aaddba4
commit 0be7c5e2d8
7 changed files with 335 additions and 162 deletions

View file

@ -294,6 +294,8 @@ the same arguments can be set as volume flags, in addition to `d2d` and `d2t` fo
`e2tsr` is probably always overkill, since `e2ds`/`e2dsa` would pick up any file modifications and cause `e2ts` to reindex those
the rescan button in the admin panel has no effect unless the volume has `-e2ds` or higher
## metadata from audio files

View file

@ -249,6 +249,10 @@ def run_argparse(argv, formatter):
ap.add_argument("--urlform", metavar="MODE", type=str, default="print,get", help="how to handle url-forms")
ap.add_argument("--salt", type=str, default="hunter2", help="up2k file-hash salt")
ap2 = ap.add_argument_group('admin panel options')
ap2.add_argument("--no-rescan", action="store_true", help="disable ?scan (volume reindexing)")
ap2.add_argument("--no-stack", action="store_true", help="disable ?stack (list all stacks)")
ap2 = ap.add_argument_group('thumbnail options')
ap2.add_argument("--no-thumb", action="store_true", help="disable all thumbnails")
ap2.add_argument("--no-vthumb", action="store_true", help="disable video thumbnails")
@ -287,6 +291,7 @@ def run_argparse(argv, formatter):
ap2.add_argument("--log-conn", action="store_true", help="print tcp-server msgs")
ap2.add_argument("--no-sendfile", action="store_true", help="disable sendfile")
ap2.add_argument("--no-scandir", action="store_true", help="disable scandir")
ap2.add_argument("--no-fastboot", action="store_true", help="wait for up2k indexing")
ap2.add_argument("--ihead", metavar="HEADER", action='append', help="dump incoming header")
ap2.add_argument("--lf-url", metavar="RE", type=str, default=r"^/\.cpr/|\?th=[wj]$", help="dont log URLs matching")

View file

@ -14,11 +14,12 @@ from .util import IMPLICATIONS, undot, Pebkac, fsdec, fsenc, statdir, nuprint
class VFS(object):
"""single level in the virtual fs"""
def __init__(self, realpath, vpath, uread=[], uwrite=[], flags={}):
def __init__(self, realpath, vpath, uread=[], uwrite=[], uadm=[], flags={}):
self.realpath = realpath # absolute path on host filesystem
self.vpath = vpath # absolute path in the virtual filesystem
self.uread = uread # users who can read this
self.uwrite = uwrite # users who can write this
self.uadm = uadm # users who are regular admins
self.flags = flags # config switches
self.nodes = {} # child nodes
self.all_vols = {vpath: self} # flattened recursive
@ -27,7 +28,7 @@ class VFS(object):
return "VFS({})".format(
", ".join(
"{}={!r}".format(k, self.__dict__[k])
for k in "realpath vpath uread uwrite flags".split()
for k in "realpath vpath uread uwrite uadm flags".split()
)
)
@ -52,6 +53,7 @@ class VFS(object):
"{}/{}".format(self.vpath, name).lstrip("/"),
self.uread,
self.uwrite,
self.uadm,
self.flags,
)
self._trk(vn)
@ -226,15 +228,19 @@ class VFS(object):
for f in [{"vp": v, "ap": a, "st": n[1]} for v, a, n in files]:
yield f
def user_tree(self, uname, readable=False, writable=False):
def user_tree(self, uname, readable=False, writable=False, admin=False):
ret = []
opt1 = readable and (uname in self.uread or "*" in self.uread)
opt2 = writable and (uname in self.uwrite or "*" in self.uwrite)
if opt1 or opt2:
ret.append(self.vpath)
if admin:
if opt1 and opt2:
ret.append(self.vpath)
else:
if opt1 or opt2:
ret.append(self.vpath)
for _, vn in sorted(self.nodes.items()):
ret.extend(vn.user_tree(uname, readable, writable))
ret.extend(vn.user_tree(uname, readable, writable, admin))
return ret
@ -269,7 +275,7 @@ class AuthSrv(object):
yield prev, True
def _parse_config_file(self, fd, user, mread, mwrite, mflags, mount):
def _parse_config_file(self, fd, user, mread, mwrite, madm, mflags, mount):
vol_src = None
vol_dst = None
self.line_ctr = 0
@ -301,6 +307,7 @@ class AuthSrv(object):
mount[vol_dst] = vol_src
mread[vol_dst] = []
mwrite[vol_dst] = []
madm[vol_dst] = []
mflags[vol_dst] = {}
continue
@ -311,10 +318,15 @@ class AuthSrv(object):
uname = "*"
self._read_vol_str(
lvl, uname, mread[vol_dst], mwrite[vol_dst], mflags[vol_dst]
lvl,
uname,
mread[vol_dst],
mwrite[vol_dst],
madm[vol_dst],
mflags[vol_dst],
)
def _read_vol_str(self, lvl, uname, mr, mw, mf):
def _read_vol_str(self, lvl, uname, mr, mw, ma, mf):
if lvl == "c":
cval = True
if "=" in uname:
@ -332,6 +344,9 @@ class AuthSrv(object):
if lvl in "wa":
mw.append(uname)
if lvl == "a":
ma.append(uname)
def _read_volflag(self, flags, name, value, is_list):
if name not in ["mtp"]:
flags[name] = value
@ -355,6 +370,7 @@ class AuthSrv(object):
user = {} # username:password
mread = {} # mountpoint:[username]
mwrite = {} # mountpoint:[username]
madm = {} # mountpoint:[username]
mflags = {} # mountpoint:[flag]
mount = {} # dst:src (mountpoint:realpath)
@ -378,17 +394,22 @@ class AuthSrv(object):
mount[dst] = src
mread[dst] = []
mwrite[dst] = []
madm[dst] = []
mflags[dst] = {}
perms = perms.split(":")
for (lvl, uname) in [[x[0], x[1:]] for x in perms]:
self._read_vol_str(lvl, uname, mread[dst], mwrite[dst], mflags[dst])
self._read_vol_str(
lvl, uname, mread[dst], mwrite[dst], madm[dst], mflags[dst]
)
if self.args.c:
for cfg_fn in self.args.c:
with open(cfg_fn, "rb") as f:
try:
self._parse_config_file(f, user, mread, mwrite, mflags, mount)
self._parse_config_file(
f, user, mread, mwrite, madm, mflags, mount
)
except:
m = "\n\033[1;31m\nerror in config file {} on line {}:\n\033[0m"
print(m.format(cfg_fn, self.line_ctr))
@ -410,12 +431,15 @@ class AuthSrv(object):
if dst == "":
# rootfs was mapped; fully replaces the default CWD vfs
vfs = VFS(mount[dst], dst, mread[dst], mwrite[dst], mflags[dst])
vfs = VFS(
mount[dst], dst, mread[dst], mwrite[dst], madm[dst], mflags[dst]
)
continue
v = vfs.add(mount[dst], dst)
v.uread = mread[dst]
v.uwrite = mwrite[dst]
v.uadm = madm[dst]
v.flags = mflags[dst]
missing_users = {}

View file

@ -10,6 +10,7 @@ import json
import string
import socket
import ctypes
import traceback
from datetime import datetime
import calendar
@ -155,6 +156,7 @@ class HttpCli(object):
if self.uname:
self.rvol = self.auth.vfs.user_tree(self.uname, readable=True)
self.wvol = self.auth.vfs.user_tree(self.uname, writable=True)
self.avol = self.auth.vfs.user_tree(self.uname, True, True, True)
ua = self.headers.get("user-agent", "")
self.is_rclone = ua.startswith("rclone/")
@ -326,6 +328,12 @@ class HttpCli(object):
self.vpath = None
return self.tx_mounts()
if "scan" in self.uparam:
return self.scanvol()
if "stack" in self.uparam:
return self.tx_stack()
return self.tx_browser()
def handle_options(self):
@ -1304,10 +1312,61 @@ class HttpCli(object):
suf = self.urlq(rm=["h"])
rvol = [x + "/" if x else x for x in self.rvol]
wvol = [x + "/" if x else x for x in self.wvol]
html = self.j2("splash", this=self, rvol=rvol, wvol=wvol, url_suf=suf)
vstate = {}
if self.avol and not self.args.no_rescan:
x = self.conn.hsrv.broker.put(True, "up2k.get_volstate")
vstate = json.loads(x.get())
html = self.j2(
"splash",
this=self,
rvol=rvol,
wvol=wvol,
avol=self.avol,
vstate=vstate,
url_suf=suf,
)
self.reply(html.encode("utf-8"), headers=NO_STORE)
return True
def scanvol(self):
if not self.readable or not self.writable:
raise Pebkac(403, "not admin")
if self.args.no_rescan:
raise Pebkac(403, "disabled by argv")
vn, _ = self.auth.vfs.get(self.vpath, self.uname, True, True)
args = [self.auth.vfs.all_vols, [vn.vpath]]
x = self.conn.hsrv.broker.put(True, "up2k.rescan", *args)
x = x.get()
if not x:
self.redirect("", "?h")
return ""
raise Pebkac(500, x)
def tx_stack(self):
if not self.readable or not self.writable:
raise Pebkac(403, "not admin")
if self.args.no_stack:
raise Pebkac(403, "disabled by argv")
ret = []
names = dict([(t.ident, t.name) for t in threading.enumerate()])
for tid, stack in sys._current_frames().items():
ret.append("\n\n# {} ({:x})".format(names.get(tid), tid))
for fn, lno, name, line in traceback.extract_stack(stack):
ret.append('File: "{}", line {}, in {}'.format(fn, lno, name))
if line:
ret.append(" " + str(line.strip()))
ret = ("<pre>" + "\n".join(ret)).encode("utf-8")
self.reply(ret)
def tx_tree(self):
top = self.uparam["tree"] or ""
dst = self.vpath

View file

@ -52,7 +52,6 @@ class Up2k(object):
self.hub = hub
self.args = hub.args
self.log_func = hub.log
self.all_vols = all_vols
# config
self.salt = self.args.salt
@ -61,12 +60,14 @@ class Up2k(object):
self.mutex = threading.Lock()
self.hashq = Queue()
self.tagq = Queue()
self.volstate = {}
self.registry = {}
self.entags = {}
self.flags = {}
self.cur = {}
self.mtag = None
self.pending_tags = None
self.mtp_parsers = {}
self.mem_cur = None
self.sqlite_ver = None
@ -92,7 +93,15 @@ class Up2k(object):
if not HAVE_SQLITE3:
self.log("could not initialize sqlite3, will use in-memory registry only")
have_e2d = self.init_indexes()
if self.args.no_fastboot:
self.deferred_init(all_vols)
else:
t = threading.Thread(target=self.deferred_init, args=(all_vols,))
t.daemon = True
t.start()
def deferred_init(self, all_vols):
have_e2d = self.init_indexes(all_vols)
if have_e2d:
thr = threading.Thread(target=self._snapshot)
@ -115,6 +124,19 @@ class Up2k(object):
def log(self, msg, c=0):
self.log_func("up2k", msg + "\033[K", c)
def get_volstate(self):
return json.dumps(self.volstate, indent=4)
def rescan(self, all_vols, scan_vols):
if hasattr(self, "pp"):
return "cannot initiate; scan is already in progress"
args = (all_vols, scan_vols)
t = threading.Thread(target=self.init_indexes, args=args)
t.daemon = True
t.start()
return None
def _vis_job_progress(self, job):
perc = 100 - (len(job["need"]) * 100.0 / len(job["hash"]))
path = os.path.join(job["ptop"], job["prel"], job["name"])
@ -137,9 +159,9 @@ class Up2k(object):
return True, ret
def init_indexes(self):
def init_indexes(self, all_vols, scan_vols=[]):
self.pp = ProgressPrinter()
vols = self.all_vols.values()
vols = all_vols.values()
t0 = time.time()
have_e2d = False
@ -159,24 +181,32 @@ class Up2k(object):
for vol in vols:
try:
os.listdir(vol.realpath)
live_vols.append(vol)
if not self.register_vpath(vol.realpath, vol.flags):
raise Exception()
if vol.vpath in scan_vols or not scan_vols:
live_vols.append(vol)
if vol.vpath not in self.volstate:
self.volstate[vol.vpath] = "OFFLINE (not initialized)"
except:
self.log("cannot access " + vol.realpath, c=1)
# self.log("db not enabled for {}".format(m, vol.realpath))
pass
vols = live_vols
need_vac = {}
need_mtag = False
for vol in vols:
if "e2t" in vol.flags:
need_mtag = True
if need_mtag:
if need_mtag and not self.mtag:
self.mtag = MTag(self.log_func, self.args)
if not self.mtag.usable:
self.mtag = None
# e2ds(a) volumes first,
# also covers tags where e2ts is set
# e2ds(a) volumes first
for vol in vols:
en = {}
if "mte" in vol.flags:
@ -188,26 +218,45 @@ class Up2k(object):
have_e2d = True
if "e2ds" in vol.flags:
r = self._build_file_index(vol, vols)
if not r:
needed_mutagen = True
self.volstate[vol.vpath] = "busy (hashing files)"
_, vac = self._build_file_index(vol, list(all_vols.values()))
if vac:
need_vac[vol] = True
if "e2ts" not in vol.flags:
m = "online, idle"
else:
m = "online (tags pending)"
self.volstate[vol.vpath] = m
# open the rest + do any e2ts(a)
needed_mutagen = False
for vol in vols:
r = self.register_vpath(vol.realpath, vol.flags)
if not r or "e2ts" not in vol.flags:
if "e2ts" not in vol.flags:
continue
cur, db_path, sz0 = r
n_add, n_rm, success = self._build_tags_index(vol.realpath)
m = "online (reading tags)"
self.volstate[vol.vpath] = m
self.log("{} [{}]".format(m, vol.realpath))
nadd, nrm, success = self._build_tags_index(vol)
if not success:
needed_mutagen = True
if n_add or n_rm:
self.vac(cur, db_path, n_add, n_rm, sz0)
if nadd or nrm:
need_vac[vol] = True
self.volstate[vol.vpath] = "online (mtp soon)"
for vol in need_vac:
cur, _ = self.register_vpath(vol.realpath, vol.flags)
with self.mutex:
cur.connection.commit()
cur.execute("vacuum")
self.pp.end = True
msg = "{} volumes in {:.2f} sec"
self.log(msg.format(len(vols), time.time() - t0))
@ -215,110 +264,104 @@ class Up2k(object):
msg = "could not read tags because no backends are available (mutagen or ffprobe)"
self.log(msg, c=1)
thr = None
if self.mtag:
m = "online (running mtp)"
if scan_vols:
thr = threading.Thread(target=self._run_all_mtp)
thr.daemon = True
else:
del self.pp
m = "online, idle"
for vol in vols:
self.volstate[vol.vpath] = m
if thr:
thr.start()
return have_e2d
def register_vpath(self, ptop, flags):
with self.mutex:
if ptop in self.registry:
return None
db_path = os.path.join(ptop, ".hist", "up2k.db")
if ptop in self.registry:
return [self.cur[ptop], db_path]
_, flags = self._expr_idx_filter(flags)
_, flags = self._expr_idx_filter(flags)
ft = "\033[0;32m{}{:.0}"
ff = "\033[0;35m{}{:.0}"
fv = "\033[0;36m{}:\033[1;30m{}"
a = [
(ft if v is True else ff if v is False else fv).format(k, str(v))
for k, v in flags.items()
]
if a:
self.log(" ".join(sorted(a)) + "\033[0m")
ft = "\033[0;32m{}{:.0}"
ff = "\033[0;35m{}{:.0}"
fv = "\033[0;36m{}:\033[1;30m{}"
a = [
(ft if v is True else ff if v is False else fv).format(k, str(v))
for k, v in flags.items()
]
if a:
self.log(" ".join(sorted(a)) + "\033[0m")
reg = {}
path = os.path.join(ptop, ".hist", "up2k.snap")
if "e2d" in flags and os.path.exists(path):
with gzip.GzipFile(path, "rb") as f:
j = f.read().decode("utf-8")
reg = {}
path = os.path.join(ptop, ".hist", "up2k.snap")
if "e2d" in flags and os.path.exists(path):
with gzip.GzipFile(path, "rb") as f:
j = f.read().decode("utf-8")
reg2 = json.loads(j)
for k, job in reg2.items():
path = os.path.join(job["ptop"], job["prel"], job["name"])
if os.path.exists(fsenc(path)):
reg[k] = job
job["poke"] = time.time()
else:
self.log("ign deleted file in snap: [{}]".format(path))
reg2 = json.loads(j)
for k, job in reg2.items():
path = os.path.join(job["ptop"], job["prel"], job["name"])
if os.path.exists(fsenc(path)):
reg[k] = job
job["poke"] = time.time()
else:
self.log("ign deleted file in snap: [{}]".format(path))
m = "loaded snap {} |{}|".format(path, len(reg.keys()))
m = [m] + self._vis_reg_progress(reg)
self.log("\n".join(m))
self.flags[ptop] = flags
self.registry[ptop] = reg
if not HAVE_SQLITE3 or "e2d" not in flags or "d2d" in flags:
return None
try:
os.mkdir(os.path.join(ptop, ".hist"))
except:
pass
db_path = os.path.join(ptop, ".hist", "up2k.db")
if ptop in self.cur:
return None
try:
sz0 = 0
if os.path.exists(db_path):
sz0 = os.path.getsize(db_path) // 1024
cur = self._open_db(db_path)
self.cur[ptop] = cur
return [cur, db_path, sz0]
except:
msg = "cannot use database at [{}]:\n{}"
self.log(msg.format(ptop, traceback.format_exc()))
m = "loaded snap {} |{}|".format(path, len(reg.keys()))
m = [m] + self._vis_reg_progress(reg)
self.log("\n".join(m))
self.flags[ptop] = flags
self.registry[ptop] = reg
if not HAVE_SQLITE3 or "e2d" not in flags or "d2d" in flags:
return None
try:
os.mkdir(os.path.join(ptop, ".hist"))
except:
pass
try:
cur = self._open_db(db_path)
self.cur[ptop] = cur
return [cur, db_path]
except:
msg = "cannot use database at [{}]:\n{}"
self.log(msg.format(ptop, traceback.format_exc()))
return None
def _build_file_index(self, vol, all_vols):
do_vac = False
top = vol.realpath
reg = self.register_vpath(top, vol.flags)
if not reg:
return
with self.mutex:
cur, _ = self.register_vpath(top, vol.flags)
_, db_path, sz0 = reg
dbw = [reg[0], 0, time.time()]
self.pp.n = next(dbw[0].execute("select count(w) from up"))[0]
dbw = [cur, 0, time.time()]
self.pp.n = next(dbw[0].execute("select count(w) from up"))[0]
excl = [
vol.realpath + "/" + d.vpath[len(vol.vpath) :].lstrip("/")
for d in all_vols
if d != vol and (d.vpath.startswith(vol.vpath + "/") or not vol.vpath)
]
n_add = self._build_dir(dbw, top, set(excl), top)
n_rm = self._drop_lost(dbw[0], top)
if dbw[1]:
self.log("commit {} new files".format(dbw[1]))
dbw[0].connection.commit()
excl = [
vol.realpath + "/" + d.vpath[len(vol.vpath) :].lstrip("/")
for d in all_vols
if d != vol and (d.vpath.startswith(vol.vpath + "/") or not vol.vpath)
]
if WINDOWS:
excl = [x.replace("/", "\\") for x in excl]
n_add, n_rm, success = self._build_tags_index(vol.realpath)
n_add = self._build_dir(dbw, top, set(excl), top)
n_rm = self._drop_lost(dbw[0], top)
if dbw[1]:
self.log("commit {} new files".format(dbw[1]))
dbw[0].connection.commit()
dbw[0].connection.commit()
if n_add or n_rm or do_vac:
self.vac(dbw[0], db_path, n_add, n_rm, sz0)
return success
def vac(self, cur, db_path, n_add, n_rm, sz0):
sz1 = os.path.getsize(db_path) // 1024
cur.execute("vacuum")
sz2 = os.path.getsize(db_path) // 1024
msg = "{} new, {} del, {} kB vacced, {} kB gain, {} kB now".format(
n_add, n_rm, sz1 - sz2, sz2 - sz0, sz2
)
self.log(msg)
return True, n_add or n_rm or do_vac
def _build_dir(self, dbw, top, excl, cdir):
self.pp.msg = "a{} {}".format(self.pp.n, cdir)
@ -413,45 +456,53 @@ class Up2k(object):
return len(rm)
def _build_tags_index(self, ptop):
entags = self.entags[ptop]
flags = self.flags[ptop]
cur = self.cur[ptop]
def _build_tags_index(self, vol):
ptop = vol.realpath
with self.mutex:
_, db_path = self.register_vpath(ptop, vol.flags)
entags = self.entags[ptop]
flags = self.flags[ptop]
cur = self.cur[ptop]
n_add = 0
n_rm = 0
n_buf = 0
last_write = time.time()
if "e2tsr" in flags:
n_rm = cur.execute("select count(w) from mt").fetchone()[0]
if n_rm:
self.log("discarding {} media tags for a full rescan".format(n_rm))
cur.execute("delete from mt")
else:
self.log("volume has e2tsr but there are no media tags to discard")
with self.mutex:
n_rm = cur.execute("select count(w) from mt").fetchone()[0]
if n_rm:
self.log("discarding {} media tags for a full rescan".format(n_rm))
cur.execute("delete from mt")
# integrity: drop tags for tracks that were deleted
if "e2t" in flags:
drops = []
c2 = cur.connection.cursor()
up_q = "select w from up where substr(w,1,16) = ?"
for (w,) in cur.execute("select w from mt"):
if not c2.execute(up_q, (w,)).fetchone():
drops.append(w[:16])
c2.close()
with self.mutex:
drops = []
c2 = cur.connection.cursor()
up_q = "select w from up where substr(w,1,16) = ?"
for (w,) in cur.execute("select w from mt"):
if not c2.execute(up_q, (w,)).fetchone():
drops.append(w[:16])
c2.close()
if drops:
msg = "discarding media tags for {} deleted files"
self.log(msg.format(len(drops)))
n_rm += len(drops)
for w in drops:
cur.execute("delete from mt where w = ?", (w,))
if drops:
msg = "discarding media tags for {} deleted files"
self.log(msg.format(len(drops)))
n_rm += len(drops)
for w in drops:
cur.execute("delete from mt where w = ?", (w,))
# bail if a volume flag disables indexing
if "d2t" in flags or "d2d" in flags:
return n_add, n_rm, True
# add tags for new files
gcur = cur
with self.mutex:
gcur.connection.commit()
if "e2ts" in flags:
if not self.mtag:
return n_add, n_rm, False
@ -460,8 +511,10 @@ class Up2k(object):
if self.mtag.prefer_mt and not self.args.no_mtag_mt:
mpool = self._start_mpool()
c2 = cur.connection.cursor()
c3 = cur.connection.cursor()
conn = sqlite3.connect(db_path, timeout=15)
cur = conn.cursor()
c2 = conn.cursor()
c3 = conn.cursor()
n_left = cur.execute("select count(w) from up").fetchone()[0]
for w, rd, fn in cur.execute("select w, rd, fn from up"):
n_left -= 1
@ -483,7 +536,8 @@ class Up2k(object):
n_tags = self._tag_file(c3, *args)
else:
mpool.put(["mtag"] + args)
n_tags = len(self._flush_mpool(c3))
with self.mutex:
n_tags = len(self._flush_mpool(c3))
n_add += n_tags
n_buf += n_tags
@ -495,26 +549,32 @@ class Up2k(object):
last_write = time.time()
n_buf = 0
self._stop_mpool(mpool, c3)
self._stop_mpool(mpool)
with self.mutex:
n_add += len(self._flush_mpool(c3))
conn.commit()
c3.close()
c2.close()
cur.close()
conn.close()
with self.mutex:
gcur.connection.commit()
return n_add, n_rm, True
def _flush_mpool(self, wcur):
with self.mutex:
ret = []
for x in self.pending_tags:
self._tag_file(wcur, *x)
ret.append(x[1])
ret = []
for x in self.pending_tags:
self._tag_file(wcur, *x)
ret.append(x[1])
self.pending_tags = []
return ret
self.pending_tags = []
return ret
def _run_all_mtp(self):
t0 = time.time()
self.mtp_parsers = {}
for ptop, flags in self.flags.items():
if "mtp" in flags:
self._run_one_mtp(ptop)
@ -523,10 +583,11 @@ class Up2k(object):
msg = "mtp finished in {:.2f} sec ({})"
self.log(msg.format(td, s2hms(td, True)))
def _run_one_mtp(self, ptop):
db_path = os.path.join(ptop, ".hist", "up2k.db")
sz0 = os.path.getsize(db_path) // 1024
del self.pp
for k in list(self.volstate.keys()):
self.volstate[k] = "online, idle"
def _run_one_mtp(self, ptop):
entags = self.entags[ptop]
parsers = {}
@ -585,9 +646,8 @@ class Up2k(object):
jobs.append([parsers, None, w, abspath])
in_progress[w] = True
done = self._flush_mpool(wcur)
with self.mutex:
done = self._flush_mpool(wcur)
for w in done:
to_delete[w] = True
in_progress.pop(w)
@ -628,15 +688,16 @@ class Up2k(object):
with self.mutex:
cur.connection.commit()
done = self._stop_mpool(mpool, wcur)
self._stop_mpool(mpool)
with self.mutex:
done = self._flush_mpool(wcur)
for w in done:
q = "delete from mt where w = ? and k = 't:mtp'"
cur.execute(q, (w,))
cur.connection.commit()
if n_done:
self.vac(cur, db_path, n_done, 0, sz0)
cur.execute("vacuum")
wcur.close()
cur.close()
@ -693,7 +754,7 @@ class Up2k(object):
return mpool
def _stop_mpool(self, mpool, wcur):
def _stop_mpool(self, mpool):
if not mpool:
return
@ -701,8 +762,6 @@ class Up2k(object):
mpool.put(None)
mpool.join()
done = self._flush_mpool(wcur)
return done
def _tag_thr(self, q):
while True:

View file

@ -26,6 +26,13 @@ a {
border-radius: .2em;
padding: .2em .8em;
}
td, th {
padding: .3em .6em;
text-align: left;
}
.btns {
margin: 1em 0;
}
html.dark,

View file

@ -13,6 +13,23 @@
<div id="wrap">
<p>hello {{ this.uname }}</p>
{%- if avol %}
<h1>admin panel:</h1>
<table>
<thead><tr><th>vol</th><th>action</th><th>status</th></tr></thead>
<tbody>
{% for mp in avol %}
{%- if mp in vstate and vstate[mp] %}
<tr><td>/{{ mp }}</td><td><a href="/{{ mp }}?scan">rescan</a></td><td>{{ vstate[mp] }}</td></tr>
{%- endif %}
{% endfor %}
</tbody>
</table>
<div class="btns">
<a href="/{{ avol[0] }}?stack">dump stack</a>
</div>
{%- endif %}
{%- if rvol %}
<h1>you can browse these:</h1>
<ul>