up2k: shrink request headers

v1.13.5 made some proxies angry with its massive chunklists

when stitching chunks, only list the first chunk hash in full,
and include a truncated hash for the consecutive chunks

should be enough for logfiles to make sense
and to smoketest that clients are behaving
This commit is contained in:
ed 2024-08-08 18:24:18 +00:00
parent 373194c38a
commit 0da719f4c2
4 changed files with 61 additions and 13 deletions

View file

@ -1,8 +1,8 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from __future__ import print_function, unicode_literals from __future__ import print_function, unicode_literals
S_VERSION = "1.21" S_VERSION = "1.22"
S_BUILD_DT = "2024-07-26" S_BUILD_DT = "2024-08-08"
""" """
u2c.py: upload to copyparty u2c.py: upload to copyparty
@ -660,8 +660,15 @@ def upload(fsl, pw, stats):
# type: (FileSlice, str, str) -> None # type: (FileSlice, str, str) -> None
"""upload a range of file data, defined by one or more `cid` (chunk-hash)""" """upload a range of file data, defined by one or more `cid` (chunk-hash)"""
ctxt = fsl.cids[0]
if len(fsl.cids) > 1:
n = 192 // len(fsl.cids)
n = 9 if n > 9 else 2 if n < 2 else n
zsl = [zs[:n] for zs in fsl.cids[1:]]
ctxt += ",%d,%s" % (n, "".join(zsl))
headers = { headers = {
"X-Up2k-Hash": ",".join(fsl.cids), "X-Up2k-Hash": ctxt,
"X-Up2k-Wark": fsl.file.wark, "X-Up2k-Wark": fsl.file.wark,
"Content-Type": "application/octet-stream", "Content-Type": "application/octet-stream",
} }

View file

@ -2214,13 +2214,21 @@ class HttpCli(object):
raise Pebkac(400, "need hash and wark headers for binary POST") raise Pebkac(400, "need hash and wark headers for binary POST")
chashes = [x.strip() for x in chashes] chashes = [x.strip() for x in chashes]
if len(chashes) == 3 and len(chashes[1]) == 1:
# the first hash, then length of consecutive hashes,
# then a list of stitched hashes as one long string
clen = int(chashes[1])
siblings = chashes[2]
chashes = [chashes[0]]
for n in range(0, len(siblings), clen):
chashes.append(siblings[n : n + clen])
vfs, _ = self.asrv.vfs.get(self.vpath, self.uname, False, True) vfs, _ = self.asrv.vfs.get(self.vpath, self.uname, False, True)
ptop = (vfs.dbv or vfs).realpath ptop = (vfs.dbv or vfs).realpath
x = self.conn.hsrv.broker.ask("up2k.handle_chunks", ptop, wark, chashes) x = self.conn.hsrv.broker.ask("up2k.handle_chunks", ptop, wark, chashes)
response = x.get() response = x.get()
chunksize, cstarts, path, lastmod, sprs = response chashes, chunksize, cstarts, path, lastmod, sprs = response
maxsize = chunksize * len(chashes) maxsize = chunksize * len(chashes)
cstart0 = cstarts[0] cstart0 = cstarts[0]

View file

@ -2851,11 +2851,11 @@ class Up2k(object):
# one chunk may occur multiple times in a file; # one chunk may occur multiple times in a file;
# filter to unique values for the list of missing chunks # filter to unique values for the list of missing chunks
# (preserve order to reduce disk thrashing) # (preserve order to reduce disk thrashing)
lut = {} lut = set()
for k in cj["hash"]: for k in cj["hash"]:
if k not in lut: if k not in lut:
job["need"].append(k) job["need"].append(k)
lut[k] = 1 lut.add(k)
try: try:
self._new_upload(job) self._new_upload(job)
@ -3015,7 +3015,7 @@ class Up2k(object):
def handle_chunks( def handle_chunks(
self, ptop: str, wark: str, chashes: list[str] self, ptop: str, wark: str, chashes: list[str]
) -> tuple[int, list[list[int]], str, float, bool]: ) -> tuple[list[str], int, list[list[int]], str, float, bool]:
with self.mutex, self.reg_mutex: with self.mutex, self.reg_mutex:
self.db_act = self.vol_act[ptop] = time.time() self.db_act = self.vol_act[ptop] = time.time()
job = self.registry[ptop].get(wark) job = self.registry[ptop].get(wark)
@ -3024,12 +3024,37 @@ class Up2k(object):
self.log("unknown wark [{}], known: {}".format(wark, known)) self.log("unknown wark [{}], known: {}".format(wark, known))
raise Pebkac(400, "unknown wark" + SSEELOG) raise Pebkac(400, "unknown wark" + SSEELOG)
if len(chashes) > 1 and len(chashes[1]) < 44:
# first hash is full-length; expand remaining ones
uniq = []
lut = set()
for chash in job["hash"]:
if chash not in lut:
uniq.append(chash)
lut.add(chash)
try:
nchunk = uniq.index(chashes[0])
except:
raise Pebkac(400, "unknown chunk0 [%s]" % (chashes[0]))
expanded = [chashes[0]]
for prefix in chashes[1:]:
nchunk += 1
chash = uniq[nchunk]
if not chash.startswith(prefix):
t = "next sibling chunk does not start with expected prefix [%s]: [%s]"
raise Pebkac(400, t % (prefix, chash))
expanded.append(chash)
chashes = expanded
for chash in chashes: for chash in chashes:
if chash not in job["need"]: if chash not in job["need"]:
msg = "chash = {} , need:\n".format(chash) msg = "chash = {} , need:\n".format(chash)
msg += "\n".join(job["need"]) msg += "\n".join(job["need"])
self.log(msg) self.log(msg)
raise Pebkac(400, "already got that (%s) but thanks??" % (chash,)) t = "already got that (%s) but thanks??"
if chash not in job["hash"]:
t = "unknown chunk wtf: %s"
raise Pebkac(400, t % (chash,))
if chash in job["busy"]: if chash in job["busy"]:
nh = len(job["hash"]) nh = len(job["hash"])
@ -3070,7 +3095,7 @@ class Up2k(object):
job["poke"] = time.time() job["poke"] = time.time()
return chunksize, coffsets, path, job["lmod"], job["sprs"] return chashes, chunksize, coffsets, path, job["lmod"], job["sprs"]
def release_chunks(self, ptop: str, wark: str, chashes: list[str]) -> bool: def release_chunks(self, ptop: str, wark: str, chashes: list[str]) -> bool:
with self.reg_mutex: with self.reg_mutex:

View file

@ -2662,12 +2662,20 @@ function up2k_init(subtle) {
console.log('chunkpit onerror,', ++tries, t.name, t); console.log('chunkpit onerror,', ++tries, t.name, t);
orz2(xhr); orz2(xhr);
}; };
var chashes = [];
for (var a = pcar; a <= pcdr; a++) var chashes = [],
chashes.push(t.hash[a]); ctxt = t.hash[pcar],
plen = Math.floor(192 / nparts.length);
plen = plen > 9 ? 9 : plen < 2 ? 2 : plen;
for (var a = pcar + 1; a <= pcdr; a++)
chashes.push(t.hash[a].slice(0, plen));
if (chashes.length)
ctxt += ',' + plen + ',' + chashes.join('');
xhr.open('POST', t.purl, true); xhr.open('POST', t.purl, true);
xhr.setRequestHeader("X-Up2k-Hash", chashes.join(",")); xhr.setRequestHeader("X-Up2k-Hash", ctxt);
xhr.setRequestHeader("X-Up2k-Wark", t.wark); xhr.setRequestHeader("X-Up2k-Wark", t.wark);
xhr.setRequestHeader("X-Up2k-Stat", "{0}/{1}/{2}/{3} {4}/{5} {6}".format( xhr.setRequestHeader("X-Up2k-Stat", "{0}/{1}/{2}/{3} {4}/{5} {6}".format(
pvis.ctr.ok, pvis.ctr.ng, pvis.ctr.bz, pvis.ctr.q, btot, btot - bfin, pvis.ctr.ok, pvis.ctr.ng, pvis.ctr.bz, pvis.ctr.q, btot, btot - bfin,