incoming-ETA: improve accuracy

This commit is contained in:
ed 2024-09-11 06:56:12 +00:00
parent 609c5921d4
commit 844194ee29
2 changed files with 50 additions and 23 deletions

View file

@ -2302,11 +2302,15 @@ class HttpCli(object):
vfs, _ = self.asrv.vfs.get(self.vpath, self.uname, False, True) vfs, _ = self.asrv.vfs.get(self.vpath, self.uname, False, True)
ptop = (vfs.dbv or vfs).realpath ptop = (vfs.dbv or vfs).realpath
x = self.conn.hsrv.broker.ask("up2k.handle_chunks", ptop, wark, chashes) broker = self.conn.hsrv.broker
x = broker.ask("up2k.handle_chunks", ptop, wark, chashes)
response = x.get() response = x.get()
chashes, chunksize, cstarts, path, lastmod, sprs = response chashes, chunksize, cstarts, path, lastmod, sprs = response
maxsize = chunksize * len(chashes) maxsize = chunksize * len(chashes)
cstart0 = cstarts[0] cstart0 = cstarts[0]
locked = chashes # remaining chunks to be received in this request
written = [] # chunks written to disk, but not yet released by up2k
num_left = -1 # num chunks left according to most recent up2k release
try: try:
if self.args.nw: if self.args.nw:
@ -2371,6 +2375,20 @@ class HttpCli(object):
self.log("clone {} done".format(cstart[0])) self.log("clone {} done".format(cstart[0]))
# be quick to keep the tcp winsize scale;
# if we can't confirm rn then that's fine
written.append(chash)
x = broker.ask("up2k.fast_confirm_chunks", ptop, wark, written)
num_left, t = x.get()
if num_left < -1:
self.loud_reply(t, status=500)
locked = written = []
return False
elif num_left >= 0:
self.log("got %d chunks, %d left" % (len(written), num_left), 6)
locked = locked[len(written):]
written = []
if not fpool: if not fpool:
f.close() f.close()
else: else:
@ -2381,25 +2399,23 @@ class HttpCli(object):
f.close() f.close()
raise raise
finally: finally:
x = self.conn.hsrv.broker.ask("up2k.release_chunks", ptop, wark, chashes) if locked:
x.get() # block client until released # now block until all chunks released+confirmed
x = broker.ask("up2k.confirm_chunks", ptop, wark, locked)
num_left, t = x.get()
if num_left < 0:
self.loud_reply(t, status=500)
return False
x = self.conn.hsrv.broker.ask("up2k.confirm_chunks", ptop, wark, chashes) if num_left < 0:
ztis = x.get() raise Pebkac(500, "unconfirmed; see serverlog")
try:
num_left, fin_path = ztis
except:
self.loud_reply(ztis, status=500)
return False
if not num_left and fpool: if not num_left and fpool:
with self.u2mutex: with self.u2mutex:
self.u2fh.close(path) self.u2fh.close(path)
if not num_left and not self.args.nw: if not num_left and not self.args.nw:
self.conn.hsrv.broker.ask( broker.ask("up2k.finish_upload", ptop, wark, self.u2fh.aps).get()
"up2k.finish_upload", ptop, wark, self.u2fh.aps
).get()
cinf = self.headers.get("x-up2k-stat", "") cinf = self.headers.get("x-up2k-stat", "")

View file

@ -3343,19 +3343,30 @@ class Up2k(object):
return chashes, chunksize, coffsets, path, job["lmod"], job["sprs"] return chashes, chunksize, coffsets, path, job["lmod"], job["sprs"]
def release_chunks(self, ptop: str, wark: str, chashes: list[str]) -> bool: def fast_confirm_chunks(
with self.reg_mutex: self, ptop: str, wark: str, chashes: list[str]
job = self.registry[ptop].get(wark) ) -> tuple[int, str]:
if job: if not self.mutex.acquire(False):
for chash in chashes: return -1, ""
job["busy"].pop(chash, None) if not self.reg_mutex.acquire(False):
self.mutex.release()
return True return -1, ""
try:
return self._confirm_chunks(ptop, wark, chashes)
finally:
self.reg_mutex.release()
self.mutex.release()
def confirm_chunks( def confirm_chunks(
self, ptop: str, wark: str, chashes: list[str] self, ptop: str, wark: str, chashes: list[str]
) -> tuple[int, str]: ) -> tuple[int, str]:
with self.mutex, self.reg_mutex: with self.mutex, self.reg_mutex:
return self._confirm_chunks(ptop, wark, chashes)
def _confirm_chunks(
self, ptop: str, wark: str, chashes: list[str]
) -> tuple[int, str]:
if True:
self.db_act = self.vol_act[ptop] = time.time() self.db_act = self.vol_act[ptop] = time.time()
try: try:
job = self.registry[ptop][wark] job = self.registry[ptop][wark]
@ -3363,7 +3374,7 @@ class Up2k(object):
src = djoin(pdir, job["tnam"]) src = djoin(pdir, job["tnam"])
dst = djoin(pdir, job["name"]) dst = djoin(pdir, job["name"])
except Exception as ex: except Exception as ex:
return "confirm_chunk, wark(%r)" % (ex,) # type: ignore return -2, "confirm_chunk, wark(%r)" % (ex,) # type: ignore
for chash in chashes: for chash in chashes:
job["busy"].pop(chash, None) job["busy"].pop(chash, None)
@ -3372,7 +3383,7 @@ class Up2k(object):
for chash in chashes: for chash in chashes:
job["need"].remove(chash) job["need"].remove(chash)
except Exception as ex: except Exception as ex:
return "confirm_chunk, chash(%s) %r" % (chash, ex) # type: ignore return -2, "confirm_chunk, chash(%s) %r" % (chash, ex) # type: ignore
ret = len(job["need"]) ret = len(job["need"])
if ret > 0: if ret > 0: