diff --git a/copyparty/up2k.py b/copyparty/up2k.py index 3001e60b..eb1c1357 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -145,9 +145,12 @@ class Up2k(object): self.entags: dict[str, set[str]] = {} self.mtp_parsers: dict[str, dict[str, MParser]] = {} self.pending_tags: list[tuple[set[str], str, str, dict[str, Any]]] = [] - self.hashq: Queue[tuple[str, str, str, str, str, float, str, bool]] = Queue() + self.hashq: Queue[ + tuple[str, str, dict[str, Any], str, str, str, float, str, bool] + ] = Queue() self.tagq: Queue[tuple[str, str, str, str, str, float]] = Queue() self.tag_event = threading.Condition() + self.hashq_mutex = threading.Lock() self.n_hashq = 0 self.n_tagq = 0 self.mpool_used = False @@ -2351,7 +2354,8 @@ class Up2k(object): t = "cannot receive uploads right now;\nserver busy with {}.\nPlease wait; the client will retry..." raise Pebkac(503, t.format(self.blocked or "[unknown]")) except TypeError: - # py2 + if not PY2: + raise with self.mutex: self._job_volchk(cj) @@ -3991,16 +3995,16 @@ class Up2k(object): self.log("tagged {} ({}+{})".format(abspath, ntags1, len(tags) - ntags1)) def _hasher(self) -> None: - with self.mutex: + with self.hashq_mutex: self.n_hashq += 1 while True: - with self.mutex: + with self.hashq_mutex: self.n_hashq -= 1 # self.log("hashq {}".format(self.n_hashq)) task = self.hashq.get() - if len(task) != 8: + if len(task) != 9: raise Exception("invalid hash task") try: @@ -4009,11 +4013,14 @@ class Up2k(object): except Exception as ex: self.log("failed to hash %s: %s" % (task, ex), 1) - def _hash_t(self, task: tuple[str, str, str, str, str, float, str, bool]) -> bool: - ptop, vtop, rd, fn, ip, at, usr, skip_xau = task + def _hash_t( + self, task: tuple[str, str, dict[str, Any], str, str, str, float, str, bool] + ) -> bool: + ptop, vtop, flags, rd, fn, ip, at, usr, skip_xau = task # self.log("hashq {} pop {}/{}/{}".format(self.n_hashq, ptop, rd, fn)) - if "e2d" not in self.flags[ptop]: - return True + with self.mutex: + if not self.register_vpath(ptop, flags): + return True abspath = djoin(ptop, rd, fn) self.log("hashing " + abspath) @@ -4064,11 +4071,22 @@ class Up2k(object): usr: str, skip_xau: bool = False, ) -> None: - with self.mutex: - self.register_vpath(ptop, flags) - self.hashq.put((ptop, vtop, rd, fn, ip, at, usr, skip_xau)) + if "e2d" not in flags: + return + + if self.n_hashq > 1024: + t = "%d files in hashq; taking a nap" + self.log(t % (self.n_hashq,), 6) + + for _ in range(self.n_hashq // 1024): + time.sleep(0.1) + if self.n_hashq < 1024: + break + + zt = (ptop, vtop, flags, rd, fn, ip, at, usr, skip_xau) + with self.hashq_mutex: + self.hashq.put(zt) self.n_hashq += 1 - # self.log("hashq {} push {}/{}/{}".format(self.n_hashq, ptop, rd, fn)) def shutdown(self) -> None: self.stop = True