diff --git a/README.md b/README.md
index b7f263b2..493ca7d7 100644
--- a/README.md
+++ b/README.md
@@ -219,7 +219,7 @@ also see [comparison to similar software](./docs/versus.md)
 * upload
   * ☑ basic: plain multipart, ie6 support
   * ☑ [up2k](#uploading): js, resumable, multithreaded
-    * **no filesize limit!** ...unless you use Cloudflare, then it's 383.9 GiB
+    * **no filesize limit!** even on Cloudflare
   * ☑ stash: simple PUT filedropper
   * ☑ filename randomizer
   * ☑ write-only folders
@@ -654,7 +654,7 @@ up2k has several advantages:
 * uploads resume if you reboot your browser or pc, just upload the same files again
 * server detects any corruption; the client reuploads affected chunks
 * the client doesn't upload anything that already exists on the server
-* no filesize limit unless imposed by a proxy, for example Cloudflare, which blocks uploads over 383.9 GiB
+* no filesize limit, even when a proxy limits the request size (for example Cloudflare)
 * much higher speeds than ftp/scp/tarpipe on some internet connections (mainly american ones) thanks to parallel connections
 * the last-modified timestamp of the file is preserved
 
@@ -690,6 +690,8 @@ note that since up2k has to read each file twice, `[🎈] bup` can *theoretically*
 
 if you are resuming a massive upload and want to skip hashing the files which already finished, you can enable `turbo` in the `[⚙️] config` tab, but please read the tooltip on that button
 
+if the server is behind a proxy which imposes a request-size limit, you can configure up2k to sneak below the limit with server-option `--u2sz` (the default is 96 MiB to support Cloudflare)
+
 
 ### file-search
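the subchunk protocol this patch introduces boils down to a bit of offset math: a chunk bigger than the proxy's request-size cap is sent as several sequential POSTs, each carrying its byte-offset within the chunk in the `X-Up2k-Subc` header. a minimal standalone sketch of that split (python; `split_subchunks` is a name invented for this example, not part of the patch):

```python
# hypothetical illustration of the subchunk split; the real logic
# lives in FileSlice.subchunk (bin/u2c.py) and up2k.js
def split_subchunks(car, cdr, maxsz):
    """yield (offset-within-chunk, start, end) for one up2k chunk"""
    ofs = 0
    while car + ofs < cdr:
        end = min(cdr, car + ofs + maxsz)
        yield ofs, car + ofs, end  # ofs becomes the X-Up2k-Subc header
        ofs = end - car

maxsz = 96 * 1024 * 1024  # the cloudflare-safe default
for ofs, a, b in split_subchunks(0, 512 * 1024 * 1024, maxsz):
    print("POST bytes %d..%d with X-Up2k-Subc: %d" % (a, b, ofs))
```

the server stitches these back together and only verifies the chunk-hash once the final subchunk lands.
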
diff --git a/bin/u2c.py b/bin/u2c.py
index 2de5bd9b..aadee4f6 100755
--- a/bin/u2c.py
+++ b/bin/u2c.py
@@ -1,8 +1,8 @@
 #!/usr/bin/env python3
 from __future__ import print_function, unicode_literals
 
-S_VERSION = "2.3"
-S_BUILD_DT = "2024-10-15"
+S_VERSION = "2.4"
+S_BUILD_DT = "2024-10-16"
 
 """
 u2c.py: upload to copyparty
@@ -270,7 +270,7 @@ class FileSlice(object):
                 raise Exception(9)
             tlen += clen
 
-        self.len = tlen
+        self.len = self.tlen = tlen
         self.cdr = self.car + self.len
         self.ofs = 0  # type: int
 
@@ -278,6 +278,28 @@ class FileSlice(object):
         self.seek = self._seek0
         self.read = self._read0
 
+    def subchunk(self, maxsz, nth):
+        if self.tlen <= maxsz:
+            return -1
+
+        if not nth:
+            self.car0 = self.car
+            self.cdr0 = self.cdr
+
+        self.car = self.car0 + maxsz * nth
+        if self.car >= self.cdr0:
+            return -2
+
+        self.cdr = self.car + min(self.cdr0 - self.car, maxsz)
+        self.len = self.cdr - self.car
+        self.seek(0)
+        return nth
+
+    def unsub(self):
+        self.car = self.car0
+        self.cdr = self.cdr0
+        self.len = self.tlen
+
     def _open(self):
        self.seek = self._seek
        self.read = self._read
@@ -805,8 +827,8 @@ def handshake(ar, file, search):
     return r["hash"], r["sprs"]
 
 
-def upload(fsl, stats):
-    # type: (FileSlice, str) -> None
+def upload(fsl, stats, maxsz):
+    # type: (FileSlice, str, int) -> None
     """upload a range of file data, defined by one or more `cid` (chunk-hash)"""
 
     ctxt = fsl.cids[0]
@@ -824,21 +846,33 @@ def upload(fsl, stats):
     if stats:
         headers["X-Up2k-Stat"] = stats
 
+    nsub = 0
     try:
-        sc, txt = web.req("POST", fsl.file.url, headers, fsl, MO)
+        while nsub != -1:
+            nsub = fsl.subchunk(maxsz, nsub)
+            if nsub == -2:
+                return
+            if nsub >= 0:
+                headers["X-Up2k-Subc"] = str(maxsz * nsub)
+                headers.pop(CLEN, None)
+                nsub += 1
 
-        if sc == 400:
-            if (
-                "already being written" in txt
-                or "already got that" in txt
-                or "only sibling chunks" in txt
-            ):
-                fsl.file.nojoin = 1
+            sc, txt = web.req("POST", fsl.file.url, headers, fsl, MO)
 
-        if sc >= 400:
-            raise Exception("http %s: %s" % (sc, txt))
+            if sc == 400:
+                if (
+                    "already being written" in txt
+                    or "already got that" in txt
+                    or "only sibling chunks" in txt
+                ):
+                    fsl.file.nojoin = 1
+
+            if sc >= 400:
+                raise Exception("http %s: %s" % (sc, txt))
     finally:
         fsl.f.close()
+        if nsub != -1:
+            fsl.unsub()
 
 
 class Ctl(object):
@@ -970,7 +1004,7 @@ class Ctl(object):
                 print(" %d up %s" % (ncs - nc, cid))
                 stats = "%d/0/0/%d" % (nf, self.nfiles - nf)
                 fslice = FileSlice(file, [cid])
-                upload(fslice, stats)
+                upload(fslice, stats, self.ar.szm)
                 print(" ok!")
 
                 if file.recheck:
@@ -1318,7 +1352,7 @@ class Ctl(object):
                 self._check_if_done()
                 continue
 
-            njoin = (self.ar.sz * 1024 * 1024) // chunksz
+            njoin = self.ar.sz // chunksz
             cs = hs[:]
             while cs:
                 fsl = FileSlice(file, cs[:1])
@@ -1370,7 +1404,7 @@ class Ctl(object):
             )
 
             try:
-                upload(fsl, stats)
+                upload(fsl, stats, self.ar.szm)
             except Exception as ex:
                 t = "upload failed, retrying: %s #%s+%d (%s)\n"
                 eprint(t % (file.name, cids[0][:8], len(cids) - 1, ex))
@@ -1459,6 +1493,7 @@ source file/folder selection uses rsync syntax, meaning that:
     ap.add_argument("-j", type=int, metavar="CONNS", default=2, help="parallel connections")
     ap.add_argument("-J", type=int, metavar="CORES", default=hcores, help="num cpu-cores to use for hashing; set 0 or 1 for single-core hashing")
     ap.add_argument("--sz", type=int, metavar="MiB", default=64, help="try to make each POST this big")
+    ap.add_argument("--szm", type=int, metavar="MiB", default=96, help="max size of each POST (default is cloudflare max)")
    ap.add_argument("-nh", action="store_true", help="disable hashing while uploading")
    ap.add_argument("-ns", action="store_true", help="no status panel (for slow consoles and macos)")
    ap.add_argument("--cd", type=float, metavar="SEC", default=5, help="delay before reattempting a failed handshake/upload")
@@ -1486,6 +1521,9 @@ source file/folder selection uses rsync syntax, meaning that:
     if ar.dr:
         ar.ow = True
 
+    ar.sz *= 1024 * 1024
+    ar.szm *= 1024 * 1024
+
     ar.x = "|".join(ar.x or [])
 
     setattr(ar, "wlist", ar.url == "-")
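for review purposes, a standalone model of the loop above: `FileSlice.subchunk` returns -1 when the chunk already fits in one POST, -2 once every subchunk has been sent, and the subchunk index otherwise. `FakeSlice` and this print-based `upload` are stand-ins invented for the sketch, not the patch's actual classes:

```python
class FakeSlice:
    # stand-in for u2c.py's FileSlice, just enough for the loop below
    def __init__(self, tlen):
        self.tlen = tlen  # total chunk length
        self.car = 0      # current subchunk start
        self.cdr = tlen   # current subchunk end

    def subchunk(self, maxsz, nth):
        if self.tlen <= maxsz:
            return -1  # small enough; no splitting
        self.car = maxsz * nth
        if self.car >= self.tlen:
            return -2  # every subchunk was sent
        self.cdr = min(self.tlen, self.car + maxsz)
        return nth

def upload(fsl, maxsz):
    nsub = 0
    while nsub != -1:
        nsub = fsl.subchunk(maxsz, nsub)
        if nsub == -2:
            return
        hdr = {}
        if nsub >= 0:
            hdr["X-Up2k-Subc"] = str(maxsz * nsub)  # resume-point in bytes
            nsub += 1
        print("POST %d..%d %s" % (fsl.car, fsl.cdr, hdr))

upload(FakeSlice(200), 96)  # 3 posts: 0..96, 96..192, 192..200
upload(FakeSlice(50), 96)   # 1 post, no subchunk header
```
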
diff --git a/copyparty/__main__.py b/copyparty/__main__.py
index 76df3e2c..cf54316d 100644
--- a/copyparty/__main__.py
+++ b/copyparty/__main__.py
@@ -1017,7 +1017,7 @@ def add_upload(ap):
     ap2.add_argument("--sparse", metavar="MiB", type=int, default=4, help="windows-only: minimum size of incoming uploads through up2k before they are made into sparse files")
     ap2.add_argument("--turbo", metavar="LVL", type=int, default=0, help="configure turbo-mode in up2k client; [\033[32m-1\033[0m] = forbidden/always-off, [\033[32m0\033[0m] = default-off and warn if enabled, [\033[32m1\033[0m] = default-off, [\033[32m2\033[0m] = on, [\033[32m3\033[0m] = on and disable datecheck")
     ap2.add_argument("--u2j", metavar="JOBS", type=int, default=2, help="web-client: number of file chunks to upload in parallel; 1 or 2 is good for low-latency (same-country) connections, 4-8 for android clients, 16 for cross-atlantic (max=64)")
-    ap2.add_argument("--u2sz", metavar="N,N,N", type=u, default="1,64,96", help="web-client: default upload chunksize (MiB); sets \033[33mmin,default,max\033[0m in the settings gui. Each HTTP POST will aim for this size. Cloudflare max is 96. Big values are good for cross-atlantic but may increase HDD fragmentation on some FS. Disable this optimization with [\033[32m1,1,1\033[0m]")
+    ap2.add_argument("--u2sz", metavar="N,N,N", type=u, default="1,64,96", help="web-client: default upload chunksize (MiB); sets \033[33mmin,default,max\033[0m in the settings gui. Each HTTP POST will aim for \033[33mdefault\033[0m, and never exceed \033[33mmax\033[0m. Cloudflare max is 96. Big values are good for cross-atlantic but may increase HDD fragmentation on some FS. Disable this optimization with [\033[32m1,1,1\033[0m]")
     ap2.add_argument("--u2sort", metavar="TXT", type=u, default="s", help="upload order; [\033[32ms\033[0m]=smallest-first, [\033[32mn\033[0m]=alphabetical, [\033[32mfs\033[0m]=force-s, [\033[32mfn\033[0m]=force-n -- alphabetical is a bit slower on fiber/LAN but makes it easier to eyeball if everything went fine")
     ap2.add_argument("--write-uplog", action="store_true", help="write POST reports to textfiles in working-directory")
 
diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py
index 1a5f9ad6..68d3bbd3 100644
--- a/copyparty/httpcli.py
+++ b/copyparty/httpcli.py
@@ -1884,7 +1884,7 @@ class HttpCli(object):
         f, fn = ren_open(fn, *open_a, **params)
         try:
             path = os.path.join(fdir, fn)
-            post_sz, sha_hex, sha_b64 = hashcopy(reader, f, self.args.s_wr_slp)
+            post_sz, sha_hex, sha_b64 = hashcopy(reader, f, None, 0, self.args.s_wr_slp)
         finally:
             f.close()
 
@@ -2348,7 +2348,7 @@ class HttpCli(object):
         broker = self.conn.hsrv.broker
         x = broker.ask("up2k.handle_chunks", ptop, wark, chashes)
         response = x.get()
-        chashes, chunksize, cstarts, path, lastmod, sprs = response
+        chashes, chunksize, cstarts, path, lastmod, fsize, sprs = response
         maxsize = chunksize * len(chashes)
         cstart0 = cstarts[0]
         locked = chashes  # remaining chunks to be received in this request
@@ -2356,6 +2356,50 @@ class HttpCli(object):
         num_left = -1  # num chunks left according to most recent up2k release
         treport = time.time()  # ratelimit up2k reporting to reduce overhead
 
+        if "x-up2k-subc" in self.headers:
+            sc_ofs = int(self.headers["x-up2k-subc"])
+            chash = chashes[0]
+
+            u2sc = self.conn.hsrv.u2sc
+            try:
+                sc_pofs, hasher = u2sc[chash]
+                if not sc_ofs:
+                    t = "client restarted the chunk; forgetting subchunk offset %d"
+                    self.log(t % (sc_pofs,))
+                    raise Exception()
+            except:
+                sc_pofs = 0
+                hasher = hashlib.sha512()
+
+            et = "subchunk protocol error; resetting chunk "
+            if sc_pofs != sc_ofs:
+                u2sc.pop(chash, None)
+                t = "%s[%s]: the expected resume-point was %d, not %d"
+                raise Pebkac(400, t % (et, chash, sc_pofs, sc_ofs))
+            if len(cstarts) > 1:
+                u2sc.pop(chash, None)
+                t = "%s[%s]: only a single subchunk can be uploaded in one request; you are sending %d chunks"
+                raise Pebkac(400, t % (et, chash, len(cstarts)))
+            csize = min(chunksize, fsize - cstart0[0])
+            cstart0[0] += sc_ofs  # also sets cstarts[0][0]
+            sc_next_ofs = sc_ofs + postsize
+            if sc_next_ofs > csize:
+                u2sc.pop(chash, None)
+                t = "%s[%s]: subchunk offset (%d) plus postsize (%d) exceeds chunksize (%d)"
+                raise Pebkac(400, t % (et, chash, sc_ofs, postsize, csize))
+            else:
+                final_subchunk = sc_next_ofs == csize
+                t = "subchunk %s %d:%d/%d %s"
+                zs = "END" if final_subchunk else ""
+                self.log(t % (chash[:15], sc_ofs, sc_next_ofs, csize, zs), 6)
+                if final_subchunk:
+                    u2sc.pop(chash, None)
+                else:
+                    u2sc[chash] = (sc_next_ofs, hasher)
+        else:
+            hasher = None
+            final_subchunk = True
+
         try:
             if self.args.nw:
                 path = os.devnull
@@ -2386,9 +2430,11 @@ class HttpCli(object):
                     reader = read_socket(
                         self.sr, self.args.s_rd_sz, min(remains, chunksize)
                     )
-                    post_sz, _, sha_b64 = hashcopy(reader, f, self.args.s_wr_slp)
+                    post_sz, _, sha_b64 = hashcopy(
+                        reader, f, hasher, 0, self.args.s_wr_slp
+                    )
 
-                    if sha_b64 != chash:
+                    if sha_b64 != chash and final_subchunk:
                         try:
                             self.bakflip(
                                 f, path, cstart[0], post_sz, chash, sha_b64, vfs.flags
@@ -2420,7 +2466,8 @@ class HttpCli(object):
 
                 # be quick to keep the tcp winsize scale;
                 # if we can't confirm rn then that's fine
-                written.append(chash)
+                if final_subchunk:
+                    written.append(chash)
                 now = time.time()
                 if now - treport < 1:
                     continue
@@ -2813,7 +2860,7 @@ class HttpCli(object):
                 tabspath = os.path.join(fdir, tnam)
                 self.log("writing to {}".format(tabspath))
                 sz, sha_hex, sha_b64 = hashcopy(
-                    p_data, f, self.args.s_wr_slp, max_sz
+                    p_data, f, None, max_sz, self.args.s_wr_slp
                 )
                 if sz == 0:
                     raise Pebkac(400, "empty files in post")
@@ -3145,7 +3192,7 @@ class HttpCli(object):
             wunlink(self.log, fp, vfs.flags)
 
         with open(fsenc(fp), "wb", self.args.iobuf) as f:
-            sz, sha512, _ = hashcopy(p_data, f, self.args.s_wr_slp)
+            sz, sha512, _ = hashcopy(p_data, f, None, 0, self.args.s_wr_slp)
 
         if lim:
             lim.nup(self.ip)
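a toy model of the `u2sc` bookkeeping above (not the actual handler; it skips the chunk-restart path, the single-subchunk-per-request check, and the urlsafe-base64 hash encoding): one sha512 object is carried across sequential subchunk POSTs so the whole-chunk hash can still be verified after the final piece arrives.

```python
import hashlib

u2sc = {}  # chash -> (next-expected-offset, hasher)

def accept_subchunk(chash, sc_ofs, body, csize):
    # retrieve the in-progress hasher, or start fresh at offset 0
    sc_pofs, hasher = u2sc.get(chash, (0, hashlib.sha512()))
    if sc_pofs != sc_ofs:
        u2sc.pop(chash, None)
        raise ValueError("expected resume-point %d, not %d" % (sc_pofs, sc_ofs))
    hasher.update(body)  # (the real code also writes body to disk)
    sc_next_ofs = sc_ofs + len(body)
    if sc_next_ofs < csize:
        u2sc[chash] = (sc_next_ofs, hasher)  # more subchunks to come
        return None
    u2sc.pop(chash, None)
    return hasher.digest()  # final subchunk; compare against chash
```
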
diff --git a/copyparty/httpsrv.py b/copyparty/httpsrv.py
index 128a9b97..2737b841 100644
--- a/copyparty/httpsrv.py
+++ b/copyparty/httpsrv.py
@@ -1,6 +1,7 @@
 # coding: utf-8
 from __future__ import print_function, unicode_literals
 
+import hashlib
 import math
 import os
 import re
@@ -144,6 +145,7 @@ class HttpSrv(object):
         self.t_periodic: Optional[threading.Thread] = None
 
         self.u2fh = FHC()
+        self.u2sc: dict[str, tuple[int, "hashlib._Hash"]] = {}
         self.pipes = CachedDict(0.2)
         self.metrics = Metrics(self)
         self.nreq = 0
 
diff --git a/copyparty/up2k.py b/copyparty/up2k.py
index 265a9cee..e639ac5d 100644
--- a/copyparty/up2k.py
+++ b/copyparty/up2k.py
@@ -2891,9 +2891,6 @@ class Up2k(object):
                 "user": cj["user"],
                 "addr": ip,
                 "at": at,
-                "hash": [],
-                "need": [],
-                "busy": {},
             }
             for k in ["life"]:
                 if k in cj:
@@ -2935,9 +2932,12 @@ class Up2k(object):
                     job = rj
                     break
 
-            if job and wark in reg:
-                # self.log("pop " + wark + " " + job["name"] + " handle_json db", 4)
-                del reg[wark]
+            if job:
+                if wark in reg:
+                    del reg[wark]
+                job["hash"] = job["need"] = []
+                job["done"] = True
+                job["busy"] = {}
 
             if lost:
                 c2 = None
@@ -3373,7 +3373,7 @@ class Up2k(object):
     def handle_chunks(
         self, ptop: str, wark: str, chashes: list[str]
-    ) -> tuple[list[str], int, list[list[int]], str, float, bool]:
+    ) -> tuple[list[str], int, list[list[int]], str, float, int, bool]:
         with self.mutex, self.reg_mutex:
             self.db_act = self.vol_act[ptop] = time.time()
             job = self.registry[ptop].get(wark)
@@ -3456,7 +3456,7 @@ class Up2k(object):
 
         job["poke"] = time.time()
 
-        return chashes, chunksize, coffsets, path, job["lmod"], job["sprs"]
+        return chashes, chunksize, coffsets, path, job["lmod"], job["size"], job["sprs"]
 
     def fast_confirm_chunks(
         self, ptop: str, wark: str, chashes: list[str]
 
diff --git a/copyparty/util.py b/copyparty/util.py
index 5d6a47b4..21f50b84 100644
--- a/copyparty/util.py
+++ b/copyparty/util.py
@@ -2723,10 +2723,12 @@ def yieldfile(fn: str, bufsz: int) -> Generator[bytes, None, None]:
 def hashcopy(
     fin: Generator[bytes, None, None],
     fout: Union[typing.BinaryIO, typing.IO[Any]],
-    slp: float = 0,
-    max_sz: int = 0,
+    hashobj: Optional["hashlib._Hash"],
+    max_sz: int,
+    slp: float,
 ) -> tuple[int, str, str]:
-    hashobj = hashlib.sha512()
+    if not hashobj:
+        hashobj = hashlib.sha512()
     tlen = 0
     for buf in fin:
         tlen += len(buf)
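the reason `hashcopy` grows a `hashobj` parameter: each subchunk POST writes only a piece of the chunk, so the digest has to survive across calls. a simplified model of the new contract (the real function also returns hex and urlsafe-base64 digests and throttles via `slp`; `hashcopy_sketch` is a name invented for this example):

```python
import hashlib
import io

def hashcopy_sketch(fin, fout, hashobj, max_sz, slp):
    # pass a half-fed hasher to resume mid-chunk, or None for a fresh sha512
    if not hashobj:
        hashobj = hashlib.sha512()
    tlen = 0
    for buf in fin:
        tlen += len(buf)
        if max_sz and tlen > max_sz:
            raise ValueError("body too big")
        hashobj.update(buf)
        fout.write(buf)
    return tlen, hashobj.hexdigest()

# two sequential "subchunks" hash identically to one whole chunk
h = hashlib.sha512()
hashcopy_sketch(iter([b"hello "]), io.BytesIO(), h, 0, 0)
n, hx = hashcopy_sketch(iter([b"world"]), io.BytesIO(), h, 0, 0)
assert hx == hashlib.sha512(b"hello world").hexdigest()
```
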
diff --git a/copyparty/web/up2k.js b/copyparty/web/up2k.js
index 9b232be0..a05e414b 100644
--- a/copyparty/web/up2k.js
+++ b/copyparty/web/up2k.js
@@ -853,8 +853,13 @@ function up2k_init(subtle) {
 
     setmsg(suggest_up2k, 'msg');
 
+    var u2szs = u2sz.split(','),
+        u2sz_min = parseInt(u2szs[0]),
+        u2sz_tgt = parseInt(u2szs[1]),
+        u2sz_max = parseInt(u2szs[2]);
+
     var parallel_uploads = ebi('nthread').value = icfg_get('nthread', u2j),
-        stitch_tgt = ebi('u2szg').value = icfg_get('u2sz', u2sz.split(',')[1]),
+        stitch_tgt = ebi('u2szg').value = icfg_get('u2sz', u2sz_tgt),
         uc = {},
         fdom_ctr = 0,
         biggest_file = 0;
@@ -2574,8 +2579,7 @@ function up2k_init(subtle) {
             nparts = upt.nparts,
             pcar = nparts[0],
             pcdr = nparts[nparts.length - 1],
-            snpart = pcar == pcdr ? pcar : ('' + pcar + '~' + pcdr),
-            tries = 0;
+            maxsz = u2sz_max * 1024 * 1024;
 
         if (t.done)
             return console.log('done; skip chunk', t.name, t);
@@ -2595,6 +2599,30 @@ function up2k_init(subtle) {
         if (cdr >= t.size)
             cdr = t.size;
 
+        if (cdr - car <= maxsz)
+            return upload_sub(t, upt, pcar, pcdr, car, cdr, chunksize, car, []);
+
+        var car0 = car, subs = [];
+        while (car < cdr) {
+            subs.push([car, Math.min(cdr, car + maxsz)]);
+            car += maxsz;
+        }
+        upload_sub(t, upt, pcar, pcdr, 0, 0, chunksize, car0, subs);
+    }
+
+    function upload_sub(t, upt, pcar, pcdr, car, cdr, chunksize, car0, subs) {
+        var nparts = upt.nparts,
+            is_sub = subs.length;
+
+        if (is_sub) {
+            var x = subs.shift();
+            car = x[0];
+            cdr = x[1];
+        }
+
+        var snpart = is_sub ? ('' + pcar + '(' + (car-car0) +'+'+ (cdr-car)) :
+            pcar == pcdr ? pcar : ('' + pcar + '~' + pcdr);
+
         var orz = function (xhr) {
             st.bytes.inflight -= xhr.bsent;
             var txt = unpre((xhr.response && xhr.response.err) || xhr.responseText);
@@ -2608,6 +2636,10 @@ function up2k_init(subtle) {
                 return;
             }
             if (xhr.status == 200) {
+                car = car0;
+                if (subs.length)
+                    return upload_sub(t, upt, pcar, pcdr, 0, 0, chunksize, car0, subs);
+
                 var bdone = cdr - car;
                 for (var a = pcar; a <= pcdr; a++) {
                     pvis.prog(t, a, Math.min(bdone, chunksize));
@@ -2674,7 +2706,7 @@ function up2k_init(subtle) {
             toast.warn(9.98, L.u_cuerr.format(snpart, Math.ceil(t.size / chunksize), t.name), t);
 
             t.nojoin = t.nojoin || t.postlist.length;  // maybe rproxy postsize limit
-            console.log('chunkpit onerror,', ++tries, t.name, t);
+            console.log('chunkpit onerror,', t.name, t);
             orz2(xhr);
         };
 
@@ -2692,6 +2724,9 @@ function up2k_init(subtle) {
         xhr.open('POST', t.purl, true);
         xhr.setRequestHeader("X-Up2k-Hash", ctxt);
         xhr.setRequestHeader("X-Up2k-Wark", t.wark);
+        if (is_sub)
+            xhr.setRequestHeader("X-Up2k-Subc", car - car0);
+
         xhr.setRequestHeader("X-Up2k-Stat", "{0}/{1}/{2}/{3} {4}/{5} {6}".format(
             pvis.ctr.ok, pvis.ctr.ng, pvis.ctr.bz, pvis.ctr.q,
             btot, btot - bfin, st.eta.t.split(' ').pop()));
@@ -2812,11 +2847,11 @@ function up2k_init(subtle) {
     }
 
     var read_u2sz = function () {
-        var el = ebi('u2szg'), n = parseInt(el.value), dv = u2sz.split(',');
+        var el = ebi('u2szg'), n = parseInt(el.value);
         stitch_tgt = n = (
-            isNaN(n) ? dv[1] :
-            n < dv[0] ? dv[0] :
-            n > dv[2] ? dv[2] : n
+            isNaN(n) ? u2sz_tgt :
+            n < u2sz_min ? u2sz_min :
+            n > u2sz_max ? u2sz_max : n
         );
-        if (n == dv[1]) sdrop('u2sz'); else swrite('u2sz', n);
+        if (n == u2sz_tgt) sdrop('u2sz'); else swrite('u2sz', n);
         if (el.value != n) el.value = n;
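and the min/default/max clamping that `read_u2sz` applies to the settings-gui value, mirrored in python for quick reference (`clamp_u2sz` is a name invented here, not part of the patch):

```python
def clamp_u2sz(u2sz, requested):
    # server sends "min,default,max" (MiB); the user's requested
    # chunksize is clamped into that window, like read_u2sz above
    lo, tgt, hi = (int(x) for x in u2sz.split(","))
    if requested is None:
        return tgt
    return max(lo, min(hi, requested))

assert clamp_u2sz("1,64,96", None) == 64  # no preference -> default
assert clamp_u2sz("1,64,96", 300) == 96   # capped at the cloudflare-safe max
assert clamp_u2sz("1,64,96", 0) == 1
```
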