PoC: ongoing uploads can be downloaded in lockstep

This commit is contained in:
ed 2024-04-18 00:10:54 +00:00
parent 5239e7ac0c
commit 08d8c82167
2 changed files with 132 additions and 2 deletions

View file

@ -36,6 +36,7 @@ from .bos import bos
from .star import StreamTar from .star import StreamTar
from .sutil import StreamArc, gfilter from .sutil import StreamArc, gfilter
from .szip import StreamZip from .szip import StreamZip
from .up2k import up2k_chunksize
from .util import unquote # type: ignore from .util import unquote # type: ignore
from .util import ( from .util import (
APPLESAN_RE, APPLESAN_RE,
@ -2929,17 +2930,41 @@ class HttpCli(object):
return txt return txt
def tx_file(self, req_path: str) -> bool: def tx_file(self, req_path: str, ptop: Optional[str] = None) -> bool:
status = 200 status = 200
logmsg = "{:4} {} ".format("", self.req) logmsg = "{:4} {} ".format("", self.req)
logtail = "" logtail = ""
if ptop is not None:
dp, fn = os.path.split(req_path)
tnam = fn + ".PARTIAL"
if self.args.dotpart:
tnam = "." + tnam
ap_data = os.path.join(dp, tnam)
try:
st_data = bos.stat(ap_data)
if not st_data.st_size:
raise Exception("partial is empty")
x = self.conn.hsrv.broker.ask("up2k.find_job_by_ap", ptop, req_path)
job = json.loads(x.get())
if not job:
raise Exception("not found in registry")
except Exception as ex:
self.log("will not pipe [%s]; %s" % (ap_data, ex), 6)
ptop = None
# #
# if request is for foo.js, check if we have foo.js.gz # if request is for foo.js, check if we have foo.js.gz
file_ts = 0.0 file_ts = 0.0
editions: dict[str, tuple[str, int]] = {} editions: dict[str, tuple[str, int]] = {}
for ext in ("", ".gz"): for ext in ("", ".gz"):
if ptop is not None:
sz = job["size"]
file_ts = job["lmod"]
editions["plain"] = (ap_data, sz)
break
try: try:
fs_path = req_path + ext fs_path = req_path + ext
st = bos.stat(fs_path) st = bos.stat(fs_path)
@ -3096,6 +3121,11 @@ class HttpCli(object):
self.send_headers(length=upper - lower, status=status, mime=mime) self.send_headers(length=upper - lower, status=status, mime=mime)
return True return True
if ptop is not None:
return self.tx_pipe(
ptop, req_path, ap_data, job, lower, upper, status, mime, logmsg
)
ret = True ret = True
with open_func(*open_args) as f: with open_func(*open_args) as f:
self.send_headers(length=upper - lower, status=status, mime=mime) self.send_headers(length=upper - lower, status=status, mime=mime)
@ -3115,6 +3145,89 @@ class HttpCli(object):
return ret return ret
def tx_pipe(
    self,
    ptop: str,
    req_path: str,
    ap_data: str,
    job: dict[str, Any],
    lower: int,
    upper: int,
    status: int,
    mime: str,
    logmsg: str,
) -> bool:
    """stream byte-range [lower, upper) of a file which is still being uploaded

    reads from the .PARTIAL file (ap_data) in lockstep with the uploader,
    polling up2k through the broker to learn how much contiguous data is
    safe to send; if the upload finishes mid-transfer, the remainder is
    served from the finalized file at req_path instead

    returns False if the transfer broke (client disconnected mid-send)
    """
    self.log("pipe: streaming data from an unfinished upload", 6)
    self.send_headers(length=upper - lower, status=status, mime=mime)

    file_size = job["size"]
    chunk_size = up2k_chunksize(file_size)
    num_need = -1  # chunks the uploader still needs; -1 forces initial scan
    data_end = 0  # number of contiguous bytes confirmed uploaded so far
    remains = upper - lower
    slp = self.args.s_wr_slp
    bufsz = self.args.s_wr_sz
    broken = False
    while lower < upper:
        # refresh our view of the uploader's progress
        x = self.conn.hsrv.broker.ask("up2k.find_job_by_ap", ptop, req_path)
        job = json.loads(x.get())
        if not job:
            t = "pipe: upload has finished; yeeting remainder"
            self.log(t, 6)  # FIX: message was built but never logged
            data_end = file_size
            break

        if num_need != len(job["need"]):
            # uploader made progress; recount contiguous data from the start
            # (chunks may arrive out of order, so only the prefix before the
            # first still-needed chunk-hash is guaranteed to be on disk)
            num_need = len(job["need"])
            data_end = 0
            for cid in job["hash"]:
                if cid in job["need"]:
                    break
                data_end += chunk_size
            t = "pipe: can stream %d MiB; requested range is %d to %d"
            self.log(
                t % (data_end // 1048576, lower // 1048576, upper // 1048576), 6
            )

        if lower >= data_end:
            # downloader has caught up with the uploader; wait for more data
            t = "pipe: downloader is at %d MiB, but only %d is uploaded so far; waiting"
            self.log(t % (lower // 1048576, data_end // 1048576), 6)
            time.sleep(5)
            continue

        with open(ap_data, "rb") as f:
            f.seek(lower)
            # at most 1 MiB per iteration, and never past the safe prefix
            # or the requested range
            page = f.read(min(1048576, data_end - lower, upper - lower))
            if not page:
                t = "pipe: BUG: read returned no data for min(1M, %d - %d, %d - %d)"
                self.log(t % (data_end, lower, upper, lower), 1)
                return False

        pofs = 0
        while pofs < len(page):
            if slp:
                time.sleep(slp)
            try:
                buf = page[pofs : pofs + bufsz]
                self.s.sendall(buf)
                zi = len(buf)
                remains -= zi
                lower += zi
                pofs += zi
            except:
                broken = True
                break

        if broken:
            # FIX: previously only the inner send-loop was exited, so a dead
            # socket caused an infinite re-read/re-send loop; bail out so the
            # "lower < upper and not broken" check below behaves as intended
            break

    if lower < upper and not broken:
        # upload completed during streaming; send the rest from the final file
        with open(req_path, "rb") as f:
            remains = sendfile_py(self.log, lower, upper, f, self.s, bufsz, slp)

    spd = self._spd((upper - lower) - remains)
    if self.do_log:
        self.log("{}, {}".format(logmsg, spd))

    return not broken
def tx_zip( def tx_zip(
self, self,
fmt: str, fmt: str,
@ -4031,7 +4144,7 @@ class HttpCli(object):
): ):
return self.tx_md(vn, abspath) return self.tx_md(vn, abspath)
return self.tx_file(abspath) return self.tx_file(abspath, None if st.st_size else vn.realpath)
elif is_dir and not self.can_read: elif is_dir and not self.can_read:
if self._use_dirkey(abspath): if self._use_dirkey(abspath):

View file

@ -293,6 +293,23 @@ class Up2k(object):
} }
return json.dumps(ret, indent=4) return json.dumps(ret, indent=4)
def find_job_by_ap(self, ptop: str, ap: str) -> str:
    """locate an unfinished upload by its absolute path

    *ptop* is the volume's filesystem root and *ap* the file's absolute
    path inside it; returns the matching registry entry as a json string,
    or "{}" if there is no match (or anything at all goes wrong)
    """
    try:
        if ANYWIN:
            ap = ap.replace("\\", "/")
        # volume-relative path, split into parent-dir and filename
        dn, fn = vsplit(ap[len(ptop) :].strip("/"))
        with self.reg_mutex:
            for job in self.registry[ptop].values():
                if job["name"] == fn and job["prel"] == dn:
                    return json.dumps(job, indent=0)
    except:
        pass
    return "{}"
def get_unfinished_by_user(self, uname, ip) -> str: def get_unfinished_by_user(self, uname, ip) -> str:
if PY2 or not self.reg_mutex.acquire(timeout=2): if PY2 or not self.reg_mutex.acquire(timeout=2):
return '[{"timeout":1}]' return '[{"timeout":1}]'