mirror of
https://github.com/9001/copyparty.git
synced 2025-08-17 09:02:15 -06:00
incoming-ETA: improve accuracy
This commit is contained in:
parent
609c5921d4
commit
844194ee29
|
@ -2302,11 +2302,15 @@ class HttpCli(object):
|
|||
vfs, _ = self.asrv.vfs.get(self.vpath, self.uname, False, True)
|
||||
ptop = (vfs.dbv or vfs).realpath
|
||||
|
||||
x = self.conn.hsrv.broker.ask("up2k.handle_chunks", ptop, wark, chashes)
|
||||
broker = self.conn.hsrv.broker
|
||||
x = broker.ask("up2k.handle_chunks", ptop, wark, chashes)
|
||||
response = x.get()
|
||||
chashes, chunksize, cstarts, path, lastmod, sprs = response
|
||||
maxsize = chunksize * len(chashes)
|
||||
cstart0 = cstarts[0]
|
||||
locked = chashes # remaining chunks to be received in this request
|
||||
written = [] # chunks written to disk, but not yet released by up2k
|
||||
num_left = -1 # num chunks left according to most recent up2k release
|
||||
|
||||
try:
|
||||
if self.args.nw:
|
||||
|
@ -2371,6 +2375,20 @@ class HttpCli(object):
|
|||
|
||||
self.log("clone {} done".format(cstart[0]))
|
||||
|
||||
# be quick to keep the tcp winsize scale;
|
||||
# if we can't confirm rn then that's fine
|
||||
written.append(chash)
|
||||
x = broker.ask("up2k.fast_confirm_chunks", ptop, wark, written)
|
||||
num_left, t = x.get()
|
||||
if num_left < -1:
|
||||
self.loud_reply(t, status=500)
|
||||
locked = written = []
|
||||
return False
|
||||
elif num_left >= 0:
|
||||
self.log("got %d chunks, %d left" % (len(written), num_left), 6)
|
||||
locked = locked[len(written):]
|
||||
written = []
|
||||
|
||||
if not fpool:
|
||||
f.close()
|
||||
else:
|
||||
|
@ -2381,25 +2399,23 @@ class HttpCli(object):
|
|||
f.close()
|
||||
raise
|
||||
finally:
|
||||
x = self.conn.hsrv.broker.ask("up2k.release_chunks", ptop, wark, chashes)
|
||||
x.get() # block client until released
|
||||
|
||||
x = self.conn.hsrv.broker.ask("up2k.confirm_chunks", ptop, wark, chashes)
|
||||
ztis = x.get()
|
||||
try:
|
||||
num_left, fin_path = ztis
|
||||
except:
|
||||
self.loud_reply(ztis, status=500)
|
||||
if locked:
|
||||
# now block until all chunks released+confirmed
|
||||
x = broker.ask("up2k.confirm_chunks", ptop, wark, locked)
|
||||
num_left, t = x.get()
|
||||
if num_left < 0:
|
||||
self.loud_reply(t, status=500)
|
||||
return False
|
||||
|
||||
if num_left < 0:
|
||||
raise Pebkac(500, "unconfirmed; see serverlog")
|
||||
|
||||
if not num_left and fpool:
|
||||
with self.u2mutex:
|
||||
self.u2fh.close(path)
|
||||
|
||||
if not num_left and not self.args.nw:
|
||||
self.conn.hsrv.broker.ask(
|
||||
"up2k.finish_upload", ptop, wark, self.u2fh.aps
|
||||
).get()
|
||||
broker.ask("up2k.finish_upload", ptop, wark, self.u2fh.aps).get()
|
||||
|
||||
cinf = self.headers.get("x-up2k-stat", "")
|
||||
|
||||
|
|
|
@ -3343,19 +3343,30 @@ class Up2k(object):
|
|||
|
||||
return chashes, chunksize, coffsets, path, job["lmod"], job["sprs"]
|
||||
|
||||
def release_chunks(self, ptop: str, wark: str, chashes: list[str]) -> bool:
|
||||
with self.reg_mutex:
|
||||
job = self.registry[ptop].get(wark)
|
||||
if job:
|
||||
for chash in chashes:
|
||||
job["busy"].pop(chash, None)
|
||||
|
||||
return True
|
||||
def fast_confirm_chunks(
|
||||
self, ptop: str, wark: str, chashes: list[str]
|
||||
) -> tuple[int, str]:
|
||||
if not self.mutex.acquire(False):
|
||||
return -1, ""
|
||||
if not self.reg_mutex.acquire(False):
|
||||
self.mutex.release()
|
||||
return -1, ""
|
||||
try:
|
||||
return self._confirm_chunks(ptop, wark, chashes)
|
||||
finally:
|
||||
self.reg_mutex.release()
|
||||
self.mutex.release()
|
||||
|
||||
def confirm_chunks(
|
||||
self, ptop: str, wark: str, chashes: list[str]
|
||||
) -> tuple[int, str]:
|
||||
with self.mutex, self.reg_mutex:
|
||||
return self._confirm_chunks(ptop, wark, chashes)
|
||||
|
||||
def _confirm_chunks(
|
||||
self, ptop: str, wark: str, chashes: list[str]
|
||||
) -> tuple[int, str]:
|
||||
if True:
|
||||
self.db_act = self.vol_act[ptop] = time.time()
|
||||
try:
|
||||
job = self.registry[ptop][wark]
|
||||
|
@ -3363,7 +3374,7 @@ class Up2k(object):
|
|||
src = djoin(pdir, job["tnam"])
|
||||
dst = djoin(pdir, job["name"])
|
||||
except Exception as ex:
|
||||
return "confirm_chunk, wark(%r)" % (ex,) # type: ignore
|
||||
return -2, "confirm_chunk, wark(%r)" % (ex,) # type: ignore
|
||||
|
||||
for chash in chashes:
|
||||
job["busy"].pop(chash, None)
|
||||
|
@ -3372,7 +3383,7 @@ class Up2k(object):
|
|||
for chash in chashes:
|
||||
job["need"].remove(chash)
|
||||
except Exception as ex:
|
||||
return "confirm_chunk, chash(%s) %r" % (chash, ex) # type: ignore
|
||||
return -2, "confirm_chunk, chash(%s) %r" % (chash, ex) # type: ignore
|
||||
|
||||
ret = len(job["need"])
|
||||
if ret > 0:
|
||||
|
|
Loading…
Reference in a new issue