diff --git a/.vscode/launch.json b/.vscode/launch.json index d2ab2c8b..9630da18 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -12,6 +12,8 @@ //"-nw", "-ed", "-emp", + "-e2d", + "-e2s", "-a", "ed:wark", "-v", diff --git a/README.md b/README.md index 1dd74525..5199f7f7 100644 --- a/README.md +++ b/README.md @@ -144,8 +144,7 @@ roughly sorted by priority * `os.copy_file_range` for up2k cloning * support pillow-simd * cache sha512 chunks on client -* ~~symlink existing files on upload~~ - * ok at runtime, up2k db still not persisted +* persist unfinished up2k uploads too * comment field * ~~look into android thumbnail cache file format~~ bad idea * figure out the deal with pixel3a not being connectable as hotspot diff --git a/copyparty/__main__.py b/copyparty/__main__.py index 66652e4e..b7431139 100644 --- a/copyparty/__main__.py +++ b/copyparty/__main__.py @@ -105,17 +105,22 @@ def main(): epilog=dedent( """ -a takes username:password, - -v takes src:dst:permset:permset:... where "permset" is - accesslevel followed by username (no separator) + -v takes src:dst:permset:permset:cflag:cflag:... + where "permset" is accesslevel followed by username (no separator) + and "cflag" is config flags to set on this volume + list of cflags: + cnodupe rejects existing files (instead of symlinking them) + example:\033[35m - -a ed:hunter2 -v .::r:aed -v ../inc:dump:w:aed \033[36m + -a ed:hunter2 -v .::r:aed -v ../inc:dump:w:aed:cnodupe \033[36m mount current directory at "/" with * r (read-only) for everyone * a (read+write) for ed mount ../inc at "/dump" with * w (write-only) for everyone - * a (read+write) for ed \033[0m + * a (read+write) for ed + * reject duplicate files \033[0m if no accounts or volumes are configured, current folder will be read/write for everyone @@ -125,6 +130,7 @@ def main(): """ ), ) + # fmt: off ap.add_argument("-c", metavar="PATH", type=str, action="append", help="add config file") ap.add_argument("-i", metavar="IP", type=str, default="0.0.0.0", help="ip to bind") ap.add_argument("-p", metavar="PORT", type=int, default=3923, help="port to bind") @@ -135,12 +141,15 @@ def main(): ap.add_argument("-q", action="store_true", help="quiet") ap.add_argument("-ed", action="store_true", help="enable ?dots") ap.add_argument("-emp", action="store_true", help="enable markdown plugins") + ap.add_argument("-e2d", action="store_true", help="enable up2k database") + ap.add_argument("-e2s", action="store_true", help="enable up2k db-scanner") ap.add_argument("-mcr", metavar="SEC", type=int, default=60, help="md-editor mod-chk rate") ap.add_argument("-nw", action="store_true", help="disable writes (benchmark)") ap.add_argument("-nih", action="store_true", help="no info hostname") ap.add_argument("-nid", action="store_true", help="no info disk-usage") ap.add_argument("--no-sendfile", action="store_true", help="disable sendfile") al = ap.parse_args() + # fmt: on SvcHub(al).run() diff --git a/copyparty/authsrv.py b/copyparty/authsrv.py index 6fc93bcc..e21f2018 100644 --- a/copyparty/authsrv.py +++ b/copyparty/authsrv.py @@ -258,6 +258,7 @@ class AuthSrv(object): with open(cfg_fn, "rb") as f: self._parse_config_file(f, user, mread, mwrite, mflags, mount) + self.all_writable = [] if not mount: # -h says our defaults are CWD at root and read/write for everyone vfs = VFS(os.path.abspath("."), "", ["*"], ["*"]) @@ -280,6 +281,11 @@ class AuthSrv(object): v.uread = mread[dst] v.uwrite = mwrite[dst] v.flags = mflags[dst] + if v.uwrite: + self.all_writable.append(v) + + if vfs.uwrite and vfs not in self.all_writable: + self.all_writable.append(vfs) missing_users = {} for d in [mread, mwrite]: diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index d208a04f..2fda3376 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -28,6 +28,7 @@ class HttpCli(object): self.conn = conn self.s = conn.s self.sr = conn.sr + self.ip = conn.addr[0] self.addr = conn.addr self.args = conn.args self.auth = conn.auth @@ -42,7 +43,7 @@ class HttpCli(object): self.log_func(self.log_src, msg) def _check_nonfatal(self, ex): - return ex.code in [404] + return ex.code < 400 or ex.code == 404 def _assert_safe_rem(self, rem): # sanity check to prevent any disasters @@ -85,7 +86,8 @@ class HttpCli(object): v = self.headers.get("x-forwarded-for", None) if v is not None and self.conn.addr[0] in ["127.0.0.1", "::1"]: - self.log_src = self.conn.set_rproxy(v.split(",")[0]) + self.ip = v.split(",")[0] + self.log_src = self.conn.set_rproxy(self.ip) self.uname = "*" if "cookie" in self.headers: @@ -305,7 +307,7 @@ class HttpCli(object): vfs, rem = self.conn.auth.vfs.get(self.vpath, self.uname, False, True) fdir = os.path.join(vfs.realpath, rem) - addr = self.conn.addr[0].replace(":", ".") + addr = self.ip.replace(":", ".") fn = "put-{:.6f}-{}.bin".format(time.time(), addr) path = os.path.join(fdir, fn) @@ -384,9 +386,10 @@ class HttpCli(object): vfs, rem = self.conn.auth.vfs.get(self.vpath, self.uname, False, True) - body["vdir"] = self.vpath - body["rdir"] = os.path.join(vfs.realpath, rem) - body["addr"] = self.addr[0] + body["vtop"] = vfs.vpath + body["ptop"] = vfs.realpath + body["prel"] = rem + body["addr"] = self.ip body["flag"] = vfs.flags x = self.conn.hsrv.broker.put(True, "up2k.handle_json", body) @@ -409,7 +412,10 @@ class HttpCli(object): except KeyError: raise Pebkac(400, "need hash and wark headers for binary POST") - x = self.conn.hsrv.broker.put(True, "up2k.handle_chunk", wark, chash) + vfs, _ = self.conn.auth.vfs.get(self.vpath, self.uname, False, True) + ptop = vfs.realpath + + x = self.conn.hsrv.broker.put(True, "up2k.handle_chunk", ptop, wark, chash) response = x.get() chunksize, cstart, path, lastmod = response @@ -454,7 +460,7 @@ class HttpCli(object): self.log("clone {} done".format(cstart[0])) - x = self.conn.hsrv.broker.put(True, "up2k.confirm_chunk", wark, chash) + x = self.conn.hsrv.broker.put(True, "up2k.confirm_chunk", ptop, wark, chash) num_left = x.get() if not WINDOWS and num_left == 0: @@ -576,7 +582,7 @@ class HttpCli(object): if not os.path.isdir(fsenc(fdir)): raise Pebkac(404, "that folder does not exist") - suffix = ".{:.6f}-{}".format(time.time(), self.addr[0]) + suffix = ".{:.6f}-{}".format(time.time(), self.ip) open_args = {"fdir": fdir, "suffix": suffix} else: open_args = {} @@ -638,7 +644,7 @@ class HttpCli(object): "\n".join( unicode(x) for x in [ - ":".join(unicode(x) for x in self.addr), + ":".join(unicode(x) for x in [self.ip, self.addr[1]]), msg.rstrip(), ] ) @@ -895,7 +901,7 @@ class HttpCli(object): open_func = open # 512 kB is optimal for huge files, use 64k open_args = [fsenc(fs_path), "rb", 64 * 1024] - if hasattr(os, 'sendfile'): + if hasattr(os, "sendfile"): use_sendfile = not self.args.no_sendfile # @@ -1021,6 +1027,9 @@ class HttpCli(object): if abspath.endswith(".md") and "raw" not in self.uparam: return self.tx_md(abspath) + if abspath.endswith("{0}.hist{0}up2k.db".format(os.sep)): + raise Pebkac(403) + return self.tx_file(abspath) fsroot, vfs_ls, vfs_virt = vn.ls(rem, self.uname) diff --git a/copyparty/httpconn.py b/copyparty/httpconn.py index d9509bbe..1eb3915a 100644 --- a/copyparty/httpconn.py +++ b/copyparty/httpconn.py @@ -65,6 +65,7 @@ class HttpConn(object): color = 34 self.rproxy = ip + self.ip = ip self.log_src = "{} \033[{}m{}".format(ip, color, self.addr[1]).ljust(26) return self.log_src diff --git a/copyparty/svchub.py b/copyparty/svchub.py index bb9bbc2f..47cd42de 100644 --- a/copyparty/svchub.py +++ b/copyparty/svchub.py @@ -9,6 +9,7 @@ from datetime import datetime, timedelta import calendar from .__init__ import PY2, WINDOWS, MACOS, VT100 +from .authsrv import AuthSrv from .tcpsrv import TcpSrv from .up2k import Up2k from .util import mp @@ -38,6 +39,10 @@ class SvcHub(object): self.tcpsrv = TcpSrv(self) self.up2k = Up2k(self) + if self.args.e2d and self.args.e2s: + auth = AuthSrv(self.args, self.log) + self.up2k.build_indexes(auth.all_writable) + # decide which worker impl to use if self.check_mp_enable(): from .broker_mp import BrokerMp as Broker diff --git a/copyparty/up2k.py b/copyparty/up2k.py index 1a56d6df..747ac5c4 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -6,6 +6,7 @@ import os import re import time import math +import stat import shutil import base64 import hashlib @@ -13,7 +14,15 @@ import threading from copy import deepcopy from .__init__ import WINDOWS -from .util import Pebkac, Queue, fsenc, sanitize_fn, ren_open +from .util import Pebkac, Queue, fsdec, fsenc, sanitize_fn, ren_open + +HAVE_SQLITE3 = False +try: + import sqlite3 + + HAVE_SQLITE3 = True +except: + pass class Up2k(object): @@ -22,20 +31,21 @@ class Up2k(object): * documentation * registry persistence * ~/.config flatfiles for active jobs - * wark->path database for finished uploads """ def __init__(self, broker): self.broker = broker self.args = broker.args self.log = broker.log + self.persist = self.args.e2d # config self.salt = "hunter2" # TODO: config # state - self.registry = {} self.mutex = threading.Lock() + self.registry = {} + self.db = {} if WINDOWS: # usually fails to set lastmod too quickly @@ -47,57 +57,234 @@ class Up2k(object): # static self.r_hash = re.compile("^[0-9a-zA-Z_-]{43}$") + if self.persist and not HAVE_SQLITE3: + m = "could not initialize sqlite3, will use in-memory registry only" + self.log("up2k", m) + + def register_vpath(self, ptop): + with self.mutex: + if ptop in self.registry: + return None + + self.registry[ptop] = {} + if not self.persist or not HAVE_SQLITE3: + return None + + try: + os.mkdir(os.path.join(ptop, ".hist")) + except: + pass + + db_path = os.path.join(ptop, ".hist", "up2k.db") + if ptop in self.db: + # self.db[ptop].close() + return None + + try: + db = self._open_db(db_path) + self.db[ptop] = db + return db + except Exception as ex: + m = "failed to open [{}]: {}".format(ptop, repr(ex)) + self.log("up2k", m) + + return None + + def build_indexes(self, writeables): + tops = [d.realpath for d in writeables] + for top in tops: + db = self.register_vpath(top) + if db: + # can be symlink so don't `and d.startswith(top)`` + excl = set([d for d in tops if d != top]) + self._build_dir([db, 0], top, excl, top) + self._drop_lost(db, top) + db.commit() + + def _build_dir(self, dbw, top, excl, cdir): + histdir = os.path.join(top, ".hist") + for inode in [fsdec(x) for x in os.listdir(fsenc(cdir))]: + abspath = os.path.join(cdir, inode) + inf = os.stat(fsenc(abspath)) + if stat.S_ISDIR(inf.st_mode): + if abspath in excl or abspath == histdir: + continue + self.log("up2k", "dir: {}".format(abspath)) + self._build_dir(dbw, top, excl, abspath) + else: + # self.log("up2k", "file: {}".format(abspath)) + rp = abspath[len(top) :].replace("\\", "/").strip("/") + c = dbw[0].execute("select * from up where rp = ?", (rp,)) + in_db = list(c.fetchall()) + if in_db: + _, dts, dsz, _ = in_db[0] + if len(in_db) > 1: + m = "WARN: multiple entries: [{}] => [{}] ({})" + self.log("up2k", m.format(top, rp, len(in_db))) + dts = -1 + + if dts == inf.st_mtime and dsz == inf.st_size: + continue + + m = "reindex [{}] => [{}] ({}/{}) ({}/{})".format( + top, rp, dts, inf.st_mtime, dsz, inf.st_size + ) + self.log("up2k", m) + self.db_rm(dbw[0], rp) + dbw[1] += 1 + in_db = None + + self.log("up2k", "file: {}".format(abspath)) + hashes = self._hashlist_from_file(abspath) + wark = self._wark_from_hashlist(inf.st_size, hashes) + self.db_add(dbw[0], wark, rp, inf.st_mtime, inf.st_size) + dbw[1] += 1 + if dbw[1] > 1024: + dbw[0].commit() + dbw[1] = 0 + + def _drop_lost(self, db, top): + rm = [] + c = db.execute("select * from up") + for dwark, dts, dsz, drp in c: + abspath = os.path.join(top, drp) + if not os.path.exists(abspath): + rm.append(drp) + + if not rm: + return + + self.log("up2k", "forgetting {} deleted files".format(len(rm))) + for rp in rm: + self.db_rm(db, rp) + + def _open_db(self, db_path): + conn = sqlite3.connect(db_path, check_same_thread=False) + try: + c = conn.execute(r"select * from kv where k = 'sver'") + rows = c.fetchall() + if rows: + ver = rows[0][1] + else: + self.log("up2k", "WARN: no sver in kv, DB corrupt?") + ver = "unknown" + + if ver == "1": + try: + nfiles = next(conn.execute("select count(w) from up"))[0] + self.log("up2k", "found DB at {} |{}|".format(db_path, nfiles)) + return conn + except Exception as ex: + m = "WARN: could not list files, DB corrupt?\n " + repr(ex) + self.log("up2k", m) + + m = "REPLACING unsupported DB (v.{}) at {}".format(ver, db_path) + self.log("up2k", m) + conn.close() + os.unlink(db_path) + conn = sqlite3.connect(db_path, check_same_thread=False) + except: + pass + + # sqlite is variable-width only, no point in using char/nchar/varchar + for cmd in [ + r"create table kv (k text, v text)", + r"create table up (w text, mt int, sz int, rp text)", + r"insert into kv values ('sver', '1')", + r"create index up_w on up(w)", + ]: + conn.execute(cmd) + + conn.commit() + self.log("up2k", "created DB at {}".format(db_path)) + return conn + def handle_json(self, cj): + self.register_vpath(cj["ptop"]) cj["name"] = sanitize_fn(cj["name"]) wark = self._get_wark(cj) now = time.time() + job = None with self.mutex: - # TODO use registry persistence here to symlink any matching wark - if wark in self.registry: - job = self.registry[wark] - if job["rdir"] != cj["rdir"] or job["name"] != cj["name"]: - src = os.path.join(job["rdir"], job["name"]) - dst = os.path.join(cj["rdir"], cj["name"]) + db = self.db.get(cj["ptop"], None) + reg = self.registry[cj["ptop"]] + if wark not in reg and db: + cur = db.execute(r"select * from up where w = ?", (wark,)) + for _, dtime, dsize, dp_rel in cur: + dp_abs = os.path.join(cj["ptop"], dp_rel).replace("\\", "/") + # relying on path.exists to return false on broken symlinks + if os.path.exists(dp_abs): + try: + prel, name = dp_rel.rsplit("/", 1) + except: + prel = "" + name = dp_rel + + job = { + "name": name, + "prel": prel, + "vtop": cj["vtop"], + "ptop": cj["ptop"], + "flag": cj["flag"], + "size": dsize, + "lmod": dtime, + "hash": [], + "need": [], + } + break + + if job or wark in reg: + job = job or reg[wark] + if job["prel"] != cj["prel"] or job["name"] != cj["name"]: + src = os.path.join(job["ptop"], job["prel"], job["name"]) + dst = os.path.join(cj["ptop"], cj["prel"], cj["name"]) + vsrc = os.path.join(job["vtop"], job["prel"], job["name"]) + vsrc = vsrc.replace("\\", "/") # just for prints anyways if job["need"]: self.log("up2k", "unfinished:\n {0}\n {1}".format(src, dst)) - err = "partial upload exists at a different location; please resume uploading here instead:\n{0}{1} ".format( - job["vdir"], job["name"] - ) + err = "partial upload exists at a different location; please resume uploading here instead:\n" + err += vsrc + " " raise Pebkac(400, err) elif "nodupe" in job["flag"]: self.log("up2k", "dupe-reject:\n {0}\n {1}".format(src, dst)) - err = "upload rejected, file already exists:\n{0}{1} ".format( - job["vdir"], job["name"] - ) + err = "upload rejected, file already exists:\n " + vsrc + " " raise Pebkac(400, err) else: # symlink to the client-provided name, # returning the previous upload info job = deepcopy(job) - job["rdir"] = cj["rdir"] - job["name"] = self._untaken(cj["rdir"], cj["name"], now, cj["addr"]) - dst = os.path.join(job["rdir"], job["name"]) + for k in ["ptop", "vtop", "prel"]: + job[k] = cj[k] + + pdir = os.path.join(cj["ptop"], cj["prel"]) + job["name"] = self._untaken(pdir, cj["name"], now, cj["addr"]) + dst = os.path.join(job["ptop"], job["prel"], job["name"]) os.unlink(fsenc(dst)) # TODO ed pls self._symlink(src, dst) - else: + + if not job: job = { "wark": wark, "t0": now, - "addr": cj["addr"], - "vdir": cj["vdir"], - "rdir": cj["rdir"], - "flag": cj["flag"], - # client-provided, sanitized by _get_wark: - "name": cj["name"], - "size": cj["size"], - "lmod": cj["lmod"], "hash": deepcopy(cj["hash"]), + "need": [], } + # client-provided, sanitized by _get_wark: name, size, lmod + for k in [ + "addr", + "vtop", + "ptop", + "prel", + "flag", + "name", + "size", + "lmod", + ]: + job[k] = cj[k] # one chunk may occur multiple times in a file; # filter to unique values for the list of missing chunks # (preserve order to reduce disk thrashing) - job["need"] = [] lut = {} for k in cj["hash"]: if k not in lut: @@ -149,36 +336,47 @@ class Up2k(object): self.log("up2k", "cannot symlink; creating copy: " + repr(ex)) shutil.copy2(fsenc(src), fsenc(dst)) - def handle_chunk(self, wark, chash): + def handle_chunk(self, ptop, wark, chash): with self.mutex: - job = self.registry.get(wark) + job = self.registry[ptop].get(wark, None) if not job: - raise Pebkac(404, "unknown wark") + raise Pebkac(400, "unknown wark") if chash not in job["need"]: raise Pebkac(200, "already got that but thanks??") nchunk = [n for n, v in enumerate(job["hash"]) if v == chash] if not nchunk: - raise Pebkac(404, "unknown chunk") + raise Pebkac(400, "unknown chunk") chunksize = self._get_chunksize(job["size"]) ofs = [chunksize * x for x in nchunk] - path = os.path.join(job["rdir"], job["name"]) + path = os.path.join(job["ptop"], job["prel"], job["name"]) return [chunksize, ofs, path, job["lmod"]] - def confirm_chunk(self, wark, chash): + def confirm_chunk(self, ptop, wark, chash): with self.mutex: - job = self.registry[wark] + job = self.registry[ptop][wark] job["need"].remove(chash) ret = len(job["need"]) + if ret > 0: + return ret - if WINDOWS and ret == 0: - path = os.path.join(job["rdir"], job["name"]) + if WINDOWS: + path = os.path.join(job["ptop"], job["prel"], job["name"]) self.lastmod_q.put([path, (int(time.time()), int(job["lmod"]))]) + db = self.db.get(job["ptop"], None) + if db: + rp = os.path.join(job["prel"], job["name"]).replace("\\", "/") + self.db_rm(db, rp) + self.db_add(db, job["wark"], rp, job["lmod"], job["size"]) + db.commit() + del self.registry[ptop][wark] + # in-memory registry is reserved for unfinished uploads + return ret def _get_chunksize(self, filesize): @@ -193,6 +391,14 @@ class Up2k(object): chunksize += stepsize stepsize *= mul + def db_rm(self, db, rp): + db.execute("delete from up where rp = ?", (rp,)) + + def db_add(self, db, wark, rp, ts, sz): + db.execute( + "insert into up values (?,?,?,?)", (wark, ts, sz, rp,), + ) + def _get_wark(self, cj): if len(cj["name"]) > 1024 or len(cj["hash"]) > 512 * 1024: # 16TiB raise Pebkac(400, "name or numchunks not according to spec") @@ -209,9 +415,13 @@ class Up2k(object): except: cj["lmod"] = int(time.time()) - # server-reproducible file identifier, independent of name or location - ident = [self.salt, str(cj["size"])] - ident.extend(cj["hash"]) + wark = self._wark_from_hashlist(cj["size"], cj["hash"]) + return wark + + def _wark_from_hashlist(self, filesize, hashes): + """ server-reproducible file identifier, independent of name or location """ + ident = [self.salt, str(filesize)] + ident.extend(hashes) ident = "\n".join(ident) hasher = hashlib.sha512() @@ -221,10 +431,34 @@ class Up2k(object): wark = base64.urlsafe_b64encode(digest) return wark.decode("utf-8").rstrip("=") + def _hashlist_from_file(self, path): + fsz = os.path.getsize(path) + csz = self._get_chunksize(fsz) + ret = [] + with open(path, "rb", 512 * 1024) as f: + while fsz > 0: + hashobj = hashlib.sha512() + rem = min(csz, fsz) + fsz -= rem + while rem > 0: + buf = f.read(min(rem, 64 * 1024)) + if not buf: + raise Exception("EOF at " + str(f.tell())) + + hashobj.update(buf) + rem -= len(buf) + + digest = hashobj.digest()[:32] + digest = base64.urlsafe_b64encode(digest) + ret.append(digest.decode("utf-8").rstrip("=")) + + return ret + def _new_upload(self, job): - self.registry[job["wark"]] = job + self.registry[job["ptop"]][job["wark"]] = job suffix = ".{:.6f}-{}".format(job["t0"], job["addr"]) - with ren_open(job["name"], "wb", fdir=job["rdir"], suffix=suffix) as f: + pdir = os.path.join(job["ptop"], job["prel"]) + with ren_open(job["name"], "wb", fdir=pdir, suffix=suffix) as f: f, job["name"] = f["orz"] f.seek(job["size"] - 1) f.write(b"e") diff --git a/copyparty/util.py b/copyparty/util.py index cf24e654..a009a1c8 100644 --- a/copyparty/util.py +++ b/copyparty/util.py @@ -712,3 +712,6 @@ class Pebkac(Exception): def __init__(self, code, msg=None): super(Pebkac, self).__init__(msg or HTTPCODE[code]) self.code = code + + def __repr__(self): + return "Pebkac({}, {})".format(self.code, repr(self.args)) diff --git a/copyparty/web/up2k.js b/copyparty/web/up2k.js index 480c0c25..190216ca 100644 --- a/copyparty/web/up2k.js +++ b/copyparty/web/up2k.js @@ -672,7 +672,10 @@ function up2k_init(have_crypto) { var rsp = (xhr.responseText + ''); if (rsp.indexOf('partial upload exists') !== -1 || rsp.indexOf('file already exists') !== -1) { - err = rsp.slice(5); + err = rsp; + var ofs = err.lastIndexOf(' : '); + if (ofs > 0) + err = err.slice(0, ofs); } if (err != "") { ebi('f{0}t'.format(t.n)).innerHTML = "ERROR";