From e8a653ca0c423731d5fc7b91d8f30ec6c01e50be Mon Sep 17 00:00:00 2001 From: ed Date: Mon, 8 Jan 2024 22:10:16 +0000 Subject: [PATCH] don't block non-up2k uploads during indexing; due to all upload APIs invoking up2k.hash_file to index uploads, the uploads could block during a rescan for a crazy long time (past most gateway timeouts); now this is mostly fire-and-forget "mostly" because this also adds a conditional slowdown to help the hasher churn through if the queue gets too big worst case, if the server is restarted before it catches up, this would rely on filesystem reindexing to eventually index the files after a restart or on a schedule, meaning uploader info would be lost on shutdown, but this is usually fine anyways (and this was also the case until now) --- copyparty/up2k.py | 44 +++++++++++++++++++++++++++++++------------- 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/copyparty/up2k.py b/copyparty/up2k.py index 3001e60b..eb1c1357 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -145,9 +145,12 @@ class Up2k(object): self.entags: dict[str, set[str]] = {} self.mtp_parsers: dict[str, dict[str, MParser]] = {} self.pending_tags: list[tuple[set[str], str, str, dict[str, Any]]] = [] - self.hashq: Queue[tuple[str, str, str, str, str, float, str, bool]] = Queue() + self.hashq: Queue[ + tuple[str, str, dict[str, Any], str, str, str, float, str, bool] + ] = Queue() self.tagq: Queue[tuple[str, str, str, str, str, float]] = Queue() self.tag_event = threading.Condition() + self.hashq_mutex = threading.Lock() self.n_hashq = 0 self.n_tagq = 0 self.mpool_used = False @@ -2351,7 +2354,8 @@ class Up2k(object): t = "cannot receive uploads right now;\nserver busy with {}.\nPlease wait; the client will retry..." raise Pebkac(503, t.format(self.blocked or "[unknown]")) except TypeError: - # py2 + if not PY2: + raise with self.mutex: self._job_volchk(cj) @@ -3991,16 +3995,16 @@ class Up2k(object): self.log("tagged {} ({}+{})".format(abspath, ntags1, len(tags) - ntags1)) def _hasher(self) -> None: - with self.mutex: + with self.hashq_mutex: self.n_hashq += 1 while True: - with self.mutex: + with self.hashq_mutex: self.n_hashq -= 1 # self.log("hashq {}".format(self.n_hashq)) task = self.hashq.get() - if len(task) != 8: + if len(task) != 9: raise Exception("invalid hash task") try: @@ -4009,11 +4013,14 @@ class Up2k(object): except Exception as ex: self.log("failed to hash %s: %s" % (task, ex), 1) - def _hash_t(self, task: tuple[str, str, str, str, str, float, str, bool]) -> bool: - ptop, vtop, rd, fn, ip, at, usr, skip_xau = task + def _hash_t( + self, task: tuple[str, str, dict[str, Any], str, str, str, float, str, bool] + ) -> bool: + ptop, vtop, flags, rd, fn, ip, at, usr, skip_xau = task # self.log("hashq {} pop {}/{}/{}".format(self.n_hashq, ptop, rd, fn)) - if "e2d" not in self.flags[ptop]: - return True + with self.mutex: + if not self.register_vpath(ptop, flags): + return True abspath = djoin(ptop, rd, fn) self.log("hashing " + abspath) @@ -4064,11 +4071,22 @@ class Up2k(object): usr: str, skip_xau: bool = False, ) -> None: - with self.mutex: - self.register_vpath(ptop, flags) - self.hashq.put((ptop, vtop, rd, fn, ip, at, usr, skip_xau)) + if "e2d" not in flags: + return + + if self.n_hashq > 1024: + t = "%d files in hashq; taking a nap" + self.log(t % (self.n_hashq,), 6) + + for _ in range(self.n_hashq // 1024): + time.sleep(0.1) + if self.n_hashq < 1024: + break + + zt = (ptop, vtop, flags, rd, fn, ip, at, usr, skip_xau) + with self.hashq_mutex: + self.hashq.put(zt) self.n_hashq += 1 - # self.log("hashq {} push {}/{}/{}".format(self.n_hashq, ptop, rd, fn)) def shutdown(self) -> None: self.stop = True