up2k works (no registry persistence yet)

ed 2019-07-02 00:14:55 +00:00
parent 19d6487eaf
commit 2ccb9facd7
6 changed files with 205 additions and 35 deletions

View file

@@ -18,7 +18,7 @@ turn your phone or raspi into a portable file server with resumable uploads/down
 * [x] sanic multipart parser
 * [x] load balancer (multiprocessing)
 * [x] upload (plain multipart, ie6 support)
-* [ ] upload (js, resumable, multithreaded)
+* [x] upload (js, resumable, multithreaded)
 * [x] download
 * [x] browser
 * [x] media player
@@ -27,7 +27,7 @@ turn your phone or raspi into a portable file server with resumable uploads/down
 * [x] volumes
 * [x] accounts
 
-summary: it works
+summary: close to beta
 
 # dependencies
@@ -78,6 +78,5 @@ roughly sorted by priority
 * support pillow-simd
 * cache sha512 chunks on client
 * symlink existing files on upload
-* enforce chunksize and sha512('\n'.join(chunks))
 * figure out the deal with pixel3a not being connectable as hotspot
 * pixel3a having unpredictable 3sec latency in general :||||
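The TODO removed above ("enforce chunksize and sha512('\n'.join(chunks))") is what the rest of this commit implements; note that the final formula in up2k.py below salts the join and hashes the filesize plus the chunk hashes, not the filename. A minimal sketch of the derivation (hypothetical helper name, demo salt):

import base64, hashlib

def get_wark(salt, filesize, chunk_hashes):
    # sha512 over salt + size + every chunk hash; keep the first 32 bytes,
    # urlsafe-base64 without padding -> a 43-char file identifier
    ident = "\n".join([salt, str(filesize)] + chunk_hashes)
    digest = hashlib.sha512(ident.encode("utf-8")).digest()[:32]
    return base64.urlsafe_b64encode(digest).decode("utf-8").rstrip("=")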

View file

@@ -247,15 +247,69 @@ class HttpCli(object):
         except:
             raise Pebkac(422, "you POSTed invalid json")
 
-        # \suger0r/
-        x = self.conn.hsrv.broker.put(True, "up2k._get_wark", body)
-        wark = x.get()
-        msg = '{{ "wark": "{}" }}'.format(wark)
-        self.log(msg)
-        self.reply(msg.encode("utf-8"), headers=["Content-Type: application/json"])
+        # prefer this over undot; no reason to allow traversion
+        if "/" in body["name"]:
+            raise Pebkac(400, "folders verboten")
+
+        # up2k-php compat
+        for k in "/chunkpit.php", "/handshake.php":
+            if self.vpath.endswith(k):
+                self.vpath = self.vpath[: -len(k)]
+
+        vfs, rem = self.conn.auth.vfs.get(self.vpath, self.uname, False, True)
+        body["vdir"] = os.path.join(vfs.realpath, rem)
+        body["addr"] = self.conn.addr[0]
+
+        x = self.conn.hsrv.broker.put(True, "up2k.handle_json", body)
+        response = x.get()
+        response = json.dumps(response)
+
+        self.log(response)
+        self.reply(response.encode("utf-8"), headers=["Content-Type: application/json"])
 
     def handle_post_binary(self):
-        raise Exception("todo")
+        try:
+            remains = int(self.headers["content-length"])
+        except:
+            raise Pebkac(400, "you must supply a content-length for binary POST")
+
+        try:
+            chash = self.headers["x-up2k-hash"]
+            wark = self.headers["x-up2k-wark"]
+        except KeyError:
+            raise Pebkac(400, "need hash and wark headers for binary POST")
+
+        x = self.conn.hsrv.broker.put(True, "up2k.handle_chunk", wark, chash)
+        response = x.get()
+        chunksize, ofs, path = response
+
+        if self.args.nw:
+            path = os.devnull
+
+        if remains > chunksize:
+            raise Pebkac(400, "your chunk is too big to fit")
+
+        self.log("writing {} #{} @{} len {}".format(path, chash, ofs, remains))
+
+        reader = read_socket(self.sr, remains)
+        with open(path, "rb+") as f:
+            f.seek(ofs)
+            post_sz, _, sha_b64 = hashcopy(self.conn, reader, f)
+
+        if sha_b64 != chash:
+            raise Pebkac(
+                400,
+                "your chunk got corrupted somehow:\n{} expected,\n{} received ({} bytes)".format(
+                    chash, sha_b64, post_sz
+                ),
+            )
+
+        x = self.conn.hsrv.broker.put(True, "up2k.confirm_chunk", wark, chash)
+        response = x.get()
+
+        self.loud_reply("thank")
 
     def handle_login(self):
         pwd = self.parser.require("cppwd", 64)
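Note the RPC shape used throughout this handler: broker.put(True, "up2k.handle_json", body) hands back an object whose .get() blocks until the up2k side replies. The broker itself is not part of this diff; a minimal stand-in inferred from usage here, for reading purposes only:

class FakeBroker(object):
    # hypothetical sketch: dispatch "module.method" to a local Up2k
    # instance and wrap the result in a blocking .get(), mimicking
    # how httpcli consumes broker.put(True, ...)
    def __init__(self, up2k):
        self.up2k = up2k

    def put(self, want_retval, dest, *args):
        _mod, method = dest.split(".")
        ret = getattr(self.up2k, method)(*args)

        class Retval(object):
            def get(self):
                return ret

        return Retval() if want_retval else None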
@@ -306,11 +360,11 @@ class HttpCli(object):
            try:
                with open(fsenc(fn), "wb") as f:
                    self.log("writing to {0}".format(fn))
-                   sz, sha512 = hashcopy(self.conn, p_data, f)
+                   sz, sha512_hex, _ = hashcopy(self.conn, p_data, f)
                    if sz == 0:
                        raise Pebkac(400, "empty files in post")
 
-                   files.append([sz, sha512])
+                   files.append([sz, sha512_hex])
 
            except Pebkac:
                if not nullwrite:
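Together, the two handlers above define the wire protocol: one JSON handshake that returns the wark plus the chunk hashes still missing, then one raw POST per chunk carrying the X-Up2k-Hash and X-Up2k-Wark headers. A minimal single-chunk client sketch (assumptions: a copyparty instance on 127.0.0.1:1234, a writable root, and a file small enough to fit in one 1 MiB chunk):

import base64, hashlib, json
from urllib import request

data = open("some.bin", "rb").read()  # must fit in a single chunk
chash = base64.urlsafe_b64encode(
    hashlib.sha512(data).digest()[:32]).decode("utf-8").rstrip("=")

# handshake: server replies with the wark and which chunks it still needs
hs = json.dumps({"name": "some.bin", "size": len(data), "hash": [chash]})
reply = json.load(request.urlopen(
    "http://127.0.0.1:1234/handshake.php", hs.encode("utf-8")))

# upload every missing chunk as a raw binary POST
for missing in reply["hash"]:
    req = request.Request(
        "http://127.0.0.1:1234/chunkpit.php", data=data, headers={
            "X-Up2k-Hash": missing,
            "X-Up2k-Wark": reply["wark"],
            "Content-Type": "application/octet-stream"})
    request.urlopen(req)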

View file

@@ -2,36 +2,130 @@
 from __future__ import print_function, unicode_literals
 
+import os
 import re
+import time
+import math
 import base64
 import hashlib
+import threading
+from copy import deepcopy
 
 from .util import Pebkac
 
 
 class Up2k(object):
+    """
+    TODO:
+    * documentation
+    * registry persistence
+      * ~/.config flatfiles for active jobs
+      * wark->path database for finished uploads
+    """
+
     def __init__(self, broker):
         self.broker = broker
         self.args = broker.args
         self.log = broker.log
+
+        # config
         self.salt = "hunter2"  # TODO: config
+
+        # state
+        self.registry = {}
+        self.mutex = threading.Lock()
+
+        # static
         self.r_hash = re.compile("^[0-9a-zA-Z_-]{43}$")
 
-    def _get_wark(self, j):
-        if len(j["name"]) > 4096 or len(j["hash"]) > 256:
-            raise Pebkac(400, "bad name or numchunks")
+    def handle_json(self, cj):
+        wark = self._get_wark(cj)
+        with self.mutex:
+            try:
+                job = self.registry[wark]
+                if job["vdir"] != cj["vdir"] or job["name"] != cj["name"]:
+                    raise Pebkac(400, "unexpected filepath")
 
-        for k in j["hash"]:
+            except KeyError:
+                job = {
+                    "wark": wark,
+                    "t0": int(time.time()),
+                    "addr": cj["addr"],
+                    "vdir": cj["vdir"],
+                    # client-provided, sanitized by _get_wark:
+                    "name": cj["name"],
+                    "size": cj["size"],
+                    "hash": deepcopy(cj["hash"]),
+                    # upload state
+                    "pend": deepcopy(cj["hash"]),
+                }
+                self._new_upload(job)
+
+        return {
+            "name": job["name"],
+            "size": job["size"],
+            "hash": job["pend"],
+            "wark": wark,
+        }
+
+    def handle_chunk(self, wark, chash):
+        with self.mutex:
+            job = self.registry.get(wark)
+            if not job:
+                raise Pebkac(404, "unknown wark")
+
+            if chash not in job["pend"]:
+                raise Pebkac(200, "already got that but thanks??")
+
+            try:
+                nchunk = job["hash"].index(chash)
+            except ValueError:
+                raise Pebkac(404, "unknown chunk")
+
+        chunksize = self._get_chunksize(job["size"])
+        ofs = nchunk * chunksize
+        path = os.path.join(job["vdir"], job["name"])
+        return [chunksize, ofs, path]
+
+    def confirm_chunk(self, wark, chash):
+        with self.mutex:
+            self.registry[wark]["pend"].remove(chash)
+
+    def _get_chunksize(self, filesize):
+        chunksize = 1024 * 1024
+        stepsize = 512 * 1024
+        while True:
+            for mul in [1, 2]:
+                nchunks = math.ceil(filesize * 1.0 / chunksize)
+                if nchunks <= 256:
+                    return chunksize
+
+                chunksize += stepsize
+                stepsize *= mul
+
+    def _get_wark(self, cj):
+        if len(cj["name"]) > 1024 or len(cj["hash"]) > 256:
+            raise Pebkac(400, "name or numchunks not according to spec")
+
+        for k in cj["hash"]:
             if not self.r_hash.match(k):
-                raise Pebkac(400, "at least one bad hash")
+                raise Pebkac(400, "at least one hash is not according to spec")
 
-        plaintext = "\n".join([self.salt, j["name"], str(j["size"]), *j["hash"]])
+        # server-reproducible file identifier, independent of name or location
+        ident = "\n".join([self.salt, str(cj["size"]), *cj["hash"]])
 
         hasher = hashlib.sha512()
-        hasher.update(plaintext.encode("utf-8"))
+        hasher.update(ident.encode("utf-8"))
         digest = hasher.digest()[:32]
         wark = base64.urlsafe_b64encode(digest)
         return wark.decode("utf-8").rstrip("=")
+
+    def _new_upload(self, job):
+        self.registry[job["wark"]] = job
+        path = os.path.join(job["vdir"], job["name"])
+        with open(path, "wb") as f:
+            f.seek(job["size"] - 1)
+            f.write(b"e")

View file

@@ -3,6 +3,7 @@ from __future__ import print_function, unicode_literals
 
 import re
 import sys
+import base64
 import struct
 import hashlib
 import threading
@@ -17,11 +18,9 @@ from .stolen import surrogateescape
 
 if not PY2:
     from urllib.parse import unquote_to_bytes as unquote
     from urllib.parse import quote_from_bytes as quote
-    from queue import Queue  # noqa: F401
 else:
     from urllib import unquote  # pylint: disable=no-name-in-module
     from urllib import quote  # pylint: disable=no-name-in-module
-    from Queue import Queue  # pylint: disable=no-name-in-module  # noqa: F401
 
 surrogateescape.register_surrogateescape()
@@ -385,6 +384,18 @@ def fsenc(txt):
     return txt.encode(FS_ENCODING, "surrogateescape")
 
 
+def read_socket(sr, total_size):
+    remains = total_size
+    while remains > 0:
+        bufsz = 32 * 1024
+        if bufsz > remains:
+            bufsz = remains
+
+        buf = sr.recv(bufsz)
+        remains -= len(buf)
+        yield buf
+
+
 def hashcopy(actor, fin, fout):
     u32_lim = int((2 ** 31) * 0.9)
     hashobj = hashlib.sha512()
@@ -398,7 +409,10 @@ def hashcopy(actor, fin, fout):
         hashobj.update(buf)
         fout.write(buf)
 
-    return tlen, hashobj.hexdigest()
+    digest32 = hashobj.digest()[:32]
+    digest_b64 = base64.urlsafe_b64encode(digest32).decode("utf-8").rstrip("=")
+
+    return tlen, hashobj.hexdigest(), digest_b64
 
 
 def unescape_cookie(orig):
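hashcopy's new third return value is the same truncated-sha512 base64 token format as the client's chunk hashes, which is what handle_post_binary compares against the X-Up2k-Hash header. A quick sketch of the copy-loop and the encoding, with a list standing in for the read_socket generator:

import base64, hashlib, io

chunks = [b"hello ", b"world"]  # stand-in for read_socket(sr, remains)
fout = io.BytesIO()

hashobj = hashlib.sha512()
for buf in chunks:  # the hashcopy loop in miniature
    hashobj.update(buf)
    fout.write(buf)

b64 = base64.urlsafe_b64encode(hashobj.digest()[:32]).decode("utf-8").rstrip("=")
print(len(b64))  # always 43, matching the r_hash regex in up2k.py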

View file

@@ -443,7 +443,7 @@ function up2k_init(have_crypto) {
     };
 
     var hash_done = function (hashbuf) {
-        t.hash.push(buf2b64(hashbuf).substr(0, 43));
+        t.hash.push(buf2b64(hashbuf.slice(0, 32)).replace(/=$/, ''));
         prog(t.n, nchunk, col_hashed);
 
         if (++nchunk < nchunks) {
@@ -451,6 +451,13 @@ function up2k_init(have_crypto) {
             return segm_next();
         }
 
+        // TODO remove
+        if (t.n == 0) {
+            var ts = new Date().getTime();
+            var spd = (t.size / ((ts - t.t0) / 1000.)) / (1024 * 1024.);
+            alert('{0} ms, {1} MB/s\n'.format(ts - t.t0, spd.toFixed(3)) + t.hash.join('\n'));
+        }
+
         o('f{0}t'.format(t.n)).innerHTML = 'connecting';
         st.busy.hash.splice(st.busy.hash.indexOf(t), 1);
         st.todo.handshake.push(t);
@@ -472,17 +479,14 @@ function up2k_init(have_crypto) {
         var t = st.todo.handshake.shift();
         st.busy.handshake.push(t);
 
-        // TODO remove
-        var ts = new Date().getTime();
-        var spd = (t.size / ((ts - t.t0) / 1000.)) / (1024 * 1024.);
-        alert('{0} ms, {1} MB/s\n'.format(ts - t.t0, spd.toFixed(3)) + t.hash.join('\n'));
-
         var xhr = new XMLHttpRequest();
         xhr.onload = function (ev) {
             if (xhr.status == 200) {
+                var response = JSON.parse(xhr.responseText);
                 t.postlist = [];
-                t.wark = xhr.response.wark;
-                var missing = xhr.response.hash;
+                t.wark = response.wark;
+                var missing = response.hash;
                 for (var a = 0; a < missing.length; a++) {
                     var idx = t.hash.indexOf(missing[a]);
                     if (idx < 0)
@@ -510,11 +514,13 @@ function up2k_init(have_crypto) {
             }
             else
                 alert("server broke (error {0}):\n\"{1}\"\n".format(
-                    xhr.status, (xhr.response && xhr.response.err) ||
+                    xhr.status,
+                    (xhr.response && xhr.response.err) ||
+                    (xhr.responseText && xhr.responseText) ||
                     "no further information"));
         };
         xhr.open('POST', 'handshake.php', true);
-        xhr.responseType = 'json';
+        xhr.responseType = 'text';
         xhr.send(JSON.stringify({
             "name": t.name,
             "size": t.size,
@@ -566,7 +572,9 @@ function up2k_init(have_crypto) {
             }
             else
                 alert("server broke (error {0}):\n\"{1}\"\n".format(
-                    xhr.status, (xhr.response && xhr.response.err) ||
+                    xhr.status,
+                    (xhr.response && xhr.response.err) ||
+                    (xhr.responseText && xhr.responseText) ||
                     "no further information"));
         };
         xhr.open('POST', 'chunkpit.php', true);
@@ -575,7 +583,7 @@ function up2k_init(have_crypto) {
         xhr.setRequestHeader("X-Up2k-Wark", t.wark);
         xhr.setRequestHeader('Content-Type', 'application/octet-stream');
         xhr.overrideMimeType('Content-Type', 'application/octet-stream');
-        xhr.responseType = 'json';
+        xhr.responseType = 'text';
         xhr.send(ev.target.result);
     };
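The hash_done change at the top of this file is a correctness fix, not a cleanup: the 43rd base64 character encodes the last 4 bits of the 32nd digest byte plus the first 2 bits of the 33rd, so truncating the encoded string (the old substr(0, 43)) only agrees with truncating the digest itself (the new slice(0, 32), and the server side) when those 2 bits happen to be zero. A short demonstration:

import base64, hashlib

for i in range(4):
    d = hashlib.sha512(bytes([i])).digest()
    old = base64.urlsafe_b64encode(d).decode("utf-8")[:43]  # former client behavior
    new = base64.urlsafe_b64encode(d[:32]).decode("utf-8").rstrip("=")
    print(old == new, old[-1], new[-1])  # mismatches roughly 75% of the time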

View file

@@ -39,8 +39,9 @@ wget -S --header='Accept-Encoding: gzip' -U 'MSIE 6.0; SV1' http://127.0.0.1:123
 
 ##
 ## sha512(file) | base64
+## usage: shab64 chunksize_mb filepath
 ##
 
-f=/boot/vmlinuz-4.19-x86_64; sp=2; v=0; sz=$(stat -c%s "$f"); while true; do w=$((v+sp*1024*1024)); printf $(tail -c +$((v+1)) "$f" | head -c $((w-v)) | sha512sum | sed -r 's/ .*//;s/(..)/\\x\1/g') | base64 -w0 | cut -c-44 | tr '+/' '-_'; v=$w; [ $v -lt $sz ] || break; done
+shab64() { sp=$1; f="$2"; v=0; sz=$(stat -c%s "$f"); while true; do w=$((v+sp*1024*1024)); printf $(tail -c +$((v+1)) "$f" | head -c $((w-v)) | sha512sum | cut -c-64 | sed -r 's/ .*//;s/(..)/\\x\1/g') | base64 -w0 | cut -c-43 | tr '+/' '-_'; v=$w; [ $v -lt $sz ] || break; done; }
 
 ##
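And a Python equivalent of shab64 for cross-checking (hypothetical script, same output: one 43-char token per chunk):

import base64, hashlib, sys

def shab64(chunksize_mb, path):
    # hash the file in chunksize_mb-MiB pieces; print each chunk's
    # sha512 digest truncated to 32 bytes as unpadded urlsafe base64
    with open(path, "rb") as f:
        while True:
            buf = f.read(chunksize_mb * 1024 * 1024)
            if not buf:
                break
            d = hashlib.sha512(buf).digest()[:32]
            print(base64.urlsafe_b64encode(d).decode("utf-8").rstrip("="))

if __name__ == "__main__":
    shab64(int(sys.argv[1]), sys.argv[2])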