adding upload rules

This commit is contained in:
ed 2021-08-07 03:45:50 +02:00
parent ae3a01038b
commit 9a45549b66
8 changed files with 329 additions and 37 deletions

View file

@@ -467,12 +467,30 @@ note:
* `e2tsr` is probably always overkill, since `e2ds`/`e2dsa` would pick up any file modifications and `e2ts` would then reindex those, unless there is a new copyparty version with new parsers and the release note says otherwise
* the rescan button in the admin panel has no effect unless the volume has `-e2ds` or higher

you can choose to only index filename/path/size/last-modified (and not the hash of the file contents) by setting `--no-hash` or the volume-flag `:c,dhash`; this has the following consequences:
* initial indexing is way faster, especially when the volume is on a network disk
* makes it impossible to [file-search](#file-search)
* if someone uploads the same file contents, the upload will not be detected as a dupe, so it will not get symlinked or rejected

if you set `--no-hash`, you can enable hashing for specific volumes using the flag `:c,ehash`


## upload rules (Coming Soon™)

you can set upload rules using volume flags; some examples:

* `:c,sz=1k-3m` sets the allowed filesize between 1 KiB and 3 MiB inclusive (suffixes: `b`, `k`, `m`, `g`)
* `:c,nosub` disallows uploading into subdirectories; goes well with `rotn` and `rotf`:
* `:c,rotn=1000,2` moves uploads into subfolders, up to 1000 files in each folder before making a new one, two levels deep (the depth must be at least 1)
* `:c,rotf=%Y/%m/%d/%H` enforces files to be uploaded into a structure of subfolders according to that date format
  * if someone uploads to `/foo/bar`, the path would be rewritten to `/foo/bar/2021/08/06/23` for example
  * but the actual date is not verified, just the structure, so the uploader can choose any values which conform to the format string
    * just to avoid additional complexity in up2k which is enough of a mess already

you can also set transaction limits which apply per-IP and per-volume, but these assume `-j 1` (the default); otherwise the limits will be off, for example `-j 4` would allow anywhere between 1x and 4x the limits you set, depending on which processing node the client gets routed to (see the combined example below)

* `:c,maxn=250,3600` allows 250 files over 1 hour from each IP (tracked per-volume)
* `:c,maxb=1g,300` allows 1 GiB total over 5 minutes from each IP (tracked per-volume)
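
for example (a hypothetical combination; assuming the usual `-v src:dst:perms:flags` volume syntax), `-v srv/inc:inc:w:c,sz=1k-3m:c,maxn=250,3600:c,maxb=1g,300` would create a write-only volume where each IP can upload at most 250 files per hour and 1 GiB per 5 minutes, each file between 1 KiB and 3 MiB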

## database location

View file

@@ -5,12 +5,23 @@ import re
import os
import sys
import stat
import time
import base64
import hashlib
import threading
from datetime import datetime

from .__init__ import WINDOWS
from .util import (
    IMPLICATIONS,
    uncyg,
    undot,
    unhumanize,
    absreal,
    Pebkac,
    fsenc,
    statdir,
)
from .bos import bos

@@ -30,6 +41,154 @@ class AXS(object):
        )


class Lim(object):
    def __init__(self):
        self.nups = {}  # num tracker
        self.bups = {}  # byte tracker list
        self.bupc = {}  # byte tracker cache

        self.nosub = False  # disallow subdirectories

        self.smin = None  # filesize min
        self.smax = None  # filesize max

        self.bwin = None  # bytes window
        self.bmax = None  # bytes max
        self.nwin = None  # num window
        self.nmax = None  # num max

        self.rotn = None  # rot num files
        self.rotl = None  # rot depth
        self.rotf = None  # rot datefmt
        self.rot_re = None  # rotf check

    def set_rotf(self, fmt):
        self.rotf = fmt
        r = re.escape(fmt).replace("%Y", "[0-9]{4}").replace("%j", "[0-9]{3}")
        r = re.sub("%[mdHMSWU]", "[0-9]{2}", r)
        self.rot_re = re.compile("(^|/)" + r + "$")
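        # a sketch of the pattern this builds (assuming py3.7+ re.escape,
        # which leaves "%" and "/" unescaped): fmt "%Y/%m/%d/%H" compiles
        # to (^|/)[0-9]{4}/[0-9]{2}/[0-9]{2}/[0-9]{2}$  so a path ending
        # in 2021/08/06/23 passes while plain foo/bar does not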

    def all(self, ip, rem, sz, abspath):
        self.chk_nup(ip)
        self.chk_bup(ip)
        self.chk_rem(rem)
        if sz != -1:
            self.chk_sz(sz)

        ap2, vp2 = self.rot(abspath)
        if abspath == ap2:
            return ap2, rem

        return ap2, ("{}/{}".format(rem, vp2) if rem else vp2)

    def chk_sz(self, sz):
        if self.smin is not None and sz < self.smin:
            raise Pebkac(400, "file too small")

        if self.smax is not None and sz > self.smax:
            raise Pebkac(400, "file too big")

    def chk_rem(self, rem):
        if self.nosub and rem:
            raise Pebkac(500, "no subdirectories allowed")

    def rot(self, path):
        if not self.rotf and not self.rotn:
            return path, ""

        if self.rotf:
            path = path.rstrip("/\\")
            if self.rot_re.search(path.replace("\\", "/")):
                return path, ""

            suf = datetime.utcnow().strftime(self.rotf)
            if path:
                path += "/"

            return path + suf, suf

        ret = self.dive(path, self.rotl)
        if not ret:
            raise Pebkac(500, "no available slots in volume")

        d = ret[len(path) :].strip("/\\").replace("\\", "/")
        return ret, d
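        # example (values from the README): with rotf="%Y/%m/%d/%H" an
        # upload aimed at /foo/bar is diverted to /foo/bar/2021/08/06/23,
        # and left alone if it already ends in a conforming structure;
        # with rotn the slot comes from dive() below instead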

    def dive(self, path, lvs):
        items = bos.listdir(path)
        if not lvs:
            # at leaf level
            return None if len(items) >= self.rotn else ""

        dirs = [int(x) for x in items if x and all(y in "1234567890" for y in x)]
        dirs.sort()

        if not dirs:
            # no branches yet; make one
            sub = os.path.join(path, "0")
            bos.mkdir(sub)
        else:
            # try newest branch only
            sub = os.path.join(path, str(dirs[-1]))

        ret = self.dive(sub, lvs - 1)
        if ret is not None:
            return os.path.join(sub, ret)

        if len(dirs) >= self.rotn:
            # full branch or root
            return None

        # make a branch
        sub = os.path.join(path, str(dirs[-1] + 1))
        bos.mkdir(sub)
        ret = self.dive(sub, lvs - 1)
        if ret is None:
            raise Pebkac(500, "rotation bug")

        return os.path.join(sub, ret)
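    # layout sketch for rotn=1000,2 (rotn=1000, rotl=2): dive() fills
    # numeric branches 0/0 .. 0/999, then 1/0 and so on, keeping at most
    # 1000 files per leaf folder and 1000 branches per level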

    def nup(self, ip):
        try:
            self.nups[ip].append(time.time())
        except:
            self.nups[ip] = [time.time()]

    def bup(self, ip, nbytes):
        v = [time.time(), nbytes]
        try:
            self.bups[ip].append(v)
            self.bupc[ip] += nbytes
        except:
            self.bups[ip] = [v]
            self.bupc[ip] = nbytes

    def chk_nup(self, ip):
        if not self.nmax or ip not in self.nups:
            return

        nups = self.nups[ip]
        cutoff = time.time() - self.nwin
        while nups and nups[0] < cutoff:
            nups.pop(0)

        if len(nups) >= self.nmax:
            raise Pebkac(429, "too many uploads")

    def chk_bup(self, ip):
        if not self.bmax or ip not in self.bups:
            return

        bups = self.bups[ip]
        cutoff = time.time() - self.bwin
        while bups and bups[0][0] < cutoff:
            self.bupc[ip] -= bups.pop(0)[1]

        # compare the byte counter, not the entry count
        if self.bupc[ip] >= self.bmax:
            raise Pebkac(429, "ingress saturated")


class VFS(object):
    """single level in the virtual fs"""

@@ -42,6 +201,7 @@ class VFS(object):
        self.nodes = {}  # child nodes
        self.histtab = None  # all realpath->histpath
        self.dbv = None  # closest full/non-jump parent
        self.lim = None  # type: Lim  # upload limits; only set for dbv

        if realpath:
            self.histpath = os.path.join(realpath, ".hist")  # db / thumbcache

@@ -172,6 +332,7 @@ class VFS(object):
        return vn, rem

    def get_dbv(self, vrem):
        # type: (str) -> tuple[VFS, str]
        dbv = self.dbv
        if not dbv:
            return self, vrem

@@ -604,6 +765,42 @@ class AuthSrv(object):
        vfs.histtab = {v.realpath: v.histpath for v in vfs.all_vols.values()}

        for vol in vfs.all_vols.values():
            lim = Lim()
            use = False

            if vol.flags.get("nosub"):
                use = True
                lim.nosub = True

            v = vol.flags.get("sz")
            if v:
                use = True
                lim.smin, lim.smax = [unhumanize(x) for x in v.split("-")]

            v = vol.flags.get("rotn")
            if v:
                use = True
                lim.rotn, lim.rotl = [int(x) for x in v.split(",")]

            v = vol.flags.get("rotf")
            if v:
                use = True
                lim.set_rotf(v)

            v = vol.flags.get("maxn")
            if v:
                use = True
                lim.nmax, lim.nwin = [int(x) for x in v.split(",")]

            v = vol.flags.get("maxb")
            if v:
                use = True
                lim.bmax, lim.bwin = [unhumanize(x) for x in v.split(",")]

            if use:
                vol.lim = lim
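            # e.g. (numbers from the README) the flag "sz=1k-3m" lands
            # here as lim.smin=1024.0 and lim.smax=3145728.0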

        all_mte = {}
        errors = False
        for vol in vfs.all_vols.values():

View file

@@ -16,7 +16,7 @@ import calendar
from .__init__ import E, PY2, WINDOWS, ANYWIN, unicode
from .util import *  # noqa # pylint: disable=unused-wildcard-import
from .bos import bos
from .authsrv import AuthSrv, Lim
from .szip import StreamZip
from .star import StreamTar

@@ -491,7 +491,11 @@ class HttpCli(object):
    def dump_to_file(self):
        reader, remains = self.get_body_reader()
        vfs, rem = self.asrv.vfs.get(self.vpath, self.uname, False, True)
        lim = vfs.get_dbv(rem)[0].lim
        fdir = os.path.join(vfs.realpath, rem)
        if lim:
            fdir, rem = lim.all(self.ip, rem, remains, fdir)
            bos.makedirs(fdir)

        addr = self.ip.replace(":", ".")
        fn = "put-{:.6f}-{}.bin".format(time.time(), addr)

@@ -502,6 +506,15 @@ class HttpCli(object):
        with open(fsenc(path), "wb", 512 * 1024) as f:
            post_sz, _, sha_b64 = hashcopy(reader, f)

        if lim:
            lim.nup(self.ip)
            lim.bup(self.ip, post_sz)
            try:
                lim.chk_sz(post_sz)
            except:
                bos.unlink(path)
                raise
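        # note (inferred): the size is re-checked after the write since
        # remains can be -1 for chunked uploads, in which case lim.all()
        # skipped the up-front chk_sz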

        if not self.args.nw:
            vfs, vrem = vfs.get_dbv(rem)
            self.conn.hsrv.broker.put(

@@ -583,7 +596,7 @@ class HttpCli(object):
        try:
            remains = int(self.headers["content-length"])
        except:
            raise Pebkac(411)

        if remains > 1024 * 1024:
            raise Pebkac(413, "json 2big")

@@ -880,6 +893,11 @@ class HttpCli(object):
        vfs, rem = self.asrv.vfs.get(self.vpath, self.uname, False, True)
        self._assert_safe_rem(rem)

        lim = vfs.get_dbv(rem)[0].lim
        fdir_base = os.path.join(vfs.realpath, rem)
        if lim:
            fdir_base, rem = lim.all(self.ip, rem, -1, fdir_base)

        files = []
        errmsg = ""
        t0 = time.time()

@@ -889,12 +907,9 @@ class HttpCli(object):
                self.log("discarding incoming file without filename")
                # fallthrough

            fdir = fdir_base
            fname = sanitize_fn(p_file, "", [".prologue.html", ".epilogue.html"])

            if p_file and not nullwrite:
                if not bos.path.isdir(fdir):
                    raise Pebkac(404, "that folder does not exist")

@@ -905,27 +920,43 @@ class HttpCli(object):
                    fname = os.devnull
                    fdir = ""

            if lim:
                lim.chk_bup(self.ip)
                lim.chk_nup(self.ip)

            if not nullwrite:
                bos.makedirs(fdir)

            try:
                with ren_open(fname, "wb", 512 * 1024, **open_args) as f:
                    f, fname = f["orz"]
                    abspath = os.path.join(fdir, fname)
                    self.log("writing to {}".format(abspath))
                    sz, sha512_hex, _ = hashcopy(p_data, f)
                    if sz == 0:
                        raise Pebkac(400, "empty files in post")

                if lim:
                    lim.nup(self.ip)
                    lim.bup(self.ip, sz)
                    try:
                        lim.chk_sz(sz)
                    except:
                        bos.unlink(abspath)
                        raise

                files.append([sz, sha512_hex, p_file, fname])
                dbv, vrem = vfs.get_dbv(rem)
                self.conn.hsrv.broker.put(
                    False,
                    "up2k.hash_file",
                    dbv.realpath,
                    dbv.flags,
                    vrem,
                    fname,
                    self.ip,
                    time.time(),
                )
                self.conn.nbyte += sz

            except Pebkac:
                if fname != os.devnull:

@@ -1023,6 +1054,20 @@ class HttpCli(object):
        vfs, rem = self.asrv.vfs.get(self.vpath, self.uname, False, True)
        self._assert_safe_rem(rem)

        clen = int(self.headers.get("content-length", -1))
        if clen == -1:
            raise Pebkac(411)

        rp, fn = vsplit(rem)
        fp = os.path.join(vfs.realpath, rp)
        lim = vfs.get_dbv(rem)[0].lim
        if lim:
            fp, rp = lim.all(self.ip, rp, clen, fp)
            bos.makedirs(fp)

        fp = os.path.join(fp, fn)
        rem = "{}/{}".format(rp, fn).strip("/")

        if not rem.endswith(".md"):
            raise Pebkac(400, "only markdown pls")

@@ -1034,7 +1079,6 @@ class HttpCli(object):
            self.reply(response.encode("utf-8"))
            return True

        srv_lastmod = srv_lastmod3 = -1
        try:
            st = bos.stat(fp)

@@ -1088,6 +1132,15 @@ class HttpCli(object):
        with open(fsenc(fp), "wb", 512 * 1024) as f:
            sz, sha512, _ = hashcopy(p_data, f)

        if lim:
            lim.nup(self.ip)
            lim.bup(self.ip, sz)
            try:
                lim.chk_sz(sz)
            except:
                bos.unlink(fp)
                raise

        new_lastmod = bos.stat(fp).st_mtime
        new_lastmod3 = int(new_lastmod * 1000)
        sha512 = sha512[:56]

View file

@@ -18,8 +18,7 @@ def errdesc(errors):
        tf_path = tf.name
        tf.write("\r\n".join(report).encode("utf-8", "replace"))

    dt = datetime.utcnow().strftime("%Y-%m%d-%H%M%S")

    bos.chmod(tf_path, 0o444)
    return {

View file

@@ -111,7 +111,7 @@ class SvcHub(object):
        thr.start()

    def _logname(self):
        dt = datetime.utcnow()
        fn = self.args.lo
        for fs in "YmdHMS":
            fs = "%" + fs

@@ -244,8 +244,7 @@ class SvcHub(object):
            return

        with self.log_mutex:
            ts = datetime.utcnow().strftime("%Y-%m%d-%H%M%S.%f")[:-3]
            self.logf.write("@{} [{}] {}\n".format(ts, src, msg))

            now = time.time()

@@ -257,7 +256,7 @@ class SvcHub(object):
            self.logf.close()
            self._setup_logfile("")

        dt = datetime.utcnow()

        # unix timestamp of next 00:00:00 (leap-seconds safe)
        day_now = dt.day

View file

@@ -1148,6 +1148,16 @@ class Up2k(object):
                cur.connection.commit()

            if not job:
                vfs = self.asrv.vfs.all_vols[cj["vtop"]]
                if vfs.lim:
                    ap1 = os.path.join(cj["ptop"], cj["prel"])
                    ap2, cj["prel"] = vfs.lim.all(
                        cj["addr"], cj["prel"], cj["size"], ap1
                    )
                    bos.makedirs(ap2)
                    vfs.lim.nup(cj["addr"])
                    vfs.lim.bup(cj["addr"], cj["size"])
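                    # note (inferred): the quota is charged here at
                    # handshake time, before any chunks arrive, so an
                    # interrupted upload still counts toward maxn/maxb
                    # within its window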

                job = {
                    "wark": wark,
                    "t0": now,

@@ -1178,8 +1188,11 @@ class Up2k(object):
        self._new_upload(job)

        purl = "/{}/".format("{}/{}".format(job["vtop"], job["prel"]).strip("/"))

        return {
            "name": job["name"],
            "purl": purl,
            "size": job["size"],
            "lmod": job["lmod"],
            "hash": job["need"],

View file

@@ -77,6 +77,7 @@ HTTPCODE = {
    403: "Forbidden",
    404: "Not Found",
    405: "Method Not Allowed",
    411: "Length Required",
    413: "Payload Too Large",
    416: "Requested Range Not Satisfiable",
    422: "Unprocessable Entity",
@@ -684,6 +685,17 @@ def humansize(sz, terse=False):
    return ret.replace("iB", "").replace(" ", "")


def unhumanize(sz):
    try:
        return float(sz)
    except:
        pass

    mul = sz[-1:].lower()
    mul = {"k": 1024, "m": 1024 * 1024, "g": 1024 * 1024 * 1024}.get(mul, 1)
    return float(sz[:-1]) * mul
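# e.g. unhumanize("1k") == 1024.0, unhumanize("3m") == 3145728.0, and
# unhumanize("512") == 512.0 (plain numbers return from the float call)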


def get_spd(nbyte, t0, t=None):
    if t is None:
        t = time.time()

@@ -1065,7 +1077,7 @@ def statdir(logger, scandir, lstat, top):

def rmdirs(logger, scandir, lstat, top):
    if not os.path.exists(fsenc(top)) or not os.path.isdir(fsenc(top)):
        top = os.path.dirname(top)

    dirs = statdir(logger, scandir, lstat, top)
    dirs = [x[0] for x in dirs if stat.S_ISDIR(x[1].st_mode)]
    dirs = [os.path.join(top, x) for x in dirs]

View file

@@ -1325,9 +1325,10 @@ function up2k_init(subtle) {
                return;
            }

            if (response.purl !== t.purl || response.name !== t.name) {
                // server renamed us (file exists / path restrictions)
                console.log("server-rename [" + t.purl + "] [" + t.name + "] to [" + response.purl + "] [" + response.name + "]");
                t.purl = response.purl;
                t.name = response.name;
                pvis.seth(t.n, 0, linksplit(t.purl + t.name).join(' '));
            }
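            // (inferred) purl is the server's rewritten upload path, so
            // syncing it makes the following handshake / chunkpit POSTs
            // land in the rotated destination folder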

@@ -1456,7 +1457,7 @@ function up2k_init(subtle) {
        if (fsearch)
            req.srch = 1;

        xhr.open('POST', t.purl, true);
        xhr.responseType = 'text';
        xhr.send(JSON.stringify(req));
    }

@@ -1524,7 +1525,7 @@ function up2k_init(subtle) {
            console.log('chunkpit onerror, retrying', t);
            do_send();
        };

        xhr.open('POST', t.purl, true);
        xhr.setRequestHeader("X-Up2k-Hash", t.hash[npart]);
        xhr.setRequestHeader("X-Up2k-Wark", t.wark);
        xhr.setRequestHeader('Content-Type', 'application/octet-stream');