multithread the slow mtag backends

This commit is contained in:
ed 2021-03-04 20:28:03 +01:00
parent 0d16b49489
commit 2b4ccdbebb
3 changed files with 73 additions and 12 deletions

View file

@ -256,6 +256,7 @@ def main():
ap2.add_argument("-e2ts", action="store_true", help="enable metadata scanner, sets -e2t")
ap2.add_argument("-e2tsr", action="store_true", help="rescan all metadata, sets -e2ts")
ap2.add_argument("--no-mutagen", action="store_true", help="use ffprobe for tags instead")
ap2.add_argument("--no-mtag-mt", action="store_true", help="disable tag-read parallelism")
ap2.add_argument("-mtm", metavar="M=t,t,t", action="append", type=str, help="add/replace metadata mapping")
ap2.add_argument("-mte", metavar="M,M,M", type=str, help="tags to index/display (comma-sep.)",
default="circle,album,.tn,artist,title,.bpm,key,.dur,.q")

View file

@ -1,6 +1,5 @@
# coding: utf-8
from __future__ import print_function, unicode_literals
from math import fabs
import re
import os
@ -16,19 +15,21 @@ class MTag(object):
def __init__(self, log_func, args):
self.log_func = log_func
self.usable = True
self.prefer_mt = False
mappings = args.mtm
backend = "ffprobe" if args.no_mutagen else "mutagen"
self.backend = "ffprobe" if args.no_mutagen else "mutagen"
if backend == "mutagen":
if self.backend == "mutagen":
self.get = self.get_mutagen
try:
import mutagen
except:
self.log("\033[33mcould not load mutagen, trying ffprobe instead")
backend = "ffprobe"
self.backend = "ffprobe"
if backend == "ffprobe":
if self.backend == "ffprobe":
self.get = self.get_ffprobe
self.prefer_mt = True
# about 20x slower
if PY2:
cmd = ["ffprobe", "-version"]

View file

@ -63,6 +63,9 @@ class Up2k(object):
self.entags = {}
self.flags = {}
self.cur = {}
self.mtag = None
self.n_mtag_thr_alive = 0
self.n_mtag_tags_added = 0
self.mem_cur = None
if HAVE_SQLITE3:
@ -77,10 +80,6 @@ class Up2k(object):
thr.daemon = True
thr.start()
self.mtag = MTag(self.log_func, self.args)
if not self.mtag.usable:
self.mtag = None
# static
self.r_hash = re.compile("^[0-9a-zA-Z_-]{43}$")
@ -158,6 +157,16 @@ class Up2k(object):
vols = live_vols
need_mtag = False
for vol in auth.vfs.all_vols.values():
if "e2t" in vol.flags:
need_mtag = True
if need_mtag:
self.mtag = MTag(self.log_func, self.args)
if not self.mtag.usable:
self.mtag = None
# e2ds(a) volumes first,
# also covers tags where e2ts is set
for vol in vols:
@ -419,7 +428,24 @@ class Up2k(object):
if not self.mtag:
return n_add, n_rm, False
mpool = False
if self.mtag.prefer_mt and not self.args.no_mtag_mt:
# mp.pool.ThreadPool and concurrent.futures.ThreadPoolExecutor
# both do crazy runahead so lets reinvent another wheel
nw = os.cpu_count()
if not self.n_mtag_thr_alive:
msg = 'using {} cores for tag reader "{}"'
self.log(msg.format(nw, self.mtag.backend))
self.n_mtag_thr_alive = nw
mpool = Queue(nw)
for _ in range(nw):
thr = threading.Thread(target=self._tag_thr, args=(mpool,))
thr.daemon = True
thr.start()
c2 = cur.connection.cursor()
c3 = cur.connection.cursor()
n_left = cur.execute("select count(w) from up").fetchone()[0]
for w, rd, fn in cur.execute("select w, rd, fn from up"):
n_left -= 1
@ -429,7 +455,15 @@ class Up2k(object):
abspath = os.path.join(ptop, rd, fn)
self.pp.msg = "c{} {}".format(n_left, abspath)
n_tags = self._tag_file(c2, entags, w, abspath)
args = c3, entags, w, abspath
if not mpool:
n_tags = self._tag_file(*args)
else:
mpool.put(args)
with self.mutex:
n_tags = self.n_mtag_tags_added
self.n_mtag_tags_added = 0
n_add += n_tags
n_buf += n_tags
@ -440,12 +474,37 @@ class Up2k(object):
last_write = time.time()
n_buf = 0
if self.n_mtag_thr_alive:
mpool.join()
for _ in range(self.n_mtag_thr_alive):
mpool.put(None)
c3.close()
c2.close()
return n_add, n_rm, True
def _tag_file(self, write_cur, entags, wark, abspath):
tags = self.mtag.get(abspath)
def _tag_thr(self, q):
while True:
task = q.get()
if not task:
break
try:
write_cur, entags, wark, abspath = task
tags = self.mtag.get(abspath)
with self.mutex:
n = self._tag_file(write_cur, entags, wark, abspath, tags)
self.n_mtag_tags_added += n
except:
with self.mutex:
self.n_mtag_thr_alive -= 1
raise
finally:
q.task_done()
def _tag_file(self, write_cur, entags, wark, abspath, tags=None):
tags = tags or self.mtag.get(abspath)
tags = {k: v for k, v in tags.items() if k in entags}
if not tags:
# indicate scanned without tags