From 8244d3b4fc34c3169279bbf9fc8b019f39ffb4ae Mon Sep 17 00:00:00 2001 From: ed Date: Thu, 18 Apr 2024 23:10:37 +0000 Subject: [PATCH] pipe: add tapering to keep tcp alive --- copyparty/httpcli.py | 73 ++++++++++++++++++++++++++++++++------------ 1 file changed, 54 insertions(+), 19 deletions(-) diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index 30d33203..937c43ee 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -3157,18 +3157,21 @@ class HttpCli(object): mime: str, logmsg: str, ) -> bool: - self.log("pipe: streaming data from an unfinished upload", 6) + M = 1048576 self.send_headers(length=upper - lower, status=status, mime=mime) + wr_slp = self.args.s_wr_slp + wr_sz = self.args.s_wr_sz file_size = job["size"] chunk_size = up2k_chunksize(file_size) num_need = -1 data_end = 0 remains = upper - lower - slp = self.args.s_wr_slp - bufsz = self.args.s_wr_sz broken = False + spins = 0 + tier = 0 + tiers = ["uncapped", "reduced speed", "one byte per sec"] - while lower < upper: + while lower < upper and not broken: x = self.conn.hsrv.broker.ask("up2k.find_job_by_ap", ptop, req_path) job = json.loads(x.get()) if not job: @@ -3183,25 +3186,57 @@ class HttpCli(object): if cid in job["need"]: break data_end += chunk_size - t = "pipe: can stream %d MiB; requested range is %d to %d" - self.log( - t % (data_end // 1048576, lower // 1048576, upper // 1048576), 6 - ) + t = "pipe: can stream %.2f MiB; requested range is %.2f to %.2f" + self.log(t % (data_end / M, lower / M, upper / M), 6) if lower >= data_end: - t = "pipe: downloader is at %d MiB, but only %d is uploaded so far; waiting" - self.log(t % (lower // 1048576, data_end // 1048576), 6) - time.sleep(5) + if data_end: + t = "pipe: uploader is too slow; aborting download at %.2f MiB" + self.log(t % (data_end / M)) + raise Pebkac(416, "uploader is too slow") + + raise Pebkac(416, "no data available yet; please retry in a bit") + + slack = data_end - lower + if slack >= 8 * M: + ntier = 0 + winsz = M + bufsz = wr_sz + slp = wr_slp + else: + winsz = max(40, int(M * (slack / (12 * M)))) + base_rate = M if not wr_slp else wr_sz / wr_slp + if winsz > base_rate: + ntier = 0 + bufsz = wr_sz + slp = wr_slp + elif winsz > 300: + ntier = 1 + bufsz = winsz // 5 + slp = 0.2 + else: + ntier = 2 + bufsz = winsz = slp = 1 + + if tier != ntier: + tier = ntier + self.log("moved to tier %d (%s)" % (tier, tiers[tier])) + + try: + with open(ap_data, "rb") as f: + f.seek(lower) + page = f.read(min(winsz, data_end - lower, upper - lower)) + if not page: + raise Exception("got 0 bytes (EOF?)") + except Exception as ex: + self.log("pipe: read failed at %.2f MiB: %s" % (lower / M, ex), 3) + spins += 1 + if spins > 3: + raise Pebkac(500, "file became unreadable") + time.sleep(2) continue - with open(ap_data, "rb") as f: - f.seek(lower) - page = f.read(min(1048576, data_end - lower, upper - lower)) - if not page: - t = "pipe: BUG: read returned no data for min(1M, %d - %d, %d - %d)" - self.log(t % (data_end, lower, upper, lower), 1) - return False - + spins = 0 pofs = 0 while pofs < len(page): if slp: