From 4792c2770d94b65f69fcbae999dbcb3ddcde54a0 Mon Sep 17 00:00:00 2001 From: ed Date: Sun, 3 Jul 2022 02:39:15 +0200 Subject: [PATCH] fix a spin --- copyparty/up2k.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/copyparty/up2k.py b/copyparty/up2k.py index e1e1f3d1..1ee3c446 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -106,6 +106,7 @@ class Up2k(object): self.pending_tags: list[tuple[set[str], str, str, dict[str, Any]]] = [] self.hashq: Queue[tuple[str, str, str, str, float]] = Queue() self.tagq: Queue[tuple[str, str, str, str]] = Queue() + self.tag_event = threading.Condition() self.n_hashq = 0 self.n_tagq = 0 self.mpool_used = False @@ -958,6 +959,7 @@ class Up2k(object): to_delete = {} in_progress = {} while True: + did_nothing = True with self.mutex: if gid != self.gid: break @@ -979,6 +981,7 @@ class Up2k(object): zq2 = cur.execute(q, (w,)).fetchall() have: dict[str, Union[str, float]] = {x[0]: 1 for x in zq2} + did_nothing = False parsers = self._get_parsers(ptop, have, abspath) if not parsers: to_delete[w] = True @@ -992,6 +995,7 @@ class Up2k(object): done = self._flush_mpool(wcur) for w in done: to_delete[w] = True + did_nothing = False in_progress.pop(w) n_done += 1 @@ -1004,6 +1008,10 @@ class Up2k(object): if not warks: break + if did_nothing: + with self.tag_event: + self.tag_event.wait(0.2) + if not jobs: continue @@ -1132,6 +1140,10 @@ class Up2k(object): except: ex = traceback.format_exc() self._log_tag_err(qe.mtp or self.mtag.backend, qe.abspath, ex) + finally: + if qe.mtp: + with self.tag_event: + self.tag_event.notify_all() q.task_done()