diff --git a/README.md b/README.md index d537feba..a597b755 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ turn your phone or raspi into a portable file server with resumable uploads/down * [x] sanic multipart parser * [x] load balancer (multiprocessing) * [x] upload (plain multipart, ie6 support) -* [ ] upload (js, resumable, multithreaded) +* [x] upload (js, resumable, multithreaded) * [x] download * [x] browser * [x] media player @@ -27,7 +27,7 @@ turn your phone or raspi into a portable file server with resumable uploads/down * [x] volumes * [x] accounts -summary: it works +summary: close to beta # dependencies @@ -78,6 +78,5 @@ roughly sorted by priority * support pillow-simd * cache sha512 chunks on client * symlink existing files on upload - * enforce chunksize and sha512('\n'.join(chunks)) * figure out the deal with pixel3a not being connectable as hotspot * pixel3a having unpredictable 3sec latency in general :|||| diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index a6fe3b9e..62430790 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -247,15 +247,69 @@ class HttpCli(object): except: raise Pebkac(422, "you POSTed invalid json") - # \suger0r/ - x = self.conn.hsrv.broker.put(True, "up2k._get_wark", body) - wark = x.get() - msg = '{{ "wark": "{}" }}'.format(wark) - self.log(msg) - self.reply(msg.encode("utf-8"), headers=["Content-Type: application/json"]) + # prefer this over undot; no reason to allow traversion + if "/" in body["name"]: + raise Pebkac(400, "folders verboten") + + # up2k-php compat + for k in "/chunkpit.php", "/handshake.php": + if self.vpath.endswith(k): + self.vpath = self.vpath[: -len(k)] + + vfs, rem = self.conn.auth.vfs.get(self.vpath, self.uname, False, True) + + body["vdir"] = os.path.join(vfs.realpath, rem) + body["addr"] = self.conn.addr[0] + + x = self.conn.hsrv.broker.put(True, "up2k.handle_json", body) + response = x.get() + response = json.dumps(response) + + self.log(response) + self.reply(response.encode("utf-8"), headers=["Content-Type: application/json"]) def handle_post_binary(self): - raise Exception("todo") + try: + remains = int(self.headers["content-length"]) + except: + raise Pebkac(400, "you must supply a content-length for binary POST") + + try: + chash = self.headers["x-up2k-hash"] + wark = self.headers["x-up2k-wark"] + except KeyError: + raise Pebkac(400, "need hash and wark headers for binary POST") + + x = self.conn.hsrv.broker.put(True, "up2k.handle_chunk", wark, chash) + response = x.get() + chunksize, ofs, path = response + + if self.args.nw: + path = os.devnull + + if remains > chunksize: + raise Pebkac(400, "your chunk is too big to fit") + + self.log("writing {} #{} @{} len {}".format(path, chash, ofs, remains)) + + reader = read_socket(self.sr, remains) + + with open(path, "rb+") as f: + f.seek(ofs) + post_sz, _, sha_b64 = hashcopy(self.conn, reader, f) + + if sha_b64 != chash: + raise Pebkac( + 400, + "your chunk got corrupted somehow:\n{} expected,\n{} received ({} bytes)".format( + chash, sha_b64, post_sz + ), + ) + + x = self.conn.hsrv.broker.put(True, "up2k.confirm_chunk", wark, chash) + response = x.get() + + self.loud_reply("thank") def handle_login(self): pwd = self.parser.require("cppwd", 64) @@ -306,11 +360,11 @@ class HttpCli(object): try: with open(fsenc(fn), "wb") as f: self.log("writing to {0}".format(fn)) - sz, sha512 = hashcopy(self.conn, p_data, f) + sz, sha512_hex, _ = hashcopy(self.conn, p_data, f) if sz == 0: raise Pebkac(400, "empty files in post") - files.append([sz, sha512]) + files.append([sz, sha512_hex]) except Pebkac: if not nullwrite: diff --git a/copyparty/up2k.py b/copyparty/up2k.py index 3aac7a5b..e7e3f65e 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -2,36 +2,130 @@ from __future__ import print_function, unicode_literals +import os import re +import time +import math import base64 import hashlib +import threading +from copy import deepcopy from .util import Pebkac class Up2k(object): + """ + TODO: + * documentation + * registry persistence + * ~/.config flatfiles for active jobs + * wark->path database for finished uploads + """ + def __init__(self, broker): self.broker = broker self.args = broker.args self.log = broker.log + # config self.salt = "hunter2" # TODO: config + # state + self.registry = {} + self.mutex = threading.Lock() + + # static self.r_hash = re.compile("^[0-9a-zA-Z_-]{43}$") - def _get_wark(self, j): - if len(j["name"]) > 4096 or len(j["hash"]) > 256: - raise Pebkac(400, "bad name or numchunks") + def handle_json(self, cj): + wark = self._get_wark(cj) + with self.mutex: + try: + job = self.registry[wark] + if job["vdir"] != cj["vdir"] or job["name"] != cj["name"]: + raise Pebkac(400, "unexpected filepath") - for k in j["hash"]: + except KeyError: + job = { + "wark": wark, + "t0": int(time.time()), + "addr": cj["addr"], + "vdir": cj["vdir"], + # client-provided, sanitized by _get_wark: + "name": cj["name"], + "size": cj["size"], + "hash": deepcopy(cj["hash"]), + # upload state + "pend": deepcopy(cj["hash"]), + } + self._new_upload(job) + + return { + "name": job["name"], + "size": job["size"], + "hash": job["pend"], + "wark": wark, + } + + def handle_chunk(self, wark, chash): + with self.mutex: + job = self.registry.get(wark) + if not job: + raise Pebkac(404, "unknown wark") + + if chash not in job["pend"]: + raise Pebkac(200, "already got that but thanks??") + + try: + nchunk = job["hash"].index(chash) + except ValueError: + raise Pebkac(404, "unknown chunk") + + chunksize = self._get_chunksize(job["size"]) + ofs = nchunk * chunksize + + path = os.path.join(job["vdir"], job["name"]) + + return [chunksize, ofs, path] + + def confirm_chunk(self, wark, chash): + with self.mutex: + self.registry[wark]["pend"].remove(chash) + + def _get_chunksize(self, filesize): + chunksize = 1024 * 1024 + stepsize = 512 * 1024 + while True: + for mul in [1, 2]: + nchunks = math.ceil(filesize * 1.0 / chunksize) + if nchunks <= 256: + return chunksize + + chunksize += stepsize + stepsize *= mul + + def _get_wark(self, cj): + if len(cj["name"]) > 1024 or len(cj["hash"]) > 256: + raise Pebkac(400, "name or numchunks not according to spec") + + for k in cj["hash"]: if not self.r_hash.match(k): - raise Pebkac(400, "at least one bad hash") + raise Pebkac(400, "at least one hash is not according to spec") - plaintext = "\n".join([self.salt, j["name"], str(j["size"]), *j["hash"]]) + # server-reproducible file identifier, independent of name or location + ident = "\n".join([self.salt, str(cj["size"]), *cj["hash"]]) hasher = hashlib.sha512() - hasher.update(plaintext.encode("utf-8")) + hasher.update(ident.encode("utf-8")) digest = hasher.digest()[:32] wark = base64.urlsafe_b64encode(digest) return wark.decode("utf-8").rstrip("=") + + def _new_upload(self, job): + self.registry[job["wark"]] = job + path = os.path.join(job["vdir"], job["name"]) + with open(path, "wb") as f: + f.seek(job["size"] - 1) + f.write(b"e") diff --git a/copyparty/util.py b/copyparty/util.py index 2c145b6a..27f52f91 100644 --- a/copyparty/util.py +++ b/copyparty/util.py @@ -3,6 +3,7 @@ from __future__ import print_function, unicode_literals import re import sys +import base64 import struct import hashlib import threading @@ -17,11 +18,9 @@ from .stolen import surrogateescape if not PY2: from urllib.parse import unquote_to_bytes as unquote from urllib.parse import quote_from_bytes as quote - from queue import Queue # noqa: F401 else: from urllib import unquote # pylint: disable=no-name-in-module from urllib import quote # pylint: disable=no-name-in-module - from Queue import Queue # pylint: disable=no-name-in-module # noqa: F401 surrogateescape.register_surrogateescape() @@ -385,6 +384,18 @@ def fsenc(txt): return txt.encode(FS_ENCODING, "surrogateescape") +def read_socket(sr, total_size): + remains = total_size + while remains > 0: + bufsz = 32 * 1024 + if bufsz > remains: + bufsz = remains + + buf = sr.recv(bufsz) + remains -= len(buf) + yield buf + + def hashcopy(actor, fin, fout): u32_lim = int((2 ** 31) * 0.9) hashobj = hashlib.sha512() @@ -398,7 +409,10 @@ def hashcopy(actor, fin, fout): hashobj.update(buf) fout.write(buf) - return tlen, hashobj.hexdigest() + digest32 = hashobj.digest()[:32] + digest_b64 = base64.urlsafe_b64encode(digest32).decode("utf-8").rstrip("=") + + return tlen, hashobj.hexdigest(), digest_b64 def unescape_cookie(orig): diff --git a/copyparty/web/up2k.js b/copyparty/web/up2k.js index 742aadfa..19ad966d 100644 --- a/copyparty/web/up2k.js +++ b/copyparty/web/up2k.js @@ -443,7 +443,7 @@ function up2k_init(have_crypto) { }; var hash_done = function (hashbuf) { - t.hash.push(buf2b64(hashbuf).substr(0, 43)); + t.hash.push(buf2b64(hashbuf.slice(0, 32)).replace(/=$/, '')); prog(t.n, nchunk, col_hashed); if (++nchunk < nchunks) { @@ -451,6 +451,13 @@ function up2k_init(have_crypto) { return segm_next(); } + // TODO remove + if (t.n == 0) { + var ts = new Date().getTime(); + var spd = (t.size / ((ts - t.t0) / 1000.)) / (1024 * 1024.); + alert('{0} ms, {1} MB/s\n'.format(ts - t.t0, spd.toFixed(3)) + t.hash.join('\n')); + } + o('f{0}t'.format(t.n)).innerHTML = 'connecting'; st.busy.hash.splice(st.busy.hash.indexOf(t), 1); st.todo.handshake.push(t); @@ -472,17 +479,14 @@ function up2k_init(have_crypto) { var t = st.todo.handshake.shift(); st.busy.handshake.push(t); - // TODO remove - var ts = new Date().getTime(); - var spd = (t.size / ((ts - t.t0) / 1000.)) / (1024 * 1024.); - alert('{0} ms, {1} MB/s\n'.format(ts - t.t0, spd.toFixed(3)) + t.hash.join('\n')); - var xhr = new XMLHttpRequest(); xhr.onload = function (ev) { if (xhr.status == 200) { + var response = JSON.parse(xhr.responseText); + t.postlist = []; - t.wark = xhr.response.wark; - var missing = xhr.response.hash; + t.wark = response.wark; + var missing = response.hash; for (var a = 0; a < missing.length; a++) { var idx = t.hash.indexOf(missing[a]); if (idx < 0) @@ -510,11 +514,13 @@ function up2k_init(have_crypto) { } else alert("server broke (error {0}):\n\"{1}\"\n".format( - xhr.status, (xhr.response && xhr.response.err) || + xhr.status, + (xhr.response && xhr.response.err) || + (xhr.responseText && xhr.responseText) || "no further information")); }; xhr.open('POST', 'handshake.php', true); - xhr.responseType = 'json'; + xhr.responseType = 'text'; xhr.send(JSON.stringify({ "name": t.name, "size": t.size, @@ -566,7 +572,9 @@ function up2k_init(have_crypto) { } else alert("server broke (error {0}):\n\"{1}\"\n".format( - xhr.status, (xhr.response && xhr.response.err) || + xhr.status, + (xhr.response && xhr.response.err) || + (xhr.responseText && xhr.responseText) || "no further information")); }; xhr.open('POST', 'chunkpit.php', true); @@ -575,7 +583,7 @@ function up2k_init(have_crypto) { xhr.setRequestHeader("X-Up2k-Wark", t.wark); xhr.setRequestHeader('Content-Type', 'application/octet-stream'); xhr.overrideMimeType('Content-Type', 'application/octet-stream'); - xhr.responseType = 'json'; + xhr.responseType = 'text'; xhr.send(ev.target.result); }; diff --git a/docs/notes.sh b/docs/notes.sh index 48d81d7a..ce7e94be 100644 --- a/docs/notes.sh +++ b/docs/notes.sh @@ -39,8 +39,9 @@ wget -S --header='Accept-Encoding: gzip' -U 'MSIE 6.0; SV1' http://127.0.0.1:123 ## ## sha512(file) | base64 +## usage: shab64 chunksize_mb filepath -f=/boot/vmlinuz-4.19-x86_64; sp=2; v=0; sz=$(stat -c%s "$f"); while true; do w=$((v+sp*1024*1024)); printf $(tail -c +$((v+1)) "$f" | head -c $((w-v)) | sha512sum | sed -r 's/ .*//;s/(..)/\\x\1/g') | base64 -w0 | cut -c-44 | tr '+/' '-_'; v=$w; [ $v -lt $sz ] || break; done +shab64() { sp=$1; f="$2"; v=0; sz=$(stat -c%s "$f"); while true; do w=$((v+sp*1024*1024)); printf $(tail -c +$((v+1)) "$f" | head -c $((w-v)) | sha512sum | cut -c-64 | sed -r 's/ .*//;s/(..)/\\x\1/g') | base64 -w0 | cut -c-43 | tr '+/' '-_'; v=$w; [ $v -lt $sz ] || break; done; } ##