From 2b4ccdbebb5b3a676628b9a888be203ab4c2ebed Mon Sep 17 00:00:00 2001
From: ed
Date: Thu, 4 Mar 2021 20:28:03 +0100
Subject: [PATCH] multithread the slow mtag backends

Tag backends which set prefer_mt (currently only ffprobe) are now read
by a pool of daemon threads fed through a bounded Queue. The tag read
itself runs outside the up2k mutex; only the db write and the tally of
added tags are serialized. --no-mtag-mt keeps the single-threaded reader.

MTag is now only created when at least one volume has the e2t flag.

---
review amendments to copyparty/up2k.py:
 * the worker count no longer assumes os.cpu_count exists (py2) or
   returns a number; the fallback is 4 workers
 * tags written after the last poll inside the loop are added to n_add
   once the queue has drained, rather than being left for the next call
 * _tag_file only queries the backend when tags is None; an empty tag
   dict from a worker used to trigger a second read under the mutex
the post-image blob id on the up2k.py index line predates these
amendments, and the whitespace of this patch was restored from a
flattened copy

 copyparty/__main__.py |  1 +
 copyparty/mtag.py     | 11 ++++---
 copyparty/up2k.py     | 87 +++++++++++++++++++++++++++++++++++++++++-----
 3 files changed, 87 insertions(+), 12 deletions(-)

diff --git a/copyparty/__main__.py b/copyparty/__main__.py
index 3dc2b136..cbb08822 100644
--- a/copyparty/__main__.py
+++ b/copyparty/__main__.py
@@ -256,6 +256,7 @@ def main():
     ap2.add_argument("-e2ts", action="store_true", help="enable metadata scanner, sets -e2t")
     ap2.add_argument("-e2tsr", action="store_true", help="rescan all metadata, sets -e2ts")
     ap2.add_argument("--no-mutagen", action="store_true", help="use ffprobe for tags instead")
+    ap2.add_argument("--no-mtag-mt", action="store_true", help="disable tag-read parallelism")
     ap2.add_argument("-mtm", metavar="M=t,t,t", action="append", type=str, help="add/replace metadata mapping")
     ap2.add_argument("-mte", metavar="M,M,M", type=str, help="tags to index/display (comma-sep.)", default="circle,album,.tn,artist,title,.bpm,key,.dur,.q")
 
diff --git a/copyparty/mtag.py b/copyparty/mtag.py
index 9451f5c7..abaa9583 100644
--- a/copyparty/mtag.py
+++ b/copyparty/mtag.py
@@ -1,6 +1,5 @@
 # coding: utf-8
 from __future__ import print_function, unicode_literals
-from math import fabs
 
 import re
 import os
@@ -16,19 +15,21 @@ class MTag(object):
     def __init__(self, log_func, args):
         self.log_func = log_func
         self.usable = True
+        self.prefer_mt = False
         mappings = args.mtm
-        backend = "ffprobe" if args.no_mutagen else "mutagen"
+        self.backend = "ffprobe" if args.no_mutagen else "mutagen"
 
-        if backend == "mutagen":
+        if self.backend == "mutagen":
             self.get = self.get_mutagen
             try:
                 import mutagen
             except:
                 self.log("\033[33mcould not load mutagen, trying ffprobe instead")
-                backend = "ffprobe"
+                self.backend = "ffprobe"
 
-        if backend == "ffprobe":
+        if self.backend == "ffprobe":
             self.get = self.get_ffprobe
+            self.prefer_mt = True
             # about 20x slower
             if PY2:
                 cmd = ["ffprobe", "-version"]
diff --git a/copyparty/up2k.py b/copyparty/up2k.py
index e9a768cd..378c6a6e 100644
--- a/copyparty/up2k.py
+++ b/copyparty/up2k.py
@@ -63,6 +63,9 @@ class Up2k(object):
         self.entags = {}
         self.flags = {}
         self.cur = {}
+        self.mtag = None
+        self.n_mtag_thr_alive = 0
+        self.n_mtag_tags_added = 0
 
         self.mem_cur = None
         if HAVE_SQLITE3:
@@ -77,10 +80,6 @@ class Up2k(object):
             thr.daemon = True
             thr.start()
 
-        self.mtag = MTag(self.log_func, self.args)
-        if not self.mtag.usable:
-            self.mtag = None
-
         # static
         self.r_hash = re.compile("^[0-9a-zA-Z_-]{43}$")
 
@@ -158,6 +157,16 @@ class Up2k(object):
 
         vols = live_vols
 
+        need_mtag = False
+        for vol in auth.vfs.all_vols.values():
+            if "e2t" in vol.flags:
+                need_mtag = True
+
+        if need_mtag:
+            self.mtag = MTag(self.log_func, self.args)
+            if not self.mtag.usable:
+                self.mtag = None
+
         # e2ds(a) volumes first,
         # also covers tags where e2ts is set
         for vol in vols:
@@ -419,7 +428,25 @@ class Up2k(object):
         if not self.mtag:
             return n_add, n_rm, False
 
+        mpool = False
+        if self.mtag.prefer_mt and not self.args.no_mtag_mt:
+            # mp.pool.ThreadPool and concurrent.futures.ThreadPoolExecutor
+            # both do crazy runahead so lets reinvent another wheel
+            # os.cpu_count is py3.4+ and returns None when undeterminable
+            nw = (os.cpu_count() if hasattr(os, "cpu_count") else 0) or 4
+            if not self.n_mtag_thr_alive:
+                msg = 'using {} cores for tag reader "{}"'
+                self.log(msg.format(nw, self.mtag.backend))
+
+            self.n_mtag_thr_alive = nw
+            mpool = Queue(nw)
+            for _ in range(nw):
+                thr = threading.Thread(target=self._tag_thr, args=(mpool,))
+                thr.daemon = True
+                thr.start()
+
         c2 = cur.connection.cursor()
+        c3 = cur.connection.cursor()
         n_left = cur.execute("select count(w) from up").fetchone()[0]
         for w, rd, fn in cur.execute("select w, rd, fn from up"):
             n_left -= 1
@@ -429,7 +456,15 @@ class Up2k(object):
 
             abspath = os.path.join(ptop, rd, fn)
             self.pp.msg = "c{} {}".format(n_left, abspath)
-            n_tags = self._tag_file(c2, entags, w, abspath)
+            args = c3, entags, w, abspath
+            if not mpool:
+                n_tags = self._tag_file(*args)
+            else:
+                mpool.put(args)
+                with self.mutex:
+                    n_tags = self.n_mtag_tags_added
+                    self.n_mtag_tags_added = 0
+
             n_add += n_tags
             n_buf += n_tags
 
@@ -440,12 +475,50 @@ class Up2k(object):
                 last_write = time.time()
                 n_buf = 0
 
+        if self.n_mtag_thr_alive:
+            mpool.join()
+            for _ in range(self.n_mtag_thr_alive):
+                mpool.put(None)
+
+            # fold in the tags written since the last poll in the loop;
+            # otherwise they are left behind for the next call to pick up
+            with self.mutex:
+                n_add += self.n_mtag_tags_added
+                self.n_mtag_tags_added = 0
+
+        c3.close()
         c2.close()
 
         return n_add, n_rm, True
 
-    def _tag_file(self, write_cur, entags, wark, abspath):
-        tags = self.mtag.get(abspath)
+    def _tag_thr(self, q):
+        """worker: run queued _tag_file tasks until a falsy task arrives"""
+        while True:
+            task = q.get()
+            if not task:
+                break
+
+            try:
+                write_cur, entags, wark, abspath = task
+                # slow part; deliberately outside the mutex
+                tags = self.mtag.get(abspath)
+                with self.mutex:
+                    n = self._tag_file(write_cur, entags, wark, abspath, tags)
+                    self.n_mtag_tags_added += n
+            except:
+                # this worker dies; keep the count of live workers accurate
+                with self.mutex:
+                    self.n_mtag_thr_alive -= 1
+                raise
+            finally:
+                q.task_done()
+
+    def _tag_file(self, write_cur, entags, wark, abspath, tags=None):
+        # tags may legitimately be an empty dict (file without tags),
+        # so only ask the backend when the caller did not supply any
+        if tags is None:
+            tags = self.mtag.get(abspath)
+
         tags = {k: v for k, v in tags.items() if k in entags}
         if not tags:
             # indicate scanned without tags