diff --git a/.vscode/launch.json b/.vscode/launch.json
index d0fa0ad1..fdaaf284 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -12,8 +12,7 @@
                 //"-nw",
                 "-ed",
                 "-emp",
-                "-e2d",
-                "-e2s",
+                "-e2dsa",
                 "-a",
                 "ed:wark",
                 "-v",
diff --git a/.vscode/tasks.json b/.vscode/tasks.json
index 592d8710..f4a637f1 100644
--- a/.vscode/tasks.json
+++ b/.vscode/tasks.json
@@ -8,7 +8,7 @@
         },
         {
             "label": "no_dbg",
-            "command": "${config:python.pythonPath} -m copyparty -ed -emp -e2d -e2s -a ed:wark -v srv::r:aed:cnodupe ;exit 1",
+            "command": "${config:python.pythonPath} -m copyparty -ed -emp -e2dsa -a ed:wark -v srv::r:aed:cnodupe ;exit 1",
            "type": "shell"
        }
    ]
diff --git a/copyparty/__main__.py b/copyparty/__main__.py
index ab8e17c7..63a402b7 100644
--- a/copyparty/__main__.py
+++ b/copyparty/__main__.py
@@ -174,6 +174,18 @@ def main():
     if HAVE_SSL:
         ensure_cert()
 
+    deprecated = [["-e2s", "-e2ds"]]
+    for dk, nk in deprecated:
+        try:
+            idx = sys.argv.index(dk)
+        except:
+            continue
+
+        msg = "\033[1;31mWARNING:\033[0;1m\n  {} \033[0;33mwas replaced with\033[0;1m {} \033[0;33mand will be removed\n\033[0m"
+        print(msg.format(dk, nk))
+        sys.argv[idx] = nk
+        time.sleep(2)
+
     ap = argparse.ArgumentParser(
         formatter_class=RiceFormatter,
         prog="copyparty",
@@ -228,13 +240,15 @@ def main():
     ap.add_argument("-ed", action="store_true", help="enable ?dots")
     ap.add_argument("-emp", action="store_true", help="enable markdown plugins")
     ap.add_argument("-e2d", action="store_true", help="enable up2k database")
-    ap.add_argument("-e2s", action="store_true", help="enable up2k db-scanner")
+    ap.add_argument("-e2ds", action="store_true", help="enable up2k db-scanner, sets -e2d")
+    ap.add_argument("-e2dsa", action="store_true", help="scan all folders (for search), sets -e2ds")
     ap.add_argument("-mcr", metavar="SEC", type=int, default=60, help="md-editor mod-chk rate")
     ap.add_argument("-nw", action="store_true", help="disable writes (benchmark)")
     ap.add_argument("-nih", action="store_true", help="no info hostname")
     ap.add_argument("-nid", action="store_true", help="no info disk-usage")
     ap.add_argument("--no-sendfile", action="store_true", help="disable sendfile")
     ap.add_argument("--urlform", type=str, default="print,get", help="how to handle url-forms")
+    ap.add_argument("--salt", type=str, default="hunter2", help="up2k file-hash salt")
 
     ap2 = ap.add_argument_group('SSL/TLS options')
     ap2.add_argument("--http-only", action="store_true", help="disable ssl/tls")
@@ -246,6 +260,12 @@ def main():
     al = ap.parse_args()
     # fmt: on
 
+    if al.e2dsa:
+        al.e2ds = True
+
+    if al.e2ds:
+        al.e2d = True
+
     al.i = al.i.split(",")
     try:
         if "-" in al.p:
diff --git a/copyparty/authsrv.py b/copyparty/authsrv.py
index bd7b72dd..6d888fbc 100644
--- a/copyparty/authsrv.py
+++ b/copyparty/authsrv.py
@@ -19,6 +19,11 @@ class VFS(object):
         self.uwrite = uwrite  # users who can write this
         self.flags = flags  # config switches
         self.nodes = {}  # child nodes
+        self.all_vols = {vpath: self}  # flattened recursive
+
+    def _trk(self, vol):
+        self.all_vols[vol.vpath] = vol
+        return vol
 
     def add(self, src, dst):
         """get existing, or add new path to the vfs"""
@@ -30,7 +35,7 @@ class VFS(object):
             name, dst = dst.split("/", 1)
             if name in self.nodes:
                 # exists; do not manipulate permissions
-                return self.nodes[name].add(src, dst)
+                return self._trk(self.nodes[name].add(src, dst))
 
             vn = VFS(
                 "{}/{}".format(self.realpath, name),
@@ -40,7 +45,7 @@ class VFS(object):
                 self.flags,
             )
             self.nodes[name] = vn
-            return vn.add(src, dst)
+            return self._trk(vn.add(src, dst))
 
         if dst in self.nodes:
             # leaf exists; return as-is
@@ -50,7 +55,7 @@ class VFS(object):
         vp = "{}/{}".format(self.vpath, dst).lstrip("/")
         vn = VFS(src, vp)
         self.nodes[dst] = vn
-        return vn
+        return self._trk(vn)
 
     def _find(self, vpath):
         """return [vfs,remainder]"""
@@ -257,7 +262,6 @@ class AuthSrv(object):
             with open(cfg_fn, "rb") as f:
                 self._parse_config_file(f, user, mread, mwrite, mflags, mount)
 
-        self.all_writable = []
         if not mount:
             # -h says our defaults are CWD at root and read/write for everyone
             vfs = VFS(os.path.abspath("."), "", ["*"], ["*"])
@@ -280,11 +284,6 @@ class AuthSrv(object):
             v.uread = mread[dst]
             v.uwrite = mwrite[dst]
             v.flags = mflags[dst]
-            if v.uwrite:
-                self.all_writable.append(v)
-
-        if vfs.uwrite and vfs not in self.all_writable:
-            self.all_writable.append(vfs)
 
         missing_users = {}
         for d in [mread, mwrite]:
diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py
index 36544bd1..e7bb0877 100644
--- a/copyparty/httpcli.py
+++ b/copyparty/httpcli.py
@@ -5,6 +5,7 @@ import os
 import stat
 import gzip
 import time
+import copy
 import json
 import socket
 import ctypes
@@ -125,15 +126,15 @@
                 k, v = k.split("=", 1)
                 uparam[k.lower()] = v.strip()
             else:
-                uparam[k.lower()] = True
+                uparam[k.lower()] = False
 
         self.uparam = uparam
         self.vpath = unquotep(vpath)
 
         ua = self.headers.get("user-agent", "")
         if ua.startswith("rclone/"):
-            uparam["raw"] = True
-            uparam["dots"] = True
+            uparam["raw"] = False
+            uparam["dots"] = False
 
         try:
             if self.mode in ["GET", "HEAD"]:
@@ -237,12 +238,15 @@
         )
         if not self.readable and not self.writable:
             self.log("inaccessible: [{}]".format(self.vpath))
-            self.uparam = {"h": True}
+            self.uparam = {"h": False}
 
         if "h" in self.uparam:
             self.vpath = None
             return self.tx_mounts()
 
+        if "tree" in self.uparam:
+            return self.tx_tree()
+
         return self.tx_browser()
 
     def handle_options(self):
@@ -401,6 +405,9 @@
         except:
             raise Pebkac(422, "you POSTed invalid json")
 
+        if "srch" in self.uparam or "srch" in body:
+            return self.handle_search(body)
+
         # prefer this over undot; no reason to allow traversion
         if "/" in body["name"]:
             raise Pebkac(400, "folders verboten")
@@ -426,6 +433,30 @@
         self.reply(response.encode("utf-8"), mime="application/json")
         return True
 
+    def handle_search(self, body):
+        vols = []
+        for vtop in self.rvol:
+            vfs, _ = self.conn.auth.vfs.get(vtop, self.uname, True, False)
+            vols.append([vfs.vpath, vfs.realpath, vfs.flags])
+
+        idx = self.conn.get_u2idx()
+        if "srch" in body:
+            # search by up2k hashlist
+            vbody = copy.deepcopy(body)
+            vbody["hash"] = len(vbody["hash"])
+            self.log("qj: " + repr(vbody))
+            hits = idx.fsearch(vols, body)
+            self.log("qh: " + repr(hits))
+        else:
+            # search by query params
+            self.log("qj: " + repr(body))
+            hits = idx.search(vols, body)
+            self.log("qh: " + str(len(hits)))
+
+        r = json.dumps(hits).encode("utf-8")
+        self.reply(r, mime="application/json")
+        return True
+
     def handle_post_binary(self):
         try:
             remains = int(self.headers["content-length"])
@@ -1037,6 +1068,60 @@
         self.reply(html.encode("utf-8"))
         return True
 
+    def tx_tree(self):
+        top = self.uparam["tree"] or ""
+        dst = self.vpath
+        if top in [".", ".."]:
+            top = undot(self.vpath + "/" + top)
+
+        if top == dst:
+            dst = ""
+        elif top:
+            if not dst.startswith(top + "/"):
+                raise Pebkac(400, "arg funk")
+
+            dst = dst[len(top) + 1 :]
+
+        ret = self.gen_tree(top, dst)
+        ret = json.dumps(ret)
+        self.reply(ret.encode("utf-8"))
+        return True
+
+    def gen_tree(self, top, target):
+        ret = {}
+        excl = None
+        if target:
+            excl, target = (target.split("/", 1) + [""])[:2]
+            ret["k" + excl] = self.gen_tree("/".join([top, excl]).strip("/"), target)
+
+        try:
+            vn, rem = self.auth.vfs.get(top, self.uname, self.readable, self.writable)
+            fsroot, vfs_ls, vfs_virt = vn.ls(rem, self.uname)
+        except:
+            vfs_ls = []
+            vfs_virt = {}
+            for v in self.rvol:
+                d1, d2 = v.rsplit("/", 1) if "/" in v else ["", v]
+                if d1 == top:
+                    vfs_virt[d2] = 0
+
+        dirs = []
+
+        if not self.args.ed or "dots" not in self.uparam:
+            vfs_ls = exclude_dotfiles(vfs_ls)
+
+        for fn in [x for x in vfs_ls if x != excl]:
+            abspath = os.path.join(fsroot, fn)
+            if os.path.isdir(abspath):
+                dirs.append(fn)
+
+        for x in vfs_virt.keys():
+            if x != excl:
+                dirs.append(x)
+
+        ret["a"] = dirs
+        return ret
+
     def tx_browser(self):
         vpath = ""
         vpnodes = [["", "/"]]
@@ -1062,8 +1147,7 @@
             if abspath.endswith(".md") and "raw" not in self.uparam:
                 return self.tx_md(abspath)
 
-            bad = "{0}.hist{0}up2k.".format(os.sep)
-            if abspath.endswith(bad + "db") or abspath.endswith(bad + "snap"):
+            if rem.startswith(".hist/up2k."):
                 raise Pebkac(403)
 
             return self.tx_file(abspath)
@@ -1092,8 +1176,8 @@
             vfs_ls = exclude_dotfiles(vfs_ls)
 
         hidden = []
-        if fsroot.endswith(str(os.sep) + ".hist"):
-            hidden = ["up2k.db", "up2k.snap"]
+        if rem == ".hist":
+            hidden = ["up2k."]
 
         dirs = []
         files = []
@@ -1106,7 +1190,7 @@
 
             if fn in vfs_virt:
                 fspath = vfs_virt[fn].realpath
-            elif fn in hidden:
+            elif hidden and any(fn.startswith(x) for x in hidden):
                 continue
             else:
                 fspath = fsroot + "/" + fn
@@ -1193,12 +1277,19 @@
         # ts = "?{}".format(time.time())
 
         dirs.extend(files)
+
+        if "ls" in self.uparam:
+            ret = json.dumps(dirs)
+            self.reply(ret.encode("utf-8", "replace"))
+            return True
+
         html = self.conn.tpl_browser.render(
             vdir=quotep(self.vpath),
             vpnodes=vpnodes,
             files=dirs,
             can_upload=self.writable,
             can_read=self.readable,
+            have_up2k_idx=self.args.e2d,
             ts=ts,
             prologue=logues[0],
             epilogue=logues[1],
diff --git a/copyparty/httpconn.py b/copyparty/httpconn.py
index 1cb321cb..8c6ac30f 100644
--- a/copyparty/httpconn.py
+++ b/copyparty/httpconn.py
@@ -30,6 +30,7 @@ except ImportError:
 from .__init__ import E
 from .util import Unrecv
 from .httpcli import HttpCli
+from .u2idx import U2idx
 
 
 class HttpConn(object):
@@ -50,6 +51,7 @@
         self.t0 = time.time()
         self.nbyte = 0
         self.workload = 0
+        self.u2idx = None
         self.log_func = hsrv.log
         self.set_rproxy()
 
@@ -80,6 +82,12 @@
     def log(self, msg):
         self.log_func(self.log_src, msg)
 
+    def get_u2idx(self):
+        if not self.u2idx:
+            self.u2idx = U2idx(self.args, self.log_func)
+
+        return self.u2idx
+
     def _detect_https(self):
         method = None
         if self.cert_path:
diff --git a/copyparty/svchub.py b/copyparty/svchub.py
index 0a8b73cf..cc17e30a 100644
--- a/copyparty/svchub.py
+++ b/copyparty/svchub.py
@@ -39,9 +39,13 @@ class SvcHub(object):
         self.tcpsrv = TcpSrv(self)
         self.up2k = Up2k(self)
 
-        if self.args.e2d and self.args.e2s:
+        if self.args.e2ds:
             auth = AuthSrv(self.args, self.log, False)
-            self.up2k.build_indexes(auth.all_writable)
+            vols = auth.vfs.all_vols.values()
+            if not self.args.e2dsa:
+                vols = [x for x in vols if x.uwrite]
+
+            self.up2k.build_indexes(vols)
 
         # decide which worker impl to use
         if self.check_mp_enable():
@@ -79,7 +83,7 @@
         now = time.time()
         if now >= self.next_day:
             dt = datetime.utcfromtimestamp(now)
-            print("\033[36m{}\033[0m".format(dt.strftime("%Y-%m-%d")))
+            print("\033[36m{}\033[0m\n".format(dt.strftime("%Y-%m-%d")), end="")
 
             # unix timestamp of next 00:00:00 (leap-seconds safe)
             day_now = dt.day
@@ -89,7 +93,7 @@
             dt = dt.replace(hour=0, minute=0, second=0)
             self.next_day = calendar.timegm(dt.utctimetuple())
 
-        fmt = "\033[36m{} \033[33m{:21} \033[0m{}"
+        fmt = "\033[36m{} \033[33m{:21} \033[0m{}\n"
         if not VT100:
             fmt = "{} {:21} {}"
             if "\033" in msg:
@@ -100,12 +104,12 @@
         ts = datetime.utcfromtimestamp(now).strftime("%H:%M:%S.%f")[:-3]
         msg = fmt.format(ts, src, msg)
         try:
-            print(msg)
+            print(msg, end="")
         except UnicodeEncodeError:
             try:
-                print(msg.encode("utf-8", "replace").decode())
+                print(msg.encode("utf-8", "replace").decode(), end="")
             except:
-                print(msg.encode("ascii", "replace").decode())
+                print(msg.encode("ascii", "replace").decode(), end="")
 
     def check_mp_support(self):
         vmin = sys.version_info[1]
diff --git a/copyparty/u2idx.py b/copyparty/u2idx.py
new file mode 100644
index 00000000..59f2a1dd
--- /dev/null
+++ b/copyparty/u2idx.py
@@ -0,0 +1,146 @@
+# coding: utf-8
+from __future__ import print_function, unicode_literals
+
+import os
+from datetime import datetime
+
+from .util import u8safe
+from .up2k import up2k_wark_from_hashlist
+
+
+try:
+    HAVE_SQLITE3 = True
+    import sqlite3
+except:
+    HAVE_SQLITE3 = False
+
+
+class U2idx(object):
+    def __init__(self, args, log_func):
+        self.args = args
+        self.log_func = log_func
+
+        if not HAVE_SQLITE3:
+            self.log("could not load sqlite3; searching will be disabled")
+            return
+
+        self.dbs = {}
+
+    def log(self, msg):
+        self.log_func("u2idx", msg)
+
+    def fsearch(self, vols, body):
+        """search by up2k hashlist"""
+        if not HAVE_SQLITE3:
+            return []
+
+        fsize = body["size"]
+        fhash = body["hash"]
+        wark = up2k_wark_from_hashlist(self.args.salt, fsize, fhash)
+        return self.run_query(vols, "select * from up where w = ?", [wark])
+
+    def search(self, vols, body):
+        """search by query params"""
+        if not HAVE_SQLITE3:
+            return []
+
+        qobj = {}
+        _conv_sz(qobj, body, "sz_min", "sz >= ?")
+        _conv_sz(qobj, body, "sz_max", "sz <= ?")
+        _conv_dt(qobj, body, "dt_min", "mt >= ?")
+        _conv_dt(qobj, body, "dt_max", "mt <= ?")
+        for seg, dk in [["path", "rd"], ["name", "fn"]]:
+            for inv in ["no", "yes"]:
+                jk = "{}_{}".format(seg, inv)
+                if jk in body:
+                    _conv_txt(qobj, body, jk, dk)
+
+        qstr = "select * from up"
+        qv = []
+        if qobj:
+            qk = []
+            for k, v in sorted(qobj.items()):
+                qk.append(k)
+                qv.append(v)
+
+            qstr = " and ".join(qk)
+            qstr = "select * from up where " + qstr
+
+        return self.run_query(vols, qstr, qv)
+
+    def run_query(self, vols, qstr, qv):
+        qv = tuple(qv)
+        self.log("qs: " + qstr)
+        self.log("qv: " + repr(qv))
+
+        ret = []
+        lim = 100
+        for (vtop, ptop, flags) in vols:
+            db = self.dbs.get(ptop)
+            if not db:
+                db = _open(ptop)
+                if not db:
+                    continue
+
+                self.dbs[ptop] = db
+                self.log("idx /{} @ {} {}".format(vtop, ptop, flags))
+
+            c = db.execute(qstr, qv)
+            for _, ts, sz, rd, fn in c:
+                lim -= 1
+                if lim <= 0:
+                    break
+
+                rp = os.path.join(vtop, rd, fn).replace("\\", "/")
+                ret.append({"ts": int(ts), "sz": sz, "rp": rp})
+
+        return ret
+
+
+def _open(ptop):
+    db_path = os.path.join(ptop, ".hist", "up2k.db")
+    if os.path.exists(db_path):
+        return sqlite3.connect(db_path)
+
+
+def _conv_sz(q, body, k, sql):
+    if k in body:
+        q[sql] = int(float(body[k]) * 1024 * 1024)
+
+
+def _conv_dt(q, body, k, sql):
+    if k not in body:
+        return
+
+    v = body[k].upper().rstrip("Z").replace(",", " ").replace("T", " ")
+    while "  " in v:
+        v = v.replace("  ", " ")
+
+    for fmt in ["%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d %H", "%Y-%m-%d"]:
+        try:
+            ts = datetime.strptime(v, fmt).timestamp()
+            break
+        except:
+            ts = None
+
+    if ts:
+        q[sql] = ts
+
+
+def _conv_txt(q, body, k, sql):
+    v = body[k]
+    print("[" + v + "]")
+
+    head = "'%'||"
+    if v.startswith("^"):
+        head = ""
+        v = v[1:]
+
+    tail = "||'%'"
+    if v.endswith("$"):
+        tail = ""
+        v = v[:-1]
+
+    inv = "not" if k.endswith("_no") else ""
+    qk = "{} {} like {}?{}".format(sql, inv, head, tail)
+    q[qk] = u8safe(v)
diff --git a/copyparty/up2k.py b/copyparty/up2k.py
index 27053f70..ed9ddc5a 100644
--- a/copyparty/up2k.py
+++ b/copyparty/up2k.py
@@ -1,7 +1,6 @@
 # coding: utf-8
 from __future__ import print_function, unicode_literals
 
-
 import re
 import os
 import sys
@@ -17,15 +16,23 @@ import threading
 from copy import deepcopy
 
 from .__init__ import WINDOWS
-from .util import Pebkac, Queue, fsdec, fsenc, sanitize_fn, ren_open, atomic_move
+from .util import (
+    Pebkac,
+    Queue,
+    ProgressPrinter,
+    fsdec,
+    fsenc,
+    sanitize_fn,
+    ren_open,
+    atomic_move,
+    u8safe,
+)
 
-HAVE_SQLITE3 = False
 try:
-    import sqlite3
-
     HAVE_SQLITE3 = True
+    import sqlite3
 except:
-    pass
+    HAVE_SQLITE3 = False
 
 
 class Up2k(object):
@@ -39,11 +46,11 @@ def __init__(self, broker):
         self.broker = broker
         self.args = broker.args
-        self.log = broker.log
+        self.log_func = broker.log
         self.persist = self.args.e2d
 
         # config
-        self.salt = "hunter2"  # TODO: config
+        self.salt = broker.args.salt
 
         # state
         self.mutex = threading.Lock()
@@ -66,8 +73,16 @@
         self.r_hash = re.compile("^[0-9a-zA-Z_-]{43}$")
 
         if self.persist and not HAVE_SQLITE3:
-            m = "could not initialize sqlite3, will use in-memory registry only"
-            self.log("up2k", m)
+            self.log("could not initialize sqlite3, will use in-memory registry only")
+
+    def log(self, msg):
+        self.log_func("up2k", msg + "\033[K")
+
+    def _u8(self, rd, fn):
+        s_rd = u8safe(rd)
+        s_fn = u8safe(fn)
+        self.log("u8safe retry:\n  [{}] [{}]\n  [{}] [{}]".format(rd, fn, s_rd, s_fn))
+        return (s_rd, s_fn)
 
     def _vis_job_progress(self, job):
         perc = 100 - (len(job["need"]) * 100.0 / len(job["hash"]))
@@ -98,7 +113,7 @@
             m = "loaded snap {} |{}|".format(path, len(reg.keys()))
             m = [m] + self._vis_reg_progress(reg)
-            self.log("up2k", "\n".join(m))
+            self.log("\n".join(m))
 
         self.registry[ptop] = reg
         if not self.persist or not HAVE_SQLITE3:
@@ -119,57 +134,86 @@
             self.db[ptop] = db
             return db
         except Exception as ex:
-            m = "failed to open [{}]: {}".format(ptop, repr(ex))
-            self.log("up2k", m)
+            self.log("cannot use database at [{}]: {}".format(ptop, repr(ex)))
             return None
 
     def build_indexes(self, writeables):
         tops = [d.realpath for d in writeables]
+        self.pp = ProgressPrinter()
+        t0 = time.time()
         for top in tops:
             db = self.register_vpath(top)
-            if db:
-                # can be symlink so don't `and d.startswith(top)``
-                excl = set([d for d in tops if d != top])
-                dbw = [db, 0, time.time()]
-                self._build_dir(dbw, top, excl, top)
-                self._drop_lost(db, top)
-                if dbw[1]:
-                    self.log("up2k", "commit {} new files".format(dbw[1]))
+            if not db:
+                continue
 
-                db.commit()
+            self.pp.n = next(db.execute("select count(w) from up"))[0]
+            db_path = os.path.join(top, ".hist", "up2k.db")
+            sz0 = os.path.getsize(db_path) // 1024
+
+            # can be symlink so don't `and d.startswith(top)``
+            excl = set([d for d in tops if d != top])
+            dbw = [db, 0, time.time()]
+
+            n_add = self._build_dir(dbw, top, excl, top)
+            n_rm = self._drop_lost(db, top)
+            if dbw[1]:
+                self.log("commit {} new files".format(dbw[1]))
+
+            db.commit()
+            if n_add or n_rm:
+                db_path = os.path.join(top, ".hist", "up2k.db")
+                sz1 = os.path.getsize(db_path) // 1024
+                db.execute("vacuum")
+                sz2 = os.path.getsize(db_path) // 1024
+                msg = "{} new, {} del, {} kB vacced, {} kB gain, {} kB now".format(
+                    n_add, n_rm, sz1 - sz2, sz2 - sz0, sz2
+                )
+                self.log(msg)
+
+        self.pp.end = True
+        self.log("{} volumes in {:.2f} sec".format(len(tops), time.time() - t0))
 
     def _build_dir(self, dbw, top, excl, cdir):
         try:
             inodes = [fsdec(x) for x in os.listdir(fsenc(cdir))]
         except Exception as ex:
-            self.log("up2k", "listdir: {} @ [{}]".format(repr(ex), cdir))
-            return
+            self.log("listdir: {} @ [{}]".format(repr(ex), cdir))
+            return 0
 
+        self.pp.msg = "a{} {}".format(self.pp.n, cdir)
         histdir = os.path.join(top, ".hist")
+        ret = 0
         for inode in inodes:
             abspath = os.path.join(cdir, inode)
             try:
                 inf = os.stat(fsenc(abspath))
             except Exception as ex:
-                self.log("up2k", "stat: {} @ [{}]".format(repr(ex), abspath))
+                self.log("stat: {} @ [{}]".format(repr(ex), abspath))
                 continue
 
             if stat.S_ISDIR(inf.st_mode):
                 if abspath in excl or abspath == histdir:
                     continue
-                # self.log("up2k", " dir: {}".format(abspath))
-                self._build_dir(dbw, top, excl, abspath)
+                # self.log(" dir: {}".format(abspath))
+                ret += self._build_dir(dbw, top, excl, abspath)
             else:
-                # self.log("up2k", "file: {}".format(abspath))
+                # self.log("file: {}".format(abspath))
                 rp = abspath[len(top) :].replace("\\", "/").strip("/")
-                c = dbw[0].execute("select * from up where rp = ?", (rp,))
+                rd, fn = rp.rsplit("/", 1) if "/" in rp else ["", rp]
+                sql = "select * from up where rd = ? and fn = ?"
+                try:
+                    c = dbw[0].execute(sql, (rd, fn))
+                except:
+                    c = dbw[0].execute(sql, self._u8(rd, fn))
+
                 in_db = list(c.fetchall())
                 if in_db:
-                    _, dts, dsz, _ = in_db[0]
+                    self.pp.n -= 1
+                    _, dts, dsz, _, _ = in_db[0]
                     if len(in_db) > 1:
                         m = "WARN: multiple entries: [{}] => [{}] ({})"
-                        self.log("up2k", m.format(top, rp, len(in_db)))
+                        self.log(m.format(top, rp, len(in_db)))
                         dts = -1
 
                     if dts == inf.st_mtime and dsz == inf.st_size:
@@ -178,68 +222,80 @@
                     m = "reindex [{}] => [{}] ({}/{}) ({}/{})".format(
                         top, rp, dts, inf.st_mtime, dsz, inf.st_size
                     )
-                    self.log("up2k", m)
-                    self.db_rm(dbw[0], rp)
+                    self.log(m)
+                    self.db_rm(dbw[0], rd, fn)
+                    ret += 1
                     dbw[1] += 1
                     in_db = None
 
-                self.log("up2k", "file: {}".format(abspath))
+                self.pp.msg = "a{} {}".format(self.pp.n, abspath)
+                if inf.st_size > 1024 * 1024:
+                    self.log("file: {}".format(abspath))
+
                 try:
                     hashes = self._hashlist_from_file(abspath)
                 except Exception as ex:
-                    self.log("up2k", "hash: {} @ [{}]".format(repr(ex), abspath))
+                    self.log("hash: {} @ [{}]".format(repr(ex), abspath))
                     continue
 
-                wark = self._wark_from_hashlist(inf.st_size, hashes)
-                self.db_add(dbw[0], wark, rp, inf.st_mtime, inf.st_size)
+                wark = up2k_wark_from_hashlist(self.salt, inf.st_size, hashes)
+                self.db_add(dbw[0], wark, rd, fn, inf.st_mtime, inf.st_size)
                 dbw[1] += 1
+                ret += 1
                 td = time.time() - dbw[2]
-                if dbw[1] > 1024 or td > 60:
-                    self.log("up2k", "commit {} new files".format(dbw[1]))
+                if dbw[1] >= 4096 or td >= 60:
+                    self.log("commit {} new files".format(dbw[1]))
                     dbw[0].commit()
                     dbw[1] = 0
                     dbw[2] = time.time()
+        return ret
 
     def _drop_lost(self, db, top):
         rm = []
+        nchecked = 0
+        nfiles = next(db.execute("select count(w) from up"))[0]
         c = db.execute("select * from up")
-        for dwark, dts, dsz, drp in c:
-            abspath = os.path.join(top, drp)
+        for dwark, dts, dsz, drd, dfn in c:
+            nchecked += 1
+            abspath = os.path.join(top, drd, dfn)
+            # almost zero overhead dw
+            self.pp.msg = "b{} {}".format(nfiles - nchecked, abspath)
             try:
                 if not os.path.exists(fsenc(abspath)):
-                    rm.append(drp)
+                    rm.append([drd, dfn])
             except Exception as ex:
-                self.log("up2k", "stat-rm: {} @ [{}]".format(repr(ex), abspath))
+                self.log("stat-rm: {} @ [{}]".format(repr(ex), abspath))
 
-        if not rm:
-            return
+        if rm:
+            self.log("forgetting {} deleted files".format(len(rm)))
+            for rd, fn in rm:
+                self.db_rm(db, rd, fn)
 
-        self.log("up2k", "forgetting {} deleted files".format(len(rm)))
-        for rp in rm:
-            self.db_rm(db, rp)
+        return len(rm)
 
     def _open_db(self, db_path):
+        existed = os.path.exists(db_path)
         conn = sqlite3.connect(db_path, check_same_thread=False)
         try:
-            c = conn.execute(r"select * from kv where k = 'sver'")
-            rows = c.fetchall()
-            if rows:
-                ver = rows[0][1]
-            else:
-                self.log("up2k", "WARN: no sver in kv, DB corrupt?")
-                ver = "unknown"
+            ver = self._read_ver(conn)
 
-            if ver == "1":
+            if ver == 1:
+                conn = self._upgrade_v1(conn, db_path)
+                ver = self._read_ver(conn)
+
+            if ver == 2:
                 try:
                     nfiles = next(conn.execute("select count(w) from up"))[0]
-                    self.log("up2k", "found DB at {} |{}|".format(db_path, nfiles))
+                    self.log("found DB at {} |{}|".format(db_path, nfiles))
                     return conn
                 except Exception as ex:
-                    m = "WARN: could not list files, DB corrupt?\n  " + repr(ex)
-                    self.log("up2k", m)
+                    self.log("WARN: could not list files, DB corrupt?\n  " + repr(ex))
+
+            if ver is not None:
+                self.log("REPLACING unsupported DB (v.{}) at {}".format(ver, db_path))
+            elif not existed:
+                raise Exception("whatever")
 
-            m = "REPLACING unsupported DB (v.{}) at {}".format(ver, db_path)
-            self.log("up2k", m)
             conn.close()
             os.unlink(db_path)
             conn = sqlite3.connect(db_path, check_same_thread=False)
@@ -247,17 +303,58 @@
             pass
 
         # sqlite is variable-width only, no point in using char/nchar/varchar
+        self._create_v2(conn)
+        conn.commit()
+        self.log("created DB at {}".format(db_path))
+        return conn
+
+    def _read_ver(self, conn):
+        for tab in ["ki", "kv"]:
+            try:
+                c = conn.execute(r"select v from {} where k = 'sver'".format(tab))
+            except:
+                continue
+
+            rows = c.fetchall()
+            if rows:
+                return int(rows[0][0])
+
+    def _create_v2(self, conn):
         for cmd in [
-            r"create table kv (k text, v text)",
-            r"create table up (w text, mt int, sz int, rp text)",
-            r"insert into kv values ('sver', '1')",
+            r"create table ks (k text, v text)",
+            r"create table ki (k text, v int)",
+            r"create table up (w text, mt int, sz int, rd text, fn text)",
+            r"insert into ki values ('sver', 2)",
             r"create index up_w on up(w)",
+            r"create index up_rd on up(rd)",
+            r"create index up_fn on up(fn)",
         ]:
             conn.execute(cmd)
 
-        conn.commit()
-        self.log("up2k", "created DB at {}".format(db_path))
-        return conn
+    def _upgrade_v1(self, odb, db_path):
+        self.log("\033[33mupgrading v1 to v2:\033[0m {}".format(db_path))
+
+        npath = db_path + ".next"
+        if os.path.exists(npath):
+            os.unlink(npath)
+
+        ndb = sqlite3.connect(npath, check_same_thread=False)
+        self._create_v2(ndb)
+
+        c = odb.execute("select * from up")
+        for wark, ts, sz, rp in c:
+            rd, fn = rp.rsplit("/", 1) if "/" in rp else ["", rp]
+            v = (wark, ts, sz, rd, fn)
+            ndb.execute("insert into up values (?,?,?,?,?)", v)
+
+        ndb.commit()
+        ndb.close()
+        odb.close()
+        bpath = db_path + ".bak.v1"
+        self.log("success; backup at: " + bpath)
+        atomic_move(db_path, bpath)
+        atomic_move(npath, db_path)
+        return sqlite3.connect(db_path, check_same_thread=False)
 
     def handle_json(self, cj):
         self.register_vpath(cj["ptop"])
@@ -271,19 +368,13 @@
             reg = self.registry[cj["ptop"]]
             if db:
                 cur = db.execute(r"select * from up where w = ?", (wark,))
-                for _, dtime, dsize, dp_rel in cur:
-                    dp_abs = os.path.join(cj["ptop"], dp_rel).replace("\\", "/")
+                for _, dtime, dsize, dp_dir, dp_fn in cur:
+                    dp_abs = os.path.join(cj["ptop"], dp_dir, dp_fn).replace("\\", "/")
                     # relying on path.exists to return false on broken symlinks
                     if os.path.exists(fsenc(dp_abs)):
-                        try:
-                            prel, name = dp_rel.rsplit("/", 1)
-                        except:
-                            prel = ""
-                            name = dp_rel
-
                         job = {
-                            "name": name,
-                            "prel": prel,
+                            "name": dp_fn,
+                            "prel": dp_dir,
                             "vtop": cj["vtop"],
                             "ptop": cj["ptop"],
                             "flag": cj["flag"],
@@ -319,12 +410,12 @@
             vsrc = os.path.join(job["vtop"], job["prel"], job["name"])
             vsrc = vsrc.replace("\\", "/")  # just for prints anyways
             if job["need"]:
-                self.log("up2k", "unfinished:\n  {0}\n  {1}".format(src, dst))
+                self.log("unfinished:\n  {0}\n  {1}".format(src, dst))
                 err = "partial upload exists at a different location; please resume uploading here instead:\n"
                 err += vsrc + " "
                 raise Pebkac(400, err)
             elif "nodupe" in job["flag"]:
-                self.log("up2k", "dupe-reject:\n  {0}\n  {1}".format(src, dst))
+                self.log("dupe-reject:\n  {0}\n  {1}".format(src, dst))
                 err = "upload rejected, file already exists:\n  " + vsrc + " "
                 raise Pebkac(400, err)
             else:
@@ -389,7 +480,7 @@
     def _symlink(self, src, dst):
         # TODO store this in linktab so we never delete src if there are links to it
-        self.log("up2k", "linking dupe:\n  {0}\n  {1}".format(src, dst))
+        self.log("linking dupe:\n  {0}\n  {1}".format(src, dst))
         try:
             lsrc = src
             ldst = dst
@@ -412,7 +503,7 @@
                 lsrc = "../" * (len(lsrc) - 1) + "/".join(lsrc)
             os.symlink(fsenc(lsrc), fsenc(ldst))
         except (AttributeError, OSError) as ex:
-            self.log("up2k", "cannot symlink; creating copy: " + repr(ex))
+            self.log("cannot symlink; creating copy: " + repr(ex))
             shutil.copy2(fsenc(src), fsenc(dst))
 
     def handle_chunk(self, ptop, wark, chash):
@@ -430,7 +521,7 @@
         job["poke"] = time.time()
 
-        chunksize = self._get_chunksize(job["size"])
+        chunksize = up2k_chunksize(job["size"])
         ofs = [chunksize * x for x in nchunk]
 
         path = os.path.join(job["ptop"], job["prel"], job["tnam"])
@@ -463,33 +554,31 @@
         db = self.db.get(job["ptop"], None)
         if db:
-            rp = os.path.join(job["prel"], job["name"]).replace("\\", "/")
-            self.db_rm(db, rp)
-            self.db_add(db, job["wark"], rp, job["lmod"], job["size"])
+            j = job
+            self.db_rm(db, j["prel"], j["name"])
+            self.db_add(db, j["wark"], j["prel"], j["name"], j["lmod"], j["size"])
             db.commit()
             del self.registry[ptop][wark]
             # in-memory registry is reserved for unfinished uploads
 
         return ret, dst
 
-    def _get_chunksize(self, filesize):
-        chunksize = 1024 * 1024
-        stepsize = 512 * 1024
-        while True:
-            for mul in [1, 2]:
-                nchunks = math.ceil(filesize * 1.0 / chunksize)
-                if nchunks <= 256 or chunksize >= 32 * 1024 * 1024:
-                    return chunksize
+    def db_rm(self, db, rd, fn):
+        sql = "delete from up where rd = ? and fn = ?"
+        try:
+            db.execute(sql, (rd, fn))
+        except:
+            db.execute(sql, self._u8(rd, fn))
 
-                chunksize += stepsize
-                stepsize *= mul
-
-    def db_rm(self, db, rp):
-        db.execute("delete from up where rp = ?", (rp,))
-
-    def db_add(self, db, wark, rp, ts, sz):
-        v = (wark, ts, sz, rp)
-        db.execute("insert into up values (?,?,?,?)", v)
+    def db_add(self, db, wark, rd, fn, ts, sz):
+        sql = "insert into up values (?,?,?,?,?)"
+        v = (wark, ts, sz, rd, fn)
+        try:
+            db.execute(sql, v)
+        except:
+            rd, fn = self._u8(rd, fn)
+            v = (wark, ts, sz, rd, fn)
+            db.execute(sql, v)
 
     def _get_wark(self, cj):
         if len(cj["name"]) > 1024 or len(cj["hash"]) > 512 * 1024:  # 16TiB
@@ -507,36 +596,17 @@
         except:
             cj["lmod"] = int(time.time())
 
-        wark = self._wark_from_hashlist(cj["size"], cj["hash"])
+        wark = up2k_wark_from_hashlist(self.salt, cj["size"], cj["hash"])
        return wark
 
-    def _wark_from_hashlist(self, filesize, hashes):
-        """ server-reproducible file identifier, independent of name or location """
-        ident = [self.salt, str(filesize)]
-        ident.extend(hashes)
-        ident = "\n".join(ident)
-
-        hasher = hashlib.sha512()
-        hasher.update(ident.encode("utf-8"))
-        digest = hasher.digest()[:32]
-
-        wark = base64.urlsafe_b64encode(digest)
-        return wark.decode("utf-8").rstrip("=")
-
     def _hashlist_from_file(self, path):
         fsz = os.path.getsize(path)
-        csz = self._get_chunksize(fsz)
+        csz = up2k_chunksize(fsz)
         ret = []
         last_print = time.time()
         with open(path, "rb", 512 * 1024) as f:
             while fsz > 0:
-                now = time.time()
-                td = now - last_print
-                if td >= 0.1:
-                    last_print = now
-                    msg = " {} MB \r".format(int(fsz / 1024 / 1024))
-                    print(msg, end="", file=sys.stderr)
-
+                self.pp.msg = msg = "{} MB".format(int(fsz / 1024 / 1024))
                 hashobj = hashlib.sha512()
                 rem = min(csz, fsz)
                 fsz -= rem
@@ -599,7 +669,7 @@
         if rm:
             m = "dropping {} abandoned uploads in {}".format(len(rm), k)
             vis = [self._vis_job_progress(x) for x in rm]
-            self.log("up2k", "\n".join([m] + vis))
+            self.log("\n".join([m] + vis))
             for job in rm:
                 del reg[job["wark"]]
                 try:
@@ -635,5 +705,32 @@
 
             atomic_move(path2, path)
 
-            self.log("up2k", "snap: {} |{}|".format(path, len(reg.keys())))
+            self.log("snap: {} |{}|".format(path, len(reg.keys())))
             prev[k] = etag
+
+
+def up2k_chunksize(filesize):
+    chunksize = 1024 * 1024
+    stepsize = 512 * 1024
+    while True:
+        for mul in [1, 2]:
+            nchunks = math.ceil(filesize * 1.0 / chunksize)
+            if nchunks <= 256 or chunksize >= 32 * 1024 * 1024:
+                return chunksize
+
+            chunksize += stepsize
+            stepsize *= mul
+
+
+def up2k_wark_from_hashlist(salt, filesize, hashes):
+    """ server-reproducible file identifier, independent of name or location """
+    ident = [salt, str(filesize)]
+    ident.extend(hashes)
+    ident = "\n".join(ident)
+
+    hasher = hashlib.sha512()
+    hasher.update(ident.encode("utf-8"))
+    digest = hasher.digest()[:32]
+
+    wark = base64.urlsafe_b64encode(digest)
+    return wark.decode("utf-8").rstrip("=")
diff --git a/copyparty/util.py b/copyparty/util.py
index b5b37090..038990ec 100644
--- a/copyparty/util.py
+++ b/copyparty/util.py
@@ -99,6 +99,32 @@ class Unrecv(object):
         self.buf = buf + self.buf
 
 
+class ProgressPrinter(threading.Thread):
+    """
+    periodically print progress info without linefeeds
+    """
+
+    def __init__(self):
+        threading.Thread.__init__(self)
+        self.daemon = True
+        self.msg = None
+        self.end = False
+        self.start()
+
+    def run(self):
+        msg = None
+        while not self.end:
+            time.sleep(0.05)
+            if msg == self.msg or self.end:
+                continue
+
+            msg = self.msg
+            print(" {}\033[K\r".format(msg), end="")
+
+        print("\033[K", end="")
+        sys.stdout.flush()  # necessary on win10 even w/ stderr btw
+
+
 @contextlib.contextmanager
 def ren_open(fname, *args, **kwargs):
     fdir = kwargs.pop("fdir", None)
@@ -146,7 +172,7 @@ def ren_open(fname, *args, **kwargs):
         except OSError as ex_:
             ex = ex_
 
-            if ex.errno != 36:
+            if ex.errno not in [36, 63] and (not WINDOWS or ex.errno != 22):
                 raise
 
         if not b64:
@@ -480,6 +506,13 @@ def sanitize_fn(fn):
     return fn.strip()
 
 
+def u8safe(txt):
+    try:
+        return txt.encode("utf-8", "xmlcharrefreplace").decode("utf-8", "replace")
+    except:
+        return txt.encode("utf-8", "replace").decode("utf-8", "replace")
+
+
 def exclude_dotfiles(filepaths):
     for fpath in filepaths:
         if not fpath.split("/")[-1].startswith("."):
diff --git a/copyparty/web/browser.css b/copyparty/web/browser.css
index bdc8ca57..baedfa50 100644
--- a/copyparty/web/browser.css
+++ b/copyparty/web/browser.css
@@ -39,15 +39,27 @@ body {
	margin: 1.3em 0 0 0;
	font-size: 1.4em;
 }
+#path #entree {
+	margin-left: -.7em;
+}
+#treetab {
+	display: none;
+}
 #files {
	border-collapse: collapse;
	margin-top: 2em;
+	z-index: 1;
+	position: relative;
 }
 #files tbody a {
	display: block;
	padding: .3em 0;
 }
-a {
+#files[ts] tbody div a {
+	color: #f5a;
+}
+a,
+#files[ts] tbody div a:last-child {
	color: #fc5;
	padding: .2em;
	text-decoration: none;
@@ -156,7 +168,7 @@ a.play.act {
	height: 100%;
	background: #333;
	font-size: 2.5em;
-	z-index:99;
+	z-index: 99;
 }
 #blk_play,
 #blk_abrt {
@@ -190,6 +202,7 @@ a.play.act {
	bottom: -6em;
	height: 6em;
	width: 100%;
+	z-index: 3;
	transition: bottom 0.15s;
 }
 #widget.open {
@@ -214,6 +227,9 @@ a.play.act {
	75% {cursor: url(/.cpr/dd/5.png), pointer}
	85% {cursor: url(/.cpr/dd/1.png), pointer}
 }
+@keyframes spin {
+	100% {transform: rotate(360deg)}
+}
 #wtoggle {
	position: absolute;
	top: -1.2em;
@@ -273,3 +289,207 @@ a.play.act {
	width: calc(100% - 10.5em);
	background: rgba(0,0,0,0.2);
 }
+
+
+
+.opview {
+	display: none;
+}
+.opview.act {
+	display: block;
+}
+#ops a {
+	color: #fc5;
+	font-size: 1.5em;
+	padding: .25em .3em;
+	margin: 0;
+	outline: none;
+}
+#ops a.act {
+	background: #281838;
+	border-radius: 0 0 .2em .2em;
+	border-bottom: .3em solid #d90;
+	box-shadow: 0 -.15em .2em #000 inset;
+	padding-bottom: .3em;
+}
+#ops i {
+	font-size: 1.5em;
+}
+#ops i:before {
+	content: 'x';
+	color: #282828;
+	text-shadow: 0 0 .08em #01a7e1;
+	position: relative;
+}
+#ops i:after {
+	content: 'x';
+	color: #282828;
+	text-shadow: 0 0 .08em #ff3f1a;
+	margin-left: -.35em;
+	font-size: 1.05em;
+}
+#ops,
+.opbox {
+	border: 1px solid #3a3a3a;
+	box-shadow: 0 0 1em #222 inset;
+}
+#ops {
+	display: none;
+	background: #333;
+	margin: 1.7em 1.5em 0 1.5em;
+	padding: .3em .6em;
+	border-radius: .3em;
+	border-width: .15em 0;
+}
+.opbox {
+	background: #2d2d2d;
+	margin: 1.5em 0 0 0;
+	padding: .5em;
+	border-radius: 0 1em 1em 0;
+	border-width: .15em .3em .3em 0;
+	max-width: 40em;
+}
+.opbox input {
+	margin: .5em;
+}
+.opview input[type=text] {
+	color: #fff;
+	background: #383838;
+	border: none;
+	box-shadow: 0 0 .3em #222;
+	border-bottom: 1px solid #fc5;
+	border-radius: .2em;
+	padding: .2em .3em;
+}
+input[type="checkbox"]+label {
+	color: #f5a;
+}
+input[type="checkbox"]:checked+label {
+	color: #fc5;
+}
+
+
+
+#op_search table {
+	border: 1px solid #3a3a3a;
+	box-shadow: 0 0 1em #222 inset;
+	background: #2d2d2d;
+	border-radius: .4em;
+	margin: 1.4em;
+	margin-bottom: 0;
+	padding: 0 .5em .5em 0;
+}
+#srch_form td {
+	padding: .6em .6em;
+}
+#op_search input {
+	margin: 0;
+}
+#srch_q {
	white-space: pre;
+}
+#files td div span {
+	color: #fff;
+	padding: 0 .4em;
+	font-weight: bold;
+	font-style: italic;
+}
+#files td div a:hover {
+	background: #444;
+	color: #fff;
+}
+#files td div a {
+	display: table-cell;
+	white-space: nowrap;
+}
+#files td div a:last-child {
+	width: 100%;
+}
+#files td div {
+	display: table;
+	border-collapse: collapse;
+	width: 100%;
+}
+#files td div a:last-child {
+	width: 100%;
+}
+#tree,
+#treefiles {
+	vertical-align: top;
+}
+#tree {
+	padding-top: 2em;
+}
+#detree {
+	padding: .3em .5em;
+	font-size: 1.5em;
+}
+#treefiles #files tbody {
+	border-radius: 0 .7em 0 .7em;
+}
+#treefiles #files thead th:nth-child(1) {
+	border-radius: .7em 0 0 0;
+}
+#tree li {
+	list-style: none;
+}
+#tree ul,
+#tree li {
+	padding: 0;
+	margin: 0;
+}
+#tree ul {
+	border-left: .2em solid #444;
+}
+#tree li {
+	margin-left: 1em;
+}
+#tree a.hl {
+	color: #400;
+	background: #fc4;
+	border-radius: .3em;
+	text-shadow: none;
+}
+#tree li {
+	white-space: nowrap;
+}
+#tree a {
+	display: inline-block;
+}
+#tree a+a {
+	width: calc(100% - 2em);
+	background: #333;
+}
+#tree a+a:hover {
+	background: #222;
+	color: #fff;
+}
+#treeul {
+	position: relative;
+	overflow: hidden;
+	left: -1.7em;
+}
+#treeul:hover {
+	z-index: 2;
+	overflow: visible;
+}
+#treeul:hover a+a {
+	width: auto;
+	min-width: calc(100% - 2em);
+}
+#treeul a:first-child {
+	font-family: monospace, monospace;
+}
+#treefiles {
+	opacity: 1;
+	transition: opacity 0.2s ease-in-out;
+}
+#tree:hover+#treefiles {
+	opacity: .8;
+}
+.dumb_loader_thing {
+	display: inline-block;
+	margin: 1em;
+	font-size: 3em;
+	animation: spin 1s linear infinite;
+}
diff --git a/copyparty/web/browser.html b/copyparty/web/browser.html
index 9d326ecc..ff995edb 100644
--- a/copyparty/web/browser.html
+++ b/copyparty/web/browser.html
@@ -13,11 +13,33 @@
+
+	🍞...
+
 	🌲
+	|
+
+	|
+	' + sconf[a][0] + ' | ');
+	for (var b = 1; b < 3; b++) {
+		var hn = "srch_" + sconf[a][b][0];
+		html.push(
+			'\n' +
+			'\n' +
+			' | ');
+	}
+	html.push('
-	| ' + links + ' | ' + sz +
+	' | ' + ext + ' | ' + ts + ' | |
+	' + res[a][0] + ' | ' + res[a][2] + ' | ';
+
+	for (var b = 3; b < res[a].length; b++) {
+		ln += '' + res[a][b] + ' | ';
+	}
+	html.push(ln + '
 	parallel uploads |
+
+
+	|
+
+
+	|
+
+
+	|
+	{%- if have_up2k_idx %}
+
+
+	|
+	{%- endif %}
+	|
@@ -51,26 +63,18 @@
+	|
-
-
-	|
-
-
-	|
-
-
-	|
-( if you don't need lastmod timestamps, resumable uploads or progress bars just use the basic uploader)
+( if you don't need lastmod timestamps, resumable uploads or progress bars just use the basic uploader)