From 08d8c8216776e2013768673f60d26d52831fb05b Mon Sep 17 00:00:00 2001
From: ed
Date: Thu, 18 Apr 2024 00:10:54 +0000
Subject: [PATCH] PoC: ongoing uploads can be downloaded in lockstep

---
 copyparty/httpcli.py | 133 +++++++++++++++++++++++++++++++++++++++++++-
 copyparty/up2k.py    |  23 ++++++++
 2 files changed, 154 insertions(+), 2 deletions(-)

diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py
index 7dc88748..30d33203 100644
--- a/copyparty/httpcli.py
+++ b/copyparty/httpcli.py
@@ -36,6 +36,7 @@ from .bos import bos
 from .star import StreamTar
 from .sutil import StreamArc, gfilter
 from .szip import StreamZip
+from .up2k import up2k_chunksize
 from .util import unquote  # type: ignore
 from .util import (
     APPLESAN_RE,
@@ -2929,17 +2930,43 @@ class HttpCli(object):
 
         return txt
 
-    def tx_file(self, req_path: str) -> bool:
+    def tx_file(self, req_path: str, ptop: Optional[str] = None) -> bool:
         status = 200
         logmsg = "{:4} {} ".format("", self.req)
         logtail = ""
 
+        # ptop is given when the file at req_path is empty (possibly an
+        # upload still in progress); look for its .PARTIAL and up2k job
+        if ptop is not None:
+            dp, fn = os.path.split(req_path)
+            tnam = fn + ".PARTIAL"
+            if self.args.dotpart:
+                tnam = "." + tnam
+            ap_data = os.path.join(dp, tnam)
+            try:
+                st_data = bos.stat(ap_data)
+                if not st_data.st_size:
+                    raise Exception("partial is empty")
+                x = self.conn.hsrv.broker.ask("up2k.find_job_by_ap", ptop, req_path)
+                job = json.loads(x.get())
+                if not job:
+                    raise Exception("not found in registry")
+            except Exception as ex:
+                self.log("will not pipe [%s]; %s" % (ap_data, ex), 6)
+                ptop = None
+
         #
         # if request is for foo.js, check if we have foo.js.gz
 
         file_ts = 0.0
         editions: dict[str, tuple[str, int]] = {}
         for ext in ("", ".gz"):
+            if ptop is not None:
+                sz = job["size"]
+                file_ts = job["lmod"]
+                editions["plain"] = (ap_data, sz)
+                break
+
             try:
                 fs_path = req_path + ext
                 st = bos.stat(fs_path)
@@ -3096,6 +3123,11 @@ class HttpCli(object):
             self.send_headers(length=upper - lower, status=status, mime=mime)
             return True
 
+        if ptop is not None:
+            return self.tx_pipe(
+                ptop, req_path, ap_data, job, lower, upper, status, mime, logmsg
+            )
+
         ret = True
         with open_func(*open_args) as f:
             self.send_headers(length=upper - lower, status=status, mime=mime)
@@ -3115,6 +3147,103 @@ class HttpCli(object):
 
         return ret
 
+    def tx_pipe(
+        self,
+        ptop: str,
+        req_path: str,
+        ap_data: str,
+        job: dict[str, Any],
+        lower: int,
+        upper: int,
+        status: int,
+        mime: str,
+        logmsg: str,
+    ) -> bool:
+        """
+        stream bytes lower..upper of an upload which is still in progress;
+        reads from the partial file ap_data, and waits whenever the
+        download catches up with the contiguous data uploaded so far
+
+        returns False if sending to the client failed or a read came
+        back empty, otherwise True
+        """
+        self.log("pipe: streaming data from an unfinished upload", 6)
+        self.send_headers(length=upper - lower, status=status, mime=mime)
+        file_size = job["size"]
+        chunk_size = up2k_chunksize(file_size)
+        num_need = -1
+        data_end = 0
+        total = upper - lower
+        remains = total
+        slp = self.args.s_wr_slp
+        bufsz = self.args.s_wr_sz
+        broken = False
+
+        # stop as soon as the socket breaks; retrying would spin forever
+        while lower < upper and not broken:
+            x = self.conn.hsrv.broker.ask("up2k.find_job_by_ap", ptop, req_path)
+            job = json.loads(x.get())
+            if not job:
+                t = "pipe: upload has finished; yeeting remainder"
+                self.log(t, 6)
+                data_end = file_size
+                break
+
+            if num_need != len(job["need"]):
+                # the need-list changed size; recount the leading run of chunks not in it
+                num_need = len(job["need"])
+                data_end = 0
+                for cid in job["hash"]:
+                    if cid in job["need"]:
+                        break
+                    data_end += chunk_size
+                t = "pipe: can stream %d MiB; requested range is %d to %d"
+                self.log(
+                    t % (data_end // 1048576, lower // 1048576, upper // 1048576), 6
+                )
+
+            if lower >= data_end:
+                t = "pipe: downloader is at %d MiB, but only %d is uploaded so far; waiting"
+                self.log(t % (lower // 1048576, data_end // 1048576), 6)
+                time.sleep(5)
+                continue
+
+            with open(ap_data, "rb") as f:
+                f.seek(lower)
+                page = f.read(min(1048576, data_end - lower, upper - lower))
+            if not page:
+                t = "pipe: BUG: read returned no data for min(1M, %d - %d, %d - %d)"
+                self.log(t % (data_end, lower, upper, lower), 1)
+                return False
+
+            pofs = 0
+            while pofs < len(page):
+                if slp:
+                    time.sleep(slp)
+
+                try:
+                    buf = page[pofs : pofs + bufsz]
+                    self.s.sendall(buf)
+                    zi = len(buf)
+                    remains -= zi
+                    lower += zi
+                    pofs += zi
+                except Exception:
+                    broken = True
+                    break
+
+        if lower < upper and not broken:
+            # the job is gone from the registry; send the rest from req_path
+            with open(req_path, "rb") as f:
+                remains = sendfile_py(self.log, lower, upper, f, self.s, bufsz, slp)
+
+        # lower has moved during streaming, so measure against the initial range
+        spd = self._spd(total - remains)
+        if self.do_log:
+            self.log("{}, {}".format(logmsg, spd))
+
+        return not broken
+
     def tx_zip(
         self,
         fmt: str,
@@ -4031,7 +4160,7 @@ class HttpCli(object):
             ):
                 return self.tx_md(vn, abspath)
 
-            return self.tx_file(abspath)
+            return self.tx_file(abspath, None if st.st_size else vn.realpath)
 
         elif is_dir and not self.can_read:
             if self._use_dirkey(abspath):
diff --git a/copyparty/up2k.py b/copyparty/up2k.py
index 3222618c..015e9e7b 100644
--- a/copyparty/up2k.py
+++ b/copyparty/up2k.py
@@ -293,6 +293,29 @@ class Up2k(object):
         }
         return json.dumps(ret, indent=4)
 
+    def find_job_by_ap(self, ptop: str, ap: str) -> str:
+        """
+        find the registry entry under ptop whose prel/name match the
+        path ap (taken relative to ptop); returns it as json, or "{}"
+        """
+        try:
+            if ANYWIN:
+                ap = ap.replace("\\", "/")
+
+            vp = ap[len(ptop) :].strip("/")
+            dn, fn = vsplit(vp)
+            with self.reg_mutex:
+                tab2 = self.registry[ptop]
+                for job in tab2.values():
+                    if job["prel"] == dn and job["name"] == fn:
+                        return json.dumps(job, indent=0)
+        except Exception:
+            # best-effort lookup; any failure is reported as "not found"
+            pass
+
+        return "{}"
+
     def get_unfinished_by_user(self, uname, ip) -> str:
         if PY2 or not self.reg_mutex.acquire(timeout=2):
             return '[{"timeout":1}]'