From b136a5b042127cad1272c5c8cde29c8993f6f249 Mon Sep 17 00:00:00 2001 From: ed Date: Tue, 23 Sep 2025 19:11:41 +0000 Subject: [PATCH] fast_confirm_chunks: release all on error; possibly fixes an issue someone has been runnning into: an upload could get stuck on "that chunk is already being written to" when the server was overloaded enough that connections kept dropping --- copyparty/httpcli.py | 4 +++- copyparty/up2k.py | 17 ++++++++++------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index 185545b0..b52db5c7 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -2952,7 +2952,9 @@ class HttpCli(object): if now - treport < 1: continue treport = now - x = broker.ask("up2k.fast_confirm_chunks", ptop, wark, written) + x = broker.ask( + "up2k.fast_confirm_chunks", ptop, wark, written, locked + ) num_left, t = x.get() if num_left < -1: self.loud_reply(t, status=500) diff --git a/copyparty/up2k.py b/copyparty/up2k.py index 203162f5..f9f30434 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -3679,14 +3679,15 @@ class Up2k(object): t = t.format(job["name"], nchunks[0][0], coffsets[0][0], cur_sz) raise Pebkac(400, t) - job["busy"][chash] = 1 + for chash in chashes: + job["busy"][chash] = 1 job["poke"] = time.time() return chashes, chunksize, coffsets, path, job["lmod"], job["size"], job["sprs"] def fast_confirm_chunks( - self, ptop: str, wark: str, chashes: list[str] + self, ptop: str, wark: str, chashes: list[str], locked: list[str] ) -> tuple[int, str]: if not self.mutex.acquire(False): return -1, "" @@ -3694,7 +3695,7 @@ class Up2k(object): self.mutex.release() return -1, "" try: - return self._confirm_chunks(ptop, wark, chashes, chashes) + return self._confirm_chunks(ptop, wark, chashes, locked, False) finally: self.reg_mutex.release() self.mutex.release() @@ -3703,10 +3704,10 @@ class Up2k(object): self, ptop: str, wark: str, written: list[str], locked: list[str] ) -> tuple[int, str]: with self.mutex, self.reg_mutex: - return self._confirm_chunks(ptop, wark, written, locked) + return self._confirm_chunks(ptop, wark, written, locked, True) def _confirm_chunks( - self, ptop: str, wark: str, written: list[str], locked: list[str] + self, ptop: str, wark: str, written: list[str], locked: list[str], final: bool ) -> tuple[int, str]: if True: self.db_act = self.vol_act[ptop] = time.time() @@ -3718,14 +3719,16 @@ class Up2k(object): except Exception as ex: return -2, "confirm_chunk, wark(%r)" % (ex,) # type: ignore - for chash in locked: + for chash in locked if final else written: job["busy"].pop(chash, None) try: for chash in written: job["need"].remove(chash) except Exception as ex: - # dead tcp connections can get here by timeout (OK) + for zs in locked: + if job["busy"].pop(zs, None): + self.log("panic-unlock wark(%s) chunk(%s)" % (wark, zs), 1) return -2, "confirm_chunk, chash(%s) %r" % (chash, ex) # type: ignore ret = len(job["need"])