From 844194ee29e080fb4bf596527491552941130e69 Mon Sep 17 00:00:00 2001 From: ed Date: Wed, 11 Sep 2024 06:56:12 +0000 Subject: [PATCH] incoming-ETA: improve accuracy --- copyparty/httpcli.py | 42 +++++++++++++++++++++++++++++------------- copyparty/up2k.py | 31 +++++++++++++++++++++---------- 2 files changed, 50 insertions(+), 23 deletions(-) diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index 36bf858e..5ed50914 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -2302,11 +2302,15 @@ class HttpCli(object): vfs, _ = self.asrv.vfs.get(self.vpath, self.uname, False, True) ptop = (vfs.dbv or vfs).realpath - x = self.conn.hsrv.broker.ask("up2k.handle_chunks", ptop, wark, chashes) + broker = self.conn.hsrv.broker + x = broker.ask("up2k.handle_chunks", ptop, wark, chashes) response = x.get() chashes, chunksize, cstarts, path, lastmod, sprs = response maxsize = chunksize * len(chashes) cstart0 = cstarts[0] + locked = chashes # remaining chunks to be received in this request + written = [] # chunks written to disk, but not yet released by up2k + num_left = -1 # num chunks left according to most recent up2k release try: if self.args.nw: @@ -2371,6 +2375,20 @@ class HttpCli(object): self.log("clone {} done".format(cstart[0])) + # be quick to keep the tcp winsize scale; + # if we can't confirm rn then that's fine + written.append(chash) + x = broker.ask("up2k.fast_confirm_chunks", ptop, wark, written) + num_left, t = x.get() + if num_left < -1: + self.loud_reply(t, status=500) + locked = written = [] + return False + elif num_left >= 0: + self.log("got %d chunks, %d left" % (len(written), num_left), 6) + locked = locked[len(written):] + written = [] + if not fpool: f.close() else: @@ -2381,25 +2399,23 @@ class HttpCli(object): f.close() raise finally: - x = self.conn.hsrv.broker.ask("up2k.release_chunks", ptop, wark, chashes) - x.get() # block client until released + if locked: + # now block until all chunks released+confirmed + x = broker.ask("up2k.confirm_chunks", ptop, wark, locked) + num_left, t = x.get() + if num_left < 0: + self.loud_reply(t, status=500) + return False - x = self.conn.hsrv.broker.ask("up2k.confirm_chunks", ptop, wark, chashes) - ztis = x.get() - try: - num_left, fin_path = ztis - except: - self.loud_reply(ztis, status=500) - return False + if num_left < 0: + raise Pebkac(500, "unconfirmed; see serverlog") if not num_left and fpool: with self.u2mutex: self.u2fh.close(path) if not num_left and not self.args.nw: - self.conn.hsrv.broker.ask( - "up2k.finish_upload", ptop, wark, self.u2fh.aps - ).get() + broker.ask("up2k.finish_upload", ptop, wark, self.u2fh.aps).get() cinf = self.headers.get("x-up2k-stat", "") diff --git a/copyparty/up2k.py b/copyparty/up2k.py index dfaf95f7..18f1cafe 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -3343,19 +3343,30 @@ class Up2k(object): return chashes, chunksize, coffsets, path, job["lmod"], job["sprs"] - def release_chunks(self, ptop: str, wark: str, chashes: list[str]) -> bool: - with self.reg_mutex: - job = self.registry[ptop].get(wark) - if job: - for chash in chashes: - job["busy"].pop(chash, None) - - return True + def fast_confirm_chunks( + self, ptop: str, wark: str, chashes: list[str] + ) -> tuple[int, str]: + if not self.mutex.acquire(False): + return -1, "" + if not self.reg_mutex.acquire(False): + self.mutex.release() + return -1, "" + try: + return self._confirm_chunks(ptop, wark, chashes) + finally: + self.reg_mutex.release() + self.mutex.release() def confirm_chunks( self, ptop: str, wark: str, chashes: list[str] ) -> tuple[int, str]: with self.mutex, self.reg_mutex: + return self._confirm_chunks(ptop, wark, chashes) + + def _confirm_chunks( + self, ptop: str, wark: str, chashes: list[str] + ) -> tuple[int, str]: + if True: self.db_act = self.vol_act[ptop] = time.time() try: job = self.registry[ptop][wark] @@ -3363,7 +3374,7 @@ class Up2k(object): src = djoin(pdir, job["tnam"]) dst = djoin(pdir, job["name"]) except Exception as ex: - return "confirm_chunk, wark(%r)" % (ex,) # type: ignore + return -2, "confirm_chunk, wark(%r)" % (ex,) # type: ignore for chash in chashes: job["busy"].pop(chash, None) @@ -3372,7 +3383,7 @@ class Up2k(object): for chash in chashes: job["need"].remove(chash) except Exception as ex: - return "confirm_chunk, chash(%s) %r" % (chash, ex) # type: ignore + return -2, "confirm_chunk, chash(%s) %r" % (chash, ex) # type: ignore ret = len(job["need"]) if ret > 0: