From 9a45549b66c04c0003f533cb8726f5e93b20434e Mon Sep 17 00:00:00 2001
From: ed
Date: Sat, 7 Aug 2021 03:45:50 +0200
Subject: [PATCH] adding upload rules

---
 README.md             |  22 ++++-
 copyparty/authsrv.py  | 199 +++++++++++++++++++++++++++++++++++++++++-
 copyparty/httpcli.py  |  97 +++++++++++++++-----
 copyparty/sutil.py    |   3 +-
 copyparty/svchub.py   |   7 +-
 copyparty/up2k.py     |  13 +++
 copyparty/util.py     |  14 ++-
 copyparty/web/up2k.js |  11 +--
 8 files changed, 329 insertions(+), 37 deletions(-)

diff --git a/README.md b/README.md
index 499c82d9..331ca679 100644
--- a/README.md
+++ b/README.md
@@ -467,12 +467,30 @@ note:
 * `e2tsr` is probably always overkill, since `e2ds`/`e2dsa` would pick up any file modifications and `e2ts` would then reindex those, unless there is a new copyparty version with new parsers and the release note says otherwise
 * the rescan button in the admin panel has no effect unless the volume has `-e2ds` or higher
 
-you can choose to only index filename/path/size/last-modified (and not the hash of the file contents) by setting `--no-hash` or the volume-flag `cdhash`, this has the following consequences:
+you can choose to only index filename/path/size/last-modified (and not the hash of the file contents) by setting `--no-hash` or the volume-flag `:c,dhash`, this has the following consequences:
 * initial indexing is way faster, especially when the volume is on a network disk
 * makes it impossible to [file-search](#file-search)
 * if someone uploads the same file contents, the upload will not be detected as a dupe, so it will not get symlinked or rejected
 
-if you set `--no-hash`, you can enable hashing for specific volumes using flag `cehash`
+if you set `--no-hash`, you can enable hashing for specific volumes using flag `:c,ehash`
+
+
+## upload rules (Coming Soon™)
+
+you can set upload rules using volume flags, some examples:
+
+* `:c,sz=1k-3m` sets the allowed filesize between 1 KiB and 3 MiB inclusive (suffixes: b, k, m, g)
+* `:c,nosub` disallows uploading into subdirectories; goes well with `rotn` and `rotf`:
+* `:c,rotn=1000,2` moves uploads into subfolders, up to 1000 files in each folder before making a new one, nested two levels deep (the depth must be at least 1)
+* `:c,rotf=%Y/%m/%d/%H` forces uploads into a structure of subfolders according to that date format
+  * if someone uploads to `/foo/bar` the path would be rewritten to `/foo/bar/2021/08/06/23` for example
+  * but the actual date is not verified, just the structure, so the uploader can choose any values which conform to the format string
+  * just to avoid additional complexity in up2k which is enough of a mess already
+
+you can also set transaction limits which apply per-IP and per-volume, but these assume `-j 1` (the default); otherwise the limits will be off, for example `-j 4` would allow anywhere between 1x and 4x the limits you set depending on which processing node the client gets routed to
+
+* `:c,maxn=250,3600` allows 250 files over 1 hour from each IP (tracked per-volume)
+* `:c,maxb=1g,300` allows 1 GiB total over 5 minutes from each IP (tracked per-volume)
 
 
 ## database location
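
the `maxn`/`maxb` checks above are plain sliding windows, tracked per IP and per volume; the real implementation is the `Lim` class added to `copyparty/authsrv.py` in the next hunk. a minimal standalone sketch of the same bookkeeping — the `NumWindow` name and its API are made up for this example and are not part of copyparty:

```python
import time


class NumWindow(object):
    """toy sliding-window counter; illustrates the maxn idea, not copyparty's API"""

    def __init__(self, nmax, nwin):
        self.nmax = nmax  # max accepted uploads ...
        self.nwin = nwin  # ... within this many seconds
        self.hits = {}  # ip -> timestamps of accepted uploads

    def check(self, ip):
        now = time.time()
        q = self.hits.setdefault(ip, [])
        while q and q[0] < now - self.nwin:
            q.pop(0)  # age out entries that fell outside the window
        if len(q) >= self.nmax:
            return False  # copyparty raises 429 at this point
        q.append(now)
        return True


w = NumWindow(250, 3600)  # the maxn=250,3600 example above
print(all(w.check("1.2.3.4") for _ in range(250)))  # True
print(w.check("1.2.3.4"))  # False; the 251st upload within the hour
```

each copyparty process keeps its own tracker, which is why running with `-j 4` skews the effective limits.
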
diff --git a/copyparty/authsrv.py b/copyparty/authsrv.py
index c3fdfa1f..a31684c0 100644
--- a/copyparty/authsrv.py
+++ b/copyparty/authsrv.py
@@ -5,12 +5,23 @@ import re
 import os
 import sys
 import stat
+import time
 import base64
 import hashlib
 import threading
+from datetime import datetime
 
 from .__init__ import WINDOWS
-from .util import IMPLICATIONS, uncyg, undot, absreal, Pebkac, fsdec, fsenc, statdir
+from .util import (
+    IMPLICATIONS,
+    uncyg,
+    undot,
+    unhumanize,
+    absreal,
+    Pebkac,
+    fsenc,
+    statdir,
+)
 from .bos import bos
 
 
@@ -30,6 +41,154 @@ class AXS(object):
         )
 
 
+class Lim(object):
+    def __init__(self):
+        self.nups = {}  # num tracker
+        self.bups = {}  # byte tracker list
+        self.bupc = {}  # byte tracker cache
+
+        self.nosub = False  # disallow subdirectories
+
+        self.smin = None  # filesize min
+        self.smax = None  # filesize max
+
+        self.bwin = None  # bytes window
+        self.bmax = None  # bytes max
+        self.nwin = None  # num window
+        self.nmax = None  # num max
+
+        self.rotn = None  # rot num files
+        self.rotl = None  # rot depth
+        self.rotf = None  # rot datefmt
+        self.rot_re = None  # rotf check
+
+    def set_rotf(self, fmt):
+        self.rotf = fmt
+        r = re.escape(fmt).replace("%Y", "[0-9]{4}").replace("%j", "[0-9]{3}")
+        r = re.sub("%[mdHMSWU]", "[0-9]{2}", r)
+        self.rot_re = re.compile("(^|/)" + r + "$")
+
+    def all(self, ip, rem, sz, abspath):
+        self.chk_nup(ip)
+        self.chk_bup(ip)
+        self.chk_rem(rem)
+        if sz != -1:
+            self.chk_sz(sz)
+
+        ap2, vp2 = self.rot(abspath)
+        if abspath == ap2:
+            return ap2, rem
+
+        return ap2, ("{}/{}".format(rem, vp2) if rem else vp2)
+
+    def chk_sz(self, sz):
+        if self.smin is not None and sz < self.smin:
+            raise Pebkac(400, "file too small")
+
+        if self.smax is not None and sz > self.smax:
+            raise Pebkac(400, "file too big")
+
+    def chk_rem(self, rem):
+        if self.nosub and rem:
+            raise Pebkac(500, "no subdirectories allowed")
+
+    def rot(self, path):
+        if not self.rotf and not self.rotn:
+            return path, ""
+
+        if self.rotf:
+            path = path.rstrip("/\\")
+            if self.rot_re.search(path.replace("\\", "/")):
+                return path, ""
+
+            suf = datetime.utcnow().strftime(self.rotf)
+            if path:
+                path += "/"
+
+            return path + suf, suf
+
+        ret = self.dive(path, self.rotl)
+        if not ret:
+            raise Pebkac(500, "no available slots in volume")
+
+        d = ret[len(path) :].strip("/\\").replace("\\", "/")
+        return ret, d
+
+    def dive(self, path, lvs):
+        items = bos.listdir(path)
+
+        if not lvs:
+            # at leaf level
+            return None if len(items) >= self.rotn else ""
+
+        dirs = [int(x) for x in items if x and all(y in "1234567890" for y in x)]
+        dirs.sort()
+
+        if not dirs:
+            # no branches yet; make one
+            sub = os.path.join(path, "0")
+            bos.mkdir(sub)
+        else:
+            # try newest branch only
+            sub = os.path.join(path, str(dirs[-1]))
+
+        ret = self.dive(sub, lvs - 1)
+        if ret is not None:
+            return os.path.join(sub, ret)
+
+        if len(dirs) >= self.rotn:
+            # full branch or root
+            return None
+
+        # make a branch
+        sub = os.path.join(path, str(dirs[-1] + 1))
+        bos.mkdir(sub)
+        ret = self.dive(sub, lvs - 1)
+        if ret is None:
+            raise Pebkac(500, "rotation bug")
+
+        return os.path.join(sub, ret)
+
+    def nup(self, ip):
+        try:
+            self.nups[ip].append(time.time())
+        except:
+            self.nups[ip] = [time.time()]
+
+    def bup(self, ip, nbytes):
+        v = [time.time(), nbytes]
+        try:
+            self.bups[ip].append(v)
+            self.bupc[ip] += nbytes
+        except:
+            self.bups[ip] = [v]
+            self.bupc[ip] = nbytes
+
+    def chk_nup(self, ip):
+        if not self.nmax or ip not in self.nups:
+            return
+
+        nups = self.nups[ip]
+        cutoff = time.time() - self.nwin
+        while nups and nups[0] < cutoff:
+            nups.pop(0)
+
+        if len(nups) >= self.nmax:
+            raise Pebkac(429, "too many uploads")
+
+    def chk_bup(self, ip):
+        if not self.bmax or ip not in self.bups:
+            return
+
+        bups = self.bups[ip]
+        cutoff = time.time() - self.bwin
+        while bups and bups[0][0] < cutoff:
+            self.bupc[ip] -= bups.pop(0)[1]
+
+        if self.bupc[ip] >= self.bmax:
+            raise Pebkac(429, "ingress saturated")
+
+
 class VFS(object):
    """single level in the virtual 
fs""" @@ -42,6 +201,7 @@ class VFS(object): self.nodes = {} # child nodes self.histtab = None # all realpath->histpath self.dbv = None # closest full/non-jump parent + self.lim = None # type: Lim # upload limits; only set for dbv if realpath: self.histpath = os.path.join(realpath, ".hist") # db / thumbcache @@ -172,6 +332,7 @@ class VFS(object): return vn, rem def get_dbv(self, vrem): + # type: (str) -> tuple[VFS, str] dbv = self.dbv if not dbv: return self, vrem @@ -604,6 +765,42 @@ class AuthSrv(object): vfs.histtab = {v.realpath: v.histpath for v in vfs.all_vols.values()} + for vol in vfs.all_vols.values(): + lim = Lim() + use = False + + if vol.flags.get("nosub"): + use = True + lim.nosub = True + + v = vol.flags.get("sz") + if v: + use = True + lim.smin, lim.smax = [unhumanize(x) for x in v.split("-")] + + v = vol.flags.get("rotn") + if v: + use = True + lim.rotn, lim.rotl = [int(x) for x in v.split(",")] + + v = vol.flags.get("rotf") + if v: + use = True + lim.set_rotf(v) + + v = vol.flags.get("maxn") + if v: + use = True + lim.nmax, lim.nwin = [int(x) for x in v.split(",")] + + v = vol.flags.get("maxb") + if v: + use = True + lim.bmax, lim.bwin = [unhumanize(x) for x in v.split(",")] + + if use: + vol.lim = lim + all_mte = {} errors = False for vol in vfs.all_vols.values(): diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index 858ae279..d50fb036 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -16,7 +16,7 @@ import calendar from .__init__ import E, PY2, WINDOWS, ANYWIN, unicode from .util import * # noqa # pylint: disable=unused-wildcard-import from .bos import bos -from .authsrv import AuthSrv +from .authsrv import AuthSrv, Lim from .szip import StreamZip from .star import StreamTar @@ -491,7 +491,11 @@ class HttpCli(object): def dump_to_file(self): reader, remains = self.get_body_reader() vfs, rem = self.asrv.vfs.get(self.vpath, self.uname, False, True) + lim = vfs.get_dbv(rem)[0].lim fdir = os.path.join(vfs.realpath, rem) + if lim: + fdir, rem = lim.all(self.ip, rem, remains, fdir) + bos.makedirs(fdir) addr = self.ip.replace(":", ".") fn = "put-{:.6f}-{}.bin".format(time.time(), addr) @@ -502,6 +506,15 @@ class HttpCli(object): with open(fsenc(path), "wb", 512 * 1024) as f: post_sz, _, sha_b64 = hashcopy(reader, f) + if lim: + lim.nup(self.ip) + lim.bup(self.ip, post_sz) + try: + lim.chk_sz(post_sz) + except: + bos.unlink(path) + raise + if not self.args.nw: vfs, vrem = vfs.get_dbv(rem) self.conn.hsrv.broker.put( @@ -583,7 +596,7 @@ class HttpCli(object): try: remains = int(self.headers["content-length"]) except: - raise Pebkac(400, "you must supply a content-length for JSON POST") + raise Pebkac(411) if remains > 1024 * 1024: raise Pebkac(413, "json 2big") @@ -880,6 +893,11 @@ class HttpCli(object): vfs, rem = self.asrv.vfs.get(self.vpath, self.uname, False, True) self._assert_safe_rem(rem) + lim = vfs.get_dbv(rem)[0].lim + fdir_base = os.path.join(vfs.realpath, rem) + if lim: + fdir_base, rem = lim.all(self.ip, rem, -1, fdir_base) + files = [] errmsg = "" t0 = time.time() @@ -889,12 +907,9 @@ class HttpCli(object): self.log("discarding incoming file without filename") # fallthrough + fdir = fdir_base + fname = sanitize_fn(p_file, "", [".prologue.html", ".epilogue.html"]) if p_file and not nullwrite: - fdir = os.path.join(vfs.realpath, rem) - fname = sanitize_fn( - p_file, "", [".prologue.html", ".epilogue.html"] - ) - if not bos.path.isdir(fdir): raise Pebkac(404, "that folder does not exist") @@ -905,27 +920,43 @@ class HttpCli(object): fname = 
os.devnull fdir = "" + if lim: + lim.chk_bup(self.ip) + lim.chk_nup(self.ip) + if not nullwrite: + bos.makedirs(fdir) + try: with ren_open(fname, "wb", 512 * 1024, **open_args) as f: f, fname = f["orz"] - self.log("writing to {}/{}".format(fdir, fname)) + abspath = os.path.join(fdir, fname) + self.log("writing to {}".format(abspath)) sz, sha512_hex, _ = hashcopy(p_data, f) if sz == 0: raise Pebkac(400, "empty files in post") - files.append([sz, sha512_hex, p_file, fname]) - dbv, vrem = vfs.get_dbv(rem) - self.conn.hsrv.broker.put( - False, - "up2k.hash_file", - dbv.realpath, - dbv.flags, - vrem, - fname, - self.ip, - time.time(), - ) - self.conn.nbyte += sz + if lim: + lim.nup(self.ip) + lim.bup(self.ip, sz) + try: + lim.chk_sz(sz) + except: + bos.unlink(abspath) + raise + + files.append([sz, sha512_hex, p_file, fname]) + dbv, vrem = vfs.get_dbv(rem) + self.conn.hsrv.broker.put( + False, + "up2k.hash_file", + dbv.realpath, + dbv.flags, + vrem, + fname, + self.ip, + time.time(), + ) + self.conn.nbyte += sz except Pebkac: if fname != os.devnull: @@ -1023,6 +1054,20 @@ class HttpCli(object): vfs, rem = self.asrv.vfs.get(self.vpath, self.uname, False, True) self._assert_safe_rem(rem) + clen = int(self.headers.get("content-length", -1)) + if clen == -1: + raise Pebkac(411) + + rp, fn = vsplit(rem) + fp = os.path.join(vfs.realpath, rp) + lim = vfs.get_dbv(rem)[0].lim + if lim: + fp, rp = lim.all(self.ip, rp, clen, fp) + bos.makedirs(fp) + + fp = os.path.join(fp, fn) + rem = "{}/{}".format(rp, fn).strip("/") + if not rem.endswith(".md"): raise Pebkac(400, "only markdown pls") @@ -1034,7 +1079,6 @@ class HttpCli(object): self.reply(response.encode("utf-8")) return True - fp = os.path.join(vfs.realpath, rem) srv_lastmod = srv_lastmod3 = -1 try: st = bos.stat(fp) @@ -1088,6 +1132,15 @@ class HttpCli(object): with open(fsenc(fp), "wb", 512 * 1024) as f: sz, sha512, _ = hashcopy(p_data, f) + if lim: + lim.nup(self.ip) + lim.bup(self.ip, sz) + try: + lim.chk_sz(sz) + except: + bos.unlink(fp) + raise + new_lastmod = bos.stat(fp).st_mtime new_lastmod3 = int(new_lastmod * 1000) sha512 = sha512[:56] diff --git a/copyparty/sutil.py b/copyparty/sutil.py index 210f6dac..8d27757d 100644 --- a/copyparty/sutil.py +++ b/copyparty/sutil.py @@ -18,8 +18,7 @@ def errdesc(errors): tf_path = tf.name tf.write("\r\n".join(report).encode("utf-8", "replace")) - dt = datetime.utcfromtimestamp(time.time()) - dt = dt.strftime("%Y-%m%d-%H%M%S") + dt = datetime.utcnow().strftime("%Y-%m%d-%H%M%S") bos.chmod(tf_path, 0o444) return { diff --git a/copyparty/svchub.py b/copyparty/svchub.py index a0f0dc23..3f05e5b0 100644 --- a/copyparty/svchub.py +++ b/copyparty/svchub.py @@ -111,7 +111,7 @@ class SvcHub(object): thr.start() def _logname(self): - dt = datetime.utcfromtimestamp(time.time()) + dt = datetime.utcnow() fn = self.args.lo for fs in "YmdHMS": fs = "%" + fs @@ -244,8 +244,7 @@ class SvcHub(object): return with self.log_mutex: - ts = datetime.utcfromtimestamp(time.time()) - ts = ts.strftime("%Y-%m%d-%H%M%S.%f")[:-3] + ts = datetime.utcnow().strftime("%Y-%m%d-%H%M%S.%f")[:-3] self.logf.write("@{} [{}] {}\n".format(ts, src, msg)) now = time.time() @@ -257,7 +256,7 @@ class SvcHub(object): self.logf.close() self._setup_logfile("") - dt = datetime.utcfromtimestamp(time.time()) + dt = datetime.utcnow() # unix timestamp of next 00:00:00 (leap-seconds safe) day_now = dt.day diff --git a/copyparty/up2k.py b/copyparty/up2k.py index 23b27a9c..bdae1868 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -1148,6 +1148,16 @@ class 
Up2k(object): cur.connection.commit() if not job: + vfs = self.asrv.vfs.all_vols[cj["vtop"]] + if vfs.lim: + ap1 = os.path.join(cj["ptop"], cj["prel"]) + ap2, cj["prel"] = vfs.lim.all( + cj["addr"], cj["prel"], cj["size"], ap1 + ) + bos.makedirs(ap2) + vfs.lim.nup(cj["addr"]) + vfs.lim.bup(cj["addr"], cj["size"]) + job = { "wark": wark, "t0": now, @@ -1178,8 +1188,11 @@ class Up2k(object): self._new_upload(job) + purl = "/{}/".format("{}/{}".format(job["vtop"], job["prel"]).strip("/")) + return { "name": job["name"], + "purl": purl, "size": job["size"], "lmod": job["lmod"], "hash": job["need"], diff --git a/copyparty/util.py b/copyparty/util.py index bb351485..3450d000 100644 --- a/copyparty/util.py +++ b/copyparty/util.py @@ -77,6 +77,7 @@ HTTPCODE = { 403: "Forbidden", 404: "Not Found", 405: "Method Not Allowed", + 411: "Length Required", 413: "Payload Too Large", 416: "Requested Range Not Satisfiable", 422: "Unprocessable Entity", @@ -684,6 +685,17 @@ def humansize(sz, terse=False): return ret.replace("iB", "").replace(" ", "") +def unhumanize(sz): + try: + return float(sz) + except: + pass + + mul = sz[-1:].lower() + mul = {"k": 1024, "m": 1024 * 1024, "g": 1024 * 1024 * 1024}.get(mul, 1) + return float(sz[:-1]) * mul + + def get_spd(nbyte, t0, t=None): if t is None: t = time.time() @@ -1065,7 +1077,7 @@ def statdir(logger, scandir, lstat, top): def rmdirs(logger, scandir, lstat, top): if not os.path.exists(fsenc(top)) or not os.path.isdir(fsenc(top)): top = os.path.dirname(top) - + dirs = statdir(logger, scandir, lstat, top) dirs = [x[0] for x in dirs if stat.S_ISDIR(x[1].st_mode)] dirs = [os.path.join(top, x) for x in dirs] diff --git a/copyparty/web/up2k.js b/copyparty/web/up2k.js index 3f84ee9f..4b4daa7b 100644 --- a/copyparty/web/up2k.js +++ b/copyparty/web/up2k.js @@ -1325,9 +1325,10 @@ function up2k_init(subtle) { return; } - if (response.name !== t.name) { - // file exists; server renamed us - console.log("server-rename [" + t.name + "] to [" + response.name + "]"); + if (response.purl !== t.purl || response.name !== t.name) { + // server renamed us (file exists / path restrictions) + console.log("server-rename [" + t.purl + "] [" + t.name + "] to [" + response.purl + "] [" + response.name + "]"); + t.purl = response.purl; t.name = response.name; pvis.seth(t.n, 0, linksplit(t.purl + t.name).join(' ')); } @@ -1456,7 +1457,7 @@ function up2k_init(subtle) { if (fsearch) req.srch = 1; - xhr.open('POST', t.purl + 'handshake.php', true); + xhr.open('POST', t.purl, true); xhr.responseType = 'text'; xhr.send(JSON.stringify(req)); } @@ -1524,7 +1525,7 @@ function up2k_init(subtle) { console.log('chunkpit onerror, retrying', t); do_send(); }; - xhr.open('POST', t.purl + 'chunkpit.php', true); + xhr.open('POST', t.purl, true); xhr.setRequestHeader("X-Up2k-Hash", t.hash[npart]); xhr.setRequestHeader("X-Up2k-Wark", t.wark); xhr.setRequestHeader('Content-Type', 'application/octet-stream');
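
a quick standalone check of the size parsing behind the `sz` and `maxb` flags; the function body is copied from the `copyparty/util.py` hunk above, except the bare `except` is narrowed to `ValueError` for the sketch:

```python
def unhumanize(sz):
    try:
        return float(sz)
    except ValueError:
        pass

    mul = sz[-1:].lower()
    mul = {"k": 1024, "m": 1024 * 1024, "g": 1024 * 1024 * 1024}.get(mul, 1)
    return float(sz[:-1]) * mul


assert unhumanize("1k") == 1024
assert unhumanize("3m") == 3 * 1024 * 1024
assert unhumanize("512") == 512  # plain byte counts still work

# how authsrv.py consumes the sz=1k-3m volflag:
smin, smax = [unhumanize(x) for x in "1k-3m".split("-")]
print(smin, smax)  # 1024.0 3145728.0
```
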
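
and a standalone look at the `rotf` check: `Lim.set_rotf` compiles the date format into a regex, and `Lim.rot` appends a fresh date suffix only when the tail of the upload path does not already match that structure. a minimal sketch of the round trip, assuming python 3.7+ (where `re.escape` leaves `%` alone):

```python
import re
from datetime import datetime

fmt = "%Y/%m/%d/%H"  # the rotf example from the README section

# same regex construction as Lim.set_rotf
r = re.escape(fmt).replace("%Y", "[0-9]{4}").replace("%j", "[0-9]{3}")
r = re.sub("%[mdHMSWU]", "[0-9]{2}", r)
rot_re = re.compile("(^|/)" + r + "$")

suf = datetime.utcnow().strftime(fmt)  # e.g. 2021/08/06/23
print(rot_re.search("/foo/bar/" + suf) is not None)  # True: path kept as-is
print(rot_re.search("/foo/bar") is not None)  # False: suf gets appended
```
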