pipe: add tapering to keep tcp alive

This commit is contained in:
ed 2024-04-18 23:10:37 +00:00
parent 74266af6d1
commit 8244d3b4fc

View file

@ -3157,18 +3157,21 @@ class HttpCli(object):
mime: str,
logmsg: str,
) -> bool:
self.log("pipe: streaming data from an unfinished upload", 6)
M = 1048576
self.send_headers(length=upper - lower, status=status, mime=mime)
wr_slp = self.args.s_wr_slp
wr_sz = self.args.s_wr_sz
file_size = job["size"]
chunk_size = up2k_chunksize(file_size)
num_need = -1
data_end = 0
remains = upper - lower
slp = self.args.s_wr_slp
bufsz = self.args.s_wr_sz
broken = False
spins = 0
tier = 0
tiers = ["uncapped", "reduced speed", "one byte per sec"]
while lower < upper:
while lower < upper and not broken:
x = self.conn.hsrv.broker.ask("up2k.find_job_by_ap", ptop, req_path)
job = json.loads(x.get())
if not job:
@ -3183,25 +3186,57 @@ class HttpCli(object):
if cid in job["need"]:
break
data_end += chunk_size
t = "pipe: can stream %d MiB; requested range is %d to %d"
self.log(
t % (data_end // 1048576, lower // 1048576, upper // 1048576), 6
)
t = "pipe: can stream %.2f MiB; requested range is %.2f to %.2f"
self.log(t % (data_end / M, lower / M, upper / M), 6)
if lower >= data_end:
t = "pipe: downloader is at %d MiB, but only %d is uploaded so far; waiting"
self.log(t % (lower // 1048576, data_end // 1048576), 6)
time.sleep(5)
continue
if data_end:
t = "pipe: uploader is too slow; aborting download at %.2f MiB"
self.log(t % (data_end / M))
raise Pebkac(416, "uploader is too slow")
raise Pebkac(416, "no data available yet; please retry in a bit")
slack = data_end - lower
if slack >= 8 * M:
ntier = 0
winsz = M
bufsz = wr_sz
slp = wr_slp
else:
winsz = max(40, int(M * (slack / (12 * M))))
base_rate = M if not wr_slp else wr_sz / wr_slp
if winsz > base_rate:
ntier = 0
bufsz = wr_sz
slp = wr_slp
elif winsz > 300:
ntier = 1
bufsz = winsz // 5
slp = 0.2
else:
ntier = 2
bufsz = winsz = slp = 1
if tier != ntier:
tier = ntier
self.log("moved to tier %d (%s)" % (tier, tiers[tier]))
try:
with open(ap_data, "rb") as f:
f.seek(lower)
page = f.read(min(1048576, data_end - lower, upper - lower))
page = f.read(min(winsz, data_end - lower, upper - lower))
if not page:
t = "pipe: BUG: read returned no data for min(1M, %d - %d, %d - %d)"
self.log(t % (data_end, lower, upper, lower), 1)
return False
raise Exception("got 0 bytes (EOF?)")
except Exception as ex:
self.log("pipe: read failed at %.2f MiB: %s" % (lower / M, ex), 3)
spins += 1
if spins > 3:
raise Pebkac(500, "file became unreadable")
time.sleep(2)
continue
spins = 0
pofs = 0
while pofs < len(page):
if slp: