From 05e0c2ec9e8dafca4b3fd8df627689b8f025c1a8 Mon Sep 17 00:00:00 2001 From: ed Date: Sun, 26 Feb 2023 18:23:32 +0000 Subject: [PATCH] add xiu (batching hook; runs on idle after uploads) + bunch of tweaks/fixes for hooks --- bin/hooks/notify2.py | 68 +++++++++ bin/hooks/xiu-sha.py | 103 +++++++++++++ bin/hooks/xiu.py | 45 ++++++ copyparty/__main__.py | 11 ++ copyparty/authsrv.py | 23 ++- copyparty/cfg.py | 1 + copyparty/ftpd.py | 56 +++++-- copyparty/httpcli.py | 19 ++- copyparty/smbd.py | 20 ++- copyparty/up2k.py | 333 ++++++++++++++++++++++++++++++++++-------- copyparty/util.py | 123 +++++++++++++--- tests/util.py | 2 +- 12 files changed, 688 insertions(+), 116 deletions(-) create mode 100755 bin/hooks/notify2.py create mode 100755 bin/hooks/xiu-sha.py create mode 100755 bin/hooks/xiu.py diff --git a/bin/hooks/notify2.py b/bin/hooks/notify2.py new file mode 100755 index 00000000..185b7631 --- /dev/null +++ b/bin/hooks/notify2.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python3 + +import json +import os +import sys +import subprocess as sp +from datetime import datetime +from plyer import notification + + +_ = r""" +same as notify.py but with additional info (uploader, ...) +and also supports --xm (notify on 📟 message) + +example usages; either as global config (all volumes) or as volflag: + --xm f,j,bin/hooks/notify2.py + --xau f,j,bin/hooks/notify2.py + -v srv/inc:inc:c,xm=f,j,bin/hooks/notify2.py + -v srv/inc:inc:c,xau=f,j,bin/hooks/notify2.py + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +parameters explained, + xau = execute after upload + f = fork so it doesn't block uploads + j = provide json instead of filepath list +""" + + +try: + from copyparty.util import humansize +except: + + def humansize(n): + return n + + +def main(): + inf = json.loads(sys.argv[1]) + fp = inf["ap"] + sz = humansize(inf["sz"]) + dp, fn = os.path.split(fp) + mt = datetime.utcfromtimestamp(inf["mt"]).strftime("%Y-%m-%d %H:%M:%S") + + msg = f"{fn} ({sz})\n📁 {dp}" + title = "File received" + icon = "emblem-documents-symbolic" if sys.platform == "linux" else "" + + if inf.get("txt"): + msg = inf["txt"] + title = "Message received" + icon = "mail-unread-symbolic" if sys.platform == "linux" else "" + + msg += f"\n👤 {inf['user']} ({inf['ip']})\n🕒 {mt}" + + if "com.termux" in sys.executable: + sp.run(["termux-notification", "-t", title, "-c", msg]) + return + + notification.notify( + title=title, + message=msg, + app_icon=icon, + timeout=10, + ) + + +if __name__ == "__main__": + main() diff --git a/bin/hooks/xiu-sha.py b/bin/hooks/xiu-sha.py new file mode 100755 index 00000000..635e93d9 --- /dev/null +++ b/bin/hooks/xiu-sha.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python3 + +import hashlib +import json +import sys +from datetime import datetime + + +_ = r""" +this hook will produce a single sha512 file which +covers all recent uploads (plus metadata comments) + +use this with --xiu, which makes copyparty buffer +uploads until server is idle, providing file infos +on stdin (filepaths or json) + +example usage as global config: + --xiu i5,j,bin/hooks/xiu-sha.py + +example usage as a volflag (per-volume config): + -v srv/inc:inc:c,xiu=i5,j,bin/hooks/xiu-sha.py + +parameters explained, + xiu = execute after uploads... + i5 = ...after volume has been idle for 5sec + j = provide json instead of filepath list + +note the "f" (fork) flag is not set, so this xiu +will block other xiu hooks while it's running +""" + + +try: + from copyparty.util import fsenc +except: + + def fsenc(p): + return p + + +def humantime(ts): + return datetime.utcfromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S") + + +def find_files_root(inf): + di = 9000 + for f1, f2 in zip(inf, inf[1:]): + p1 = f1["ap"].replace("\\", "/").rsplit("/", 1)[0] + p2 = f2["ap"].replace("\\", "/").rsplit("/", 1)[0] + di = min(len(p1), len(p2), di) + di = next((i for i in range(di) if p1[i] != p2[i]), di) + + return di + 1 + + +def find_vol_root(inf): + return len(inf[0]["ap"][: -len(inf[0]["vp"])]) + + +def main(): + zb = sys.stdin.buffer.read() + zs = zb.decode("utf-8", "replace") + inf = json.loads(zs) + + # root directory (where to put the sha512 file); + # di = find_files_root(inf) # next to the file closest to volume root + di = find_vol_root(inf) # top of the entire volume + + ret = [] + total_sz = 0 + for md in inf: + ap = md["ap"] + rp = ap[di:] + total_sz += md["sz"] + fsize = "{:,}".format(md["sz"]) + mtime = humantime(md["mt"]) + up_ts = humantime(md["at"]) + + h = hashlib.sha512() + with open(fsenc(md["ap"]), "rb", 512 * 1024) as f: + while True: + buf = f.read(512 * 1024) + if not buf: + break + + h.update(buf) + + cksum = h.hexdigest() + meta = " | ".join([md["wark"], up_ts, mtime, fsize, md["ip"]]) + ret.append("# {}\n{} *{}".format(meta, cksum, rp)) + + ret.append("# {} files, {} bytes total".format(len(inf), total_sz)) + ret.append("") + ftime = datetime.utcnow().strftime("%Y-%m%d-%H%M%S.%f") + fp = "{}xfer-{}.sha512".format(inf[0]["ap"][:di], ftime) + with open(fsenc(fp), "wb") as f: + f.write("\n".join(ret).encode("utf-8", "replace")) + + print("wrote checksums to {}".format(fp)) + + +if __name__ == "__main__": + main() diff --git a/bin/hooks/xiu.py b/bin/hooks/xiu.py new file mode 100755 index 00000000..8b21a25c --- /dev/null +++ b/bin/hooks/xiu.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python3 + +import json +import sys + + +_ = r""" +this hook prints absolute filepaths + total size + +use this with --xiu, which makes copyparty buffer +uploads until server is idle, providing file infos +on stdin (filepaths or json) + +example usage as global config: + --xiu i1,j,bin/hooks/xiu.py + +example usage as a volflag (per-volume config): + -v srv/inc:inc:c,xiu=i1,j,bin/hooks/xiu.py + +parameters explained, + xiu = execute after uploads... + i1 = ...after volume has been idle for 1sec + j = provide json instead of filepath list + +note the "f" (fork) flag is not set, so this xiu +will block other xiu hooks while it's running +""" + + +def main(): + zb = sys.stdin.buffer.read() + zs = zb.decode("utf-8", "replace") + inf = json.loads(zs) + + total_sz = 0 + for upload in inf: + sz = upload["sz"] + total_sz += sz + print("{:9} {}".format(sz, upload["ap"])) + + print("{} files, {} bytes total".format(len(inf), total_sz)) + + +if __name__ == "__main__": + main() diff --git a/copyparty/__main__.py b/copyparty/__main__.py index 8a61e06b..ac0a395f 100755 --- a/copyparty/__main__.py +++ b/copyparty/__main__.py @@ -511,6 +511,7 @@ def get_sects(): execute a command (a program or script) before or after various events; \033[36mxbu\033[35m executes CMD before a file upload starts \033[36mxau\033[35m executes CMD after a file upload finishes + \033[36mxiu\033[35m executes CMD after all uploads finish and volume is idle \033[36mxbr\033[35m executes CMD before a file rename/move \033[36mxar\033[35m executes CMD after a file rename/move \033[36mxbd\033[35m executes CMD before a file delete @@ -532,6 +533,7 @@ def get_sects(): \033[36mj\033[35m provides json with info as 1st arg instead of filepath \033[36mwN\033[35m waits N sec after command has been started before continuing \033[36mtN\033[35m sets an N sec timeout before the command is abandoned + \033[36miN\033[35m xiu only: volume must be idle for N sec (default = 5) \033[36mkt\033[35m kills the entire process tree on timeout (default), \033[36mkm\033[35m kills just the main process @@ -542,6 +544,14 @@ def get_sects(): \033[36mc2\033[35m show only stdout \033[36mc3\033[35m mute all process otput \033[0m + each hook is executed once for each event, except for \033[36mxiu\033[0m + which builds up a backlog of uploads, running the hook just once + as soon as the volume has been idle for iN seconds (5 by default) + + \033[36mxiu\033[0m is also unique in that it will pass the metadata to the + executed program on STDIN instead of as argv arguments, and + it also includes the wark (file-id/hash) as a json property + except for \033[36mxm\033[0m, only one hook / one action can run at a time, so it's recommended to use the \033[36mf\033[0m flag unless you really need to wait for the hook to finish before continuing (without \033[36mf\033[0m @@ -769,6 +779,7 @@ def add_hooks(ap): ap2 = ap.add_argument_group('event hooks (see --help-hooks)') ap2.add_argument("--xbu", metavar="CMD", type=u, action="append", help="execute CMD before a file upload starts") ap2.add_argument("--xau", metavar="CMD", type=u, action="append", help="execute CMD after a file upload finishes") + ap2.add_argument("--xiu", metavar="CMD", type=u, action="append", help="execute CMD after all uploads finish and volume is idle") ap2.add_argument("--xbr", metavar="CMD", type=u, action="append", help="execute CMD before a file move/rename") ap2.add_argument("--xar", metavar="CMD", type=u, action="append", help="execute CMD after a file move/rename") ap2.add_argument("--xbd", metavar="CMD", type=u, action="append", help="execute CMD before a file delete") diff --git a/copyparty/authsrv.py b/copyparty/authsrv.py index 34f0f95c..bb030ddc 100644 --- a/copyparty/authsrv.py +++ b/copyparty/authsrv.py @@ -934,7 +934,7 @@ class AuthSrv(object): is_list: bool, ) -> None: desc = flagdescs.get(name, "?").replace("\n", " ") - if name not in ["mtp", "xbu", "xau", "xbr", "xar", "xbd", "xad", "xm"]: + if name not in "mtp xbu xau xiu xbr xar xbd xad xm".split(): if value is True: t = "└─add volflag [{}] = {} ({})" else: @@ -1303,7 +1303,7 @@ class AuthSrv(object): vol.flags["mth"] = self.args.mth # append additive args from argv to volflags - hooks = "xbu xau xbr xar xbd xad xm".split() + hooks = "xbu xau xiu xbr xar xbd xad xm".split() for name in ["mtp"] + hooks: self._read_volflag(vol.flags, name, getattr(self.args, name), True) @@ -1363,10 +1363,19 @@ class AuthSrv(object): if k in ints: vol.flags[k] = int(vol.flags[k]) - if "lifetime" in vol.flags and "e2d" not in vol.flags: - t = 'removing lifetime config from volume "/{}" because e2d is disabled' - self.log(t.format(vol.vpath), 1) - del vol.flags["lifetime"] + if "e2d" not in vol.flags: + if "lifetime" in vol.flags: + t = 'removing lifetime config from volume "/{}" because e2d is disabled' + self.log(t.format(vol.vpath), 1) + del vol.flags["lifetime"] + + needs_e2d = [x for x in hooks if x != "xm"] + drop = [x for x in needs_e2d if vol.flags.get(x)] + if drop: + t = 'removing [{}] from volume "/{}" because e2d is disabled' + self.log(t.format(", ".join(drop), vol.vpath), 1) + for x in drop: + vol.flags.pop(x) if vol.flags.get("neversymlink") and not vol.flags.get("hardlink"): vol.flags["copydupes"] = True @@ -1624,7 +1633,7 @@ class AuthSrv(object): ] csv = set("i p".split()) - lst = set("c ihead mtm mtp xad xar xau xbd xbr xbu xm".split()) + lst = set("c ihead mtm mtp xad xar xau xiu xbd xbr xbu xm".split()) askip = set("a v c vc cgen theme".split()) # keymap from argv to vflag diff --git a/copyparty/cfg.py b/copyparty/cfg.py index 32389654..6e6ba025 100644 --- a/copyparty/cfg.py +++ b/copyparty/cfg.py @@ -123,6 +123,7 @@ flagcats = { "event hooks\n(better explained in --help-hooks)": { "xbu=CMD": "execute CMD before a file upload starts", "xau=CMD": "execute CMD after a file upload finishes", + "xiu=CMD": "execute CMD after all uploads finish and volume is idle", "xbr=CMD": "execute CMD before a file rename/move", "xar=CMD": "execute CMD after a file rename/move", "xbd=CMD": "execute CMD before a file delete", diff --git a/copyparty/ftpd.py b/copyparty/ftpd.py index f2abdf1c..3e9f8ff5 100644 --- a/copyparty/ftpd.py +++ b/copyparty/ftpd.py @@ -15,6 +15,7 @@ from pyftpdlib.servers import FTPServer from .__init__ import ANYWIN, PY2, TYPE_CHECKING, E from .bos import bos +from .authsrv import VFS from .util import ( Daemon, Pebkac, @@ -23,6 +24,7 @@ from .util import ( ipnorm, pybin, relchk, + runhook, sanitize_fn, vjoin, ) @@ -132,7 +134,7 @@ class FtpFs(AbstractedFS): w: bool = False, m: bool = False, d: bool = False, - ) -> str: + ) -> tuple[str, VFS, str]: try: vpath = vpath.replace("\\", "/").lstrip("/") rd, fn = os.path.split(vpath) @@ -146,7 +148,7 @@ class FtpFs(AbstractedFS): if not vfs.realpath: raise FilesystemError("no filesystem mounted at this path") - return os.path.join(vfs.realpath, rem) + return os.path.join(vfs.realpath, rem), vfs, rem except Pebkac as ex: raise FilesystemError(str(ex)) @@ -157,7 +159,7 @@ class FtpFs(AbstractedFS): w: bool = False, m: bool = False, d: bool = False, - ) -> str: + ) -> tuple[str, VFS, str]: return self.v2a(os.path.join(self.cwd, vpath), r, w, m, d) def ftp2fs(self, ftppath: str) -> str: @@ -179,7 +181,7 @@ class FtpFs(AbstractedFS): r = "r" in mode w = "w" in mode or "a" in mode or "+" in mode - ap = self.rv2a(filename, r, w) + ap = self.rv2a(filename, r, w)[0] if w: try: st = bos.stat(ap) @@ -212,7 +214,7 @@ class FtpFs(AbstractedFS): ) = self.hub.asrv.vfs.can_access(self.cwd.lstrip("/"), self.h.username) def mkdir(self, path: str) -> None: - ap = self.rv2a(path, w=True) + ap = self.rv2a(path, w=True)[0] bos.mkdir(ap) def listdir(self, path: str) -> list[str]: @@ -244,7 +246,7 @@ class FtpFs(AbstractedFS): return list(sorted(list(r.keys()))) def rmdir(self, path: str) -> None: - ap = self.rv2a(path, d=True) + ap = self.rv2a(path, d=True)[0] bos.rmdir(ap) def remove(self, path: str) -> None: @@ -277,10 +279,10 @@ class FtpFs(AbstractedFS): def stat(self, path: str) -> os.stat_result: try: - ap = self.rv2a(path, r=True) + ap = self.rv2a(path, r=True)[0] return bos.stat(ap) except: - ap = self.rv2a(path) + ap = self.rv2a(path)[0] st = bos.stat(ap) if not stat.S_ISDIR(st.st_mode): raise @@ -288,11 +290,11 @@ class FtpFs(AbstractedFS): return st def utime(self, path: str, timeval: float) -> None: - ap = self.rv2a(path, w=True) + ap = self.rv2a(path, w=True)[0] return bos.utime(ap, (timeval, timeval)) def lstat(self, path: str) -> os.stat_result: - ap = self.rv2a(path) + ap = self.rv2a(path)[0] return bos.stat(ap) def isfile(self, path: str) -> bool: @@ -303,7 +305,7 @@ class FtpFs(AbstractedFS): return False # expected for mojibake in ftp_SIZE() def islink(self, path: str) -> bool: - ap = self.rv2a(path) + ap = self.rv2a(path)[0] return bos.path.islink(ap) def isdir(self, path: str) -> bool: @@ -314,18 +316,18 @@ class FtpFs(AbstractedFS): return True def getsize(self, path: str) -> int: - ap = self.rv2a(path) + ap = self.rv2a(path)[0] return bos.path.getsize(ap) def getmtime(self, path: str) -> float: - ap = self.rv2a(path) + ap = self.rv2a(path)[0] return bos.path.getmtime(ap) def realpath(self, path: str) -> str: return path def lexists(self, path: str) -> bool: - ap = self.rv2a(path) + ap = self.rv2a(path)[0] return bos.path.lexists(ap) def get_user_by_uid(self, uid: int) -> str: @@ -355,11 +357,31 @@ class FtpHandler(FTPHandler): # reduce non-debug logging self.log_cmds_list = [x for x in self.log_cmds_list if x not in ("CWD", "XCWD")] + def die(self, msg): + self.respond("550 {}".format(msg)) + raise FilesystemError(msg) + def ftp_STOR(self, file: str, mode: str = "w") -> Any: # Optional[str] vp = join(self.fs.cwd, file).lstrip("/") - ap = self.fs.v2a(vp) + ap, vfs, rem = self.fs.v2a(vp) self.vfs_map[ap] = vp + xbu = vfs.flags.get("xbu") + if xbu and not runhook( + None, + xbu, + ap, + vfs.canonical(rem), + "", + self.username, + 0, + 0, + self.cli_ip, + 0, + "", + ): + self.die("Upload blocked by xbu server config") + # print("ftp_STOR: {} {} => {}".format(vp, mode, ap)) ret = FTPHandler.ftp_STOR(self, file, mode) # print("ftp_STOR: {} {} OK".format(vp, mode)) @@ -384,11 +406,13 @@ class FtpHandler(FTPHandler): vfs, rem = vfs.get_dbv(rem) self.hub.up2k.hash_file( vfs.realpath, + vfs.vpath, vfs.flags, rem, fn, - self.remote_ip, + self.cli_ip, time.time(), + self.username, ) return FTPHandler.log_transfer( diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index 66de4b95..06e67d4b 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -1272,9 +1272,10 @@ class HttpCli(object): self.vpath, self.host, self.uname, - self.ip, time.time(), len(xm), + self.ip, + time.time(), plain, ) @@ -1415,9 +1416,10 @@ class HttpCli(object): self.vpath, self.host, self.uname, - self.ip, at, remains, + self.ip, + at, "", ): t = "upload denied by xbu" @@ -1491,9 +1493,10 @@ class HttpCli(object): self.vpath, self.host, self.uname, - self.ip, at, post_sz, + self.ip, + at, "", ): t = "upload denied by xau" @@ -1505,11 +1508,13 @@ class HttpCli(object): self.conn.hsrv.broker.say( "up2k.hash_file", vfs.realpath, + vfs.vpath, vfs.flags, rem, fn, self.ip, at, + self.uname, ) vsuf = "" @@ -2102,9 +2107,10 @@ class HttpCli(object): self.vpath, self.host, self.uname, - self.ip, at, 0, + self.ip, + at, "", ): t = "upload denied by xbu" @@ -2161,9 +2167,10 @@ class HttpCli(object): self.vpath, self.host, self.uname, - self.ip, at, sz, + self.ip, + at, "", ): t = "upload denied by xau" @@ -2175,11 +2182,13 @@ class HttpCli(object): self.conn.hsrv.broker.say( "up2k.hash_file", dbv.realpath, + vfs.vpath, dbv.flags, vrem, fname, self.ip, at, + self.uname, ) self.conn.nbyte += sz diff --git a/copyparty/smbd.py b/copyparty/smbd.py index 94e8529b..ac5cdab4 100644 --- a/copyparty/smbd.py +++ b/copyparty/smbd.py @@ -12,10 +12,10 @@ from types import SimpleNamespace from .__init__ import ANYWIN, EXE, TYPE_CHECKING from .authsrv import LEELOO_DALLAS, VFS from .bos import bos -from .util import Daemon, min_ex, pybin +from .util import Daemon, min_ex, pybin, runhook if True: # pylint: disable=using-constant-test - from typing import Any + from typing import Any, Union if TYPE_CHECKING: from .svchub import SvcHub @@ -113,6 +113,9 @@ class SMB(object): self.stop = srv.stop self.log("smb", "listening @ {}:{}".format(ip, port)) + def nlog(self, msg: str, c: Union[int, str] = 0) -> None: + self.log("smb", msg, c) + def start(self) -> None: Daemon(self.srv.start) @@ -169,8 +172,15 @@ class SMB(object): yeet("blocked write (no --smbw): " + vpath) vfs, ap = self._v2a("open", vpath, *a) - if wr and not vfs.axs.uwrite: - yeet("blocked write (no-write-acc): " + vpath) + if wr: + if not vfs.axs.uwrite: + yeet("blocked write (no-write-acc): " + vpath) + + xbu = vfs.flags.get("xbu") + if xbu and not runhook( + self.nlog, xbu, ap, vpath, "", "", 0, 0, "1.7.6.2", 0, "" + ): + yeet("blocked by xbu server config: " + vpath) ret = bos.open(ap, flags, *a, mode=chmod, **ka) if wr: @@ -198,11 +208,13 @@ class SMB(object): vfs, rem = vfs.get_dbv(rem) self.hub.up2k.hash_file( vfs.realpath, + vfs.vpath, vfs.flags, rem, fn, "1.7.6.2", time.time(), + "", ) def _rename(self, vp1: str, vp2: str) -> None: diff --git a/copyparty/up2k.py b/copyparty/up2k.py index 8ba5cbd8..c38faf99 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -48,6 +48,7 @@ from .util import ( rmdirs, rmdirs_up, runhook, + runihook, s2hms, s3dec, s3enc, @@ -122,6 +123,7 @@ class Up2k(object): self.flags: dict[str, dict[str, Any]] = {} self.droppable: dict[str, list[str]] = {} self.volstate: dict[str, str] = {} + self.vol_act: dict[str, float] = {} self.dupesched: dict[str, list[tuple[str, str, float]]] = {} self.snap_persist_interval = 300 # persist unfinished index every 5 min self.snap_discard_interval = 21600 # drop unfinished after 6 hours inactivity @@ -131,13 +133,17 @@ class Up2k(object): self.entags: dict[str, set[str]] = {} self.mtp_parsers: dict[str, dict[str, MParser]] = {} self.pending_tags: list[tuple[set[str], str, str, dict[str, Any]]] = [] - self.hashq: Queue[tuple[str, str, str, str, float]] = Queue() + self.hashq: Queue[tuple[str, str, str, str, str, float, str]] = Queue() self.tagq: Queue[tuple[str, str, str, str, str, float]] = Queue() self.tag_event = threading.Condition() self.n_hashq = 0 self.n_tagq = 0 self.mpool_used = False + self.xiu_ptn = re.compile(r"(?:^|,)i([0-9]+)") + self.xiu_busy = False # currently running hook + self.xiu_asleep = True # needs rescan_cond poke to schedule self + self.cur: dict[str, "sqlite3.Cursor"] = {} self.mem_cur = None self.sqlite_ver = None @@ -291,7 +297,7 @@ class Up2k(object): cooldown = now + 1 continue - cooldown = now + 5 + cooldown = now + 3 # self.log("SR", 5) if self.args.no_lifetime: @@ -300,6 +306,8 @@ class Up2k(object): # important; not deferred by db_act timeout = self._check_lifetimes() + timeout = min(timeout, now + self._check_xiu()) + with self.mutex: for vp, vol in sorted(self.asrv.vfs.all_vols.items()): maxage = vol.flags.get("scan") @@ -394,6 +402,85 @@ class Up2k(object): return timeout + def _check_xiu(self) -> float: + if self.xiu_busy: + return 2 + + ret = 9001 + for _, vol in sorted(self.asrv.vfs.all_vols.items()): + rp = vol.realpath + cur = self.cur.get(rp) + if not cur: + continue + + with self.mutex: + q = "select distinct c from iu" + cds = cur.execute(q).fetchall() + if not cds: + continue + + run_cds: list[int] = [] + for cd in sorted([x[0] for x in cds]): + delta = cd - (time.time() - self.vol_act[rp]) + if delta > 0: + ret = min(ret, delta) + break + + run_cds.append(cd) + + if run_cds: + self.xiu_busy = True + Daemon(self._run_xius, "xiu", (vol, run_cds)) + return 2 + + return ret + + def _run_xius(self, vol: VFS, cds: list[int]): + for cd in cds: + self._run_xiu(vol, cd) + + self.xiu_busy = False + self.xiu_asleep = True + + def _run_xiu(self, vol: VFS, cd: int): + rp = vol.realpath + cur = self.cur[rp] + + # t0 = time.time() + with self.mutex: + q = "select w,rd,fn from iu where c={} limit 80386" + wrfs = cur.execute(q.format(cd)).fetchall() + if not wrfs: + return + + # dont wanna rebox so use format instead of prepared + q = "delete from iu where w=? and +rd=? and +fn=? and +c={}" + cur.executemany(q.format(cd), wrfs) + cur.connection.commit() + + q = "select * from up where substr(w,1,16)=? and +rd=? and +fn=?" + ups = [] + for wrf in wrfs: + try: + # almost definitely exists; don't care if it doesn't + ups.append(cur.execute(q, wrf).fetchone()) + except: + pass + + # t1 = time.time() + # self.log("mapped {} warks in {:.3f} sec".format(len(wrfs), t1 - t0)) + # "mapped 10989 warks in 0.126 sec" + + cmds = self.flags[rp]["xiu"] + for cmd in cmds: + m = self.xiu_ptn.search(cmd) + ccd = int(m.group(1)) if m else 5 + if ccd != cd: + continue + + self.log("xiu: {}# {}".format(len(wrfs), cmd)) + runihook(self.log, cmd, vol, ups) + def _vis_job_progress(self, job: dict[str, Any]) -> str: perc = 100 - (len(job["need"]) * 100.0 / len(job["hash"])) path = djoin(job["ptop"], job["prel"], job["name"]) @@ -710,6 +797,7 @@ class Up2k(object): self.log("\n".join(ta)) self.flags[ptop] = flags + self.vol_act[ptop] = 0.0 self.registry[ptop] = reg self.droppable[ptop] = drp or [] self.regdrop(ptop, "") @@ -1010,7 +1098,8 @@ class Up2k(object): wark = up2k_wark_from_hashlist(self.salt, sz, hashes) - self.db_add(db.c, wark, rd, fn, lmod, sz, "", 0) + # skip upload hooks by not providing vflags + self.db_add(db.c, {}, rd, fn, lmod, sz, "", "", wark, "", "", "", 0) db.n += 1 ret += 1 td = time.time() - db.t @@ -1872,6 +1961,7 @@ class Up2k(object): if ver == DB_VER: try: + self._add_xiu_tab(cur) self._add_dhash_tab(cur) except: pass @@ -1965,6 +2055,7 @@ class Up2k(object): cur.execute(cmd) self._add_dhash_tab(cur) + self._add_xiu_tab(cur) self.log("created DB at {}".format(db_path)) return cur @@ -1990,6 +2081,18 @@ class Up2k(object): cur.connection.commit() + def _add_xiu_tab(self, cur: "sqlite3.Cursor") -> None: + # v5a -> v5b + # store rd+fn rather than warks to support nohash vols + for cmd in [ + r"create table iu (c int, w text, rd text, fn text)", + r"create index iu_c on iu(c)", + r"create index iu_w on iu(w)", + ]: + cur.execute(cmd) + + cur.connection.commit() + def _job_volchk(self, cj: dict[str, Any]) -> None: if not self.register_vpath(cj["ptop"], cj["vcfg"]): if cj["ptop"] not in self.registry: @@ -2009,11 +2112,12 @@ class Up2k(object): with self.mutex: self._job_volchk(cj) + ptop = cj["ptop"] cj["name"] = sanitize_fn(cj["name"], "", [".prologue.html", ".epilogue.html"]) - cj["poke"] = now = self.db_act = time.time() + cj["poke"] = now = self.db_act = self.vol_act[ptop] = time.time() wark = self._get_wark(cj) job = None - pdir = djoin(cj["ptop"], cj["prel"]) + pdir = djoin(ptop, cj["prel"]) try: dev = bos.stat(pdir).st_dev except: @@ -2024,7 +2128,6 @@ class Up2k(object): sprs = self.fstab.get(pdir) != "ng" with self.mutex: - ptop = cj["ptop"] jcur = self.cur.get(ptop) reg = self.registry[ptop] vfs = self.asrv.vfs.all_vols[cj["vtop"]] @@ -2161,7 +2264,7 @@ class Up2k(object): raise Pebkac(422, err) - elif "nodupe" in self.flags[cj["ptop"]]: + elif "nodupe" in vfs.flags: self.log("dupe-reject:\n {0}\n {1}".format(src, dst)) err = "upload rejected, file already exists:\n" err += "/" + quotep(vsrc) + " " @@ -2170,6 +2273,7 @@ class Up2k(object): # symlink to the client-provided name, # returning the previous upload info job = deepcopy(job) + job["wark"] = wark for k in "ptop vtop prel addr".split(): job[k] = cj[k] @@ -2182,6 +2286,24 @@ class Up2k(object): job["name"] = self._untaken(pdir, cj, now) dst = djoin(job["ptop"], job["prel"], job["name"]) + xbu = vfs.flags.get("xbu") + if xbu and not runhook( + self.log, + xbu, # type: ignore + dst, + job["vtop"], + job.get("host") or "", + job.get("user") or "", + job["lmod"], + job["size"], + job["addr"], + job.get("at") or 0, + "", + ): + t = "upload blocked by xbu server config: {}".format(dst) + self.log(t, 1) + raise Pebkac(403, t) + if not self.args.nw: try: dvf = self.flags[job["ptop"]] @@ -2193,9 +2315,10 @@ class Up2k(object): raise if cur: - a = [job[x] for x in "prel name lmod size addr".split()] + zs = "prel name lmod size ptop vtop wark host user addr" + a = [job[x] for x in zs.split()] a += [job.get("at") or time.time()] - self.db_add(cur, wark, *a) + self.db_add(cur, vfs.flags, *a) cur.connection.commit() if not job: @@ -2376,7 +2499,7 @@ class Up2k(object): self, ptop: str, wark: str, chash: str ) -> tuple[int, list[int], str, float, bool]: with self.mutex: - self.db_act = time.time() + self.db_act = self.vol_act[ptop] = time.time() job = self.registry[ptop].get(wark) if not job: known = " ".join([x for x in self.registry[ptop].keys()]) @@ -2427,7 +2550,7 @@ class Up2k(object): def confirm_chunk(self, ptop: str, wark: str, chash: str) -> tuple[int, str]: with self.mutex: - self.db_act = time.time() + self.db_act = self.vol_act[ptop] = time.time() try: job = self.registry[ptop][wark] pdir = djoin(job["ptop"], job["prel"]) @@ -2462,7 +2585,6 @@ class Up2k(object): self._finish_upload(ptop, wark) def _finish_upload(self, ptop: str, wark: str) -> None: - self.db_act = time.time() try: job = self.registry[ptop][wark] pdir = djoin(job["ptop"], job["prel"]) @@ -2475,24 +2597,7 @@ class Up2k(object): atomic_move(src, dst) upt = job.get("at") or time.time() - xau = self.flags[ptop].get("xau") - if xau and not runhook( - self.log, - xau, - dst, - djoin(job["vtop"], job["prel"], job["name"]), - job["host"], - job["user"], - job["addr"], - upt, - job["size"], - "", - ): - t = "upload blocked by xau" - self.log(t, 1) - bos.unlink(dst) - self.registry[ptop].pop(wark, None) - raise Pebkac(403, t) + vflags = self.flags[ptop] times = (int(time.time()), int(job["lmod"])) if ANYWIN: @@ -2504,7 +2609,8 @@ class Up2k(object): except: pass - z2 = [job[x] for x in "ptop wark prel name lmod size addr".split()] + zs = "prel name lmod size ptop vtop wark host user addr" + z2 = [job[x] for x in zs.split()] wake_sr = False try: flt = job["life"] @@ -2519,7 +2625,7 @@ class Up2k(object): pass z2 += [upt] - if self.idx_wark(*z2): + if self.idx_wark(vflags, *z2): del self.registry[ptop][wark] else: self.regdrop(ptop, wark) @@ -2541,7 +2647,7 @@ class Up2k(object): self._symlink(dst, d2, self.flags[ptop], lmod=lmod) if cur: self.db_rm(cur, rd, fn) - self.db_add(cur, wark, rd, fn, *z2[-4:]) + self.db_add(cur, vflags, rd, fn, lmod, *z2[3:]) if cur: cur.connection.commit() @@ -2563,12 +2669,16 @@ class Up2k(object): def idx_wark( self, - ptop: str, - wark: str, + vflags: dict[str, Any], rd: str, fn: str, lmod: float, sz: int, + ptop: str, + vtop: str, + wark: str, + host: str, + usr: str, ip: str, at: float, ) -> bool: @@ -2576,9 +2686,12 @@ class Up2k(object): if not cur: return False + self.db_act = self.vol_act[ptop] = time.time() try: self.db_rm(cur, rd, fn) - self.db_add(cur, wark, rd, fn, lmod, sz, ip, at) + self.db_add( + cur, vflags, rd, fn, lmod, sz, ptop, vtop, wark, host, usr, ip, at + ) cur.connection.commit() except Exception as ex: x = self.register_vpath(ptop, {}) @@ -2603,11 +2716,16 @@ class Up2k(object): def db_add( self, db: "sqlite3.Cursor", - wark: str, + vflags: dict[str, Any], rd: str, fn: str, ts: float, sz: int, + ptop: str, + vtop: str, + wark: str, + host: str, + usr: str, ip: str, at: float, ) -> None: @@ -2621,6 +2739,49 @@ class Up2k(object): v = (wark, int(ts), sz, rd, fn, ip or "", int(at or 0)) db.execute(sql, v) + xau = vflags.get("xau") + dst = djoin(ptop, rd, fn) + if xau and not runhook( + self.log, + xau, + dst, + djoin(vtop, rd, fn), + host, + usr, + int(ts), + sz, + ip, + at or time.time(), + "", + ): + t = "upload blocked by xau server config" + self.log(t, 1) + bos.unlink(dst) + self.registry[ptop].pop(wark, None) + raise Pebkac(403, t) + + xiu = vflags.get("xiu") + if xiu: + cds: set[int] = set() + for cmd in xiu: + m = self.xiu_ptn.search(cmd) + cds.add(int(m.group(1)) if m else 5) + + q = "insert into iu values (?,?,?,?)" + for cd in cds: + # one for each unique cooldown duration + try: + db.execute(q, (cd, wark[:16], rd, fn)) + except: + assert self.mem_cur + rd, fn = s3enc(self.mem_cur, rd, fn) + db.execute(q, (cd, wark[:16], rd, fn)) + + if self.xiu_asleep: + self.xiu_asleep = False + with self.rescan_cond: + self.rescan_cond.notify_all() + def handle_rm(self, uname: str, ip: str, vpaths: list[str], lim: list[int]) -> str: n_files = 0 ok = {} @@ -2678,6 +2839,7 @@ class Up2k(object): ptop = vn.realpath atop = vn.canonical(rem, False) + self.vol_act[ptop] = self.db_act adir, fn = os.path.split(atop) try: st = bos.lstat(atop) @@ -2711,18 +2873,31 @@ class Up2k(object): self.log("hit delete limit of {} files".format(lim[1]), 3) break - n_files += 1 abspath = djoin(adir, fn) volpath = "{}/{}".format(vrem, fn).strip("/") vpath = "{}/{}".format(dbv.vpath, volpath).strip("/") self.log("rm {}\n {}".format(vpath, abspath)) _ = dbv.get(volpath, uname, *permsets[0]) - if xbd and not runhook( - self.log, xbd, abspath, vpath, "", uname, "", 0, 0, "" - ): - self.log("delete blocked by xbd: {}".format(abspath), 1) - continue + if xbd: + st = bos.stat(abspath) + if not runhook( + self.log, + xbd, + abspath, + vpath, + "", + uname, + st.st_mtime, + st.st_size, + ip, + 0, + "", + ): + t = "delete blocked by xbd server config: {}" + self.log(t.format(abspath), 1) + continue + n_files += 1 with self.mutex: cur = None try: @@ -2735,7 +2910,7 @@ class Up2k(object): bos.unlink(abspath) if xad: - runhook(self.log, xad, abspath, vpath, "", uname, "", 0, 0, "") + runhook(self.log, xad, abspath, vpath, "", uname, 0, 0, ip, 0, "") ok: list[str] = [] ng: list[str] = [] @@ -2747,11 +2922,11 @@ class Up2k(object): return n_files, ok + ok2, ng + ng2 def handle_mv(self, uname: str, svp: str, dvp: str) -> str: - self.db_act = time.time() svn, srem = self.asrv.vfs.get(svp, uname, True, False, True) svn, srem = svn.get_dbv(srem) sabs = svn.canonical(srem, False) curs: set["sqlite3.Cursor"] = set() + self.db_act = self.vol_act[svn.realpath] = time.time() if not srem: raise Pebkac(400, "mv: cannot move a mountpoint") @@ -2787,7 +2962,7 @@ class Up2k(object): with self.mutex: try: for fn in files: - self.db_act = time.time() + self.db_act = self.vol_act[dbv.realpath] = time.time() svpf = "/".join(x for x in [dbv.vpath, vrem, fn[0]] if x) if not svpf.startswith(svp + "/"): # assert raise Pebkac(500, "mv: bug at {}, top {}".format(svpf, svp)) @@ -2830,10 +3005,14 @@ class Up2k(object): xbr = svn.flags.get("xbr") xar = dvn.flags.get("xar") - if xbr and not runhook(self.log, xbr, sabs, svp, "", uname, "", 0, 0, ""): - t = "move blocked by xbr: {}".format(svp) - self.log(t, 1) - raise Pebkac(405, t) + if xbr: + st = bos.stat(sabs) + if not runhook( + self.log, xbr, sabs, svp, "", uname, st.st_mtime, st.st_size, "", 0, "" + ): + t = "move blocked by xbr server config: {}".format(svp) + self.log(t, 1) + raise Pebkac(405, t) bos.makedirs(os.path.dirname(dabs)) @@ -2852,7 +3031,7 @@ class Up2k(object): self.rescan_cond.notify_all() if xar: - runhook(self.log, xar, dabs, dvp, "", uname, "", 0, 0, "") + runhook(self.log, xar, dabs, dvp, "", uname, 0, 0, "", 0, "") return "k" @@ -2893,13 +3072,27 @@ class Up2k(object): curs.add(c1) if c2: - self.db_add(c2, w, drd, dfn, ftime, fsize, ip or "", at or 0) + self.db_add( + c2, + {}, # skip upload hooks + drd, + dfn, + ftime, + fsize, + dvn.realpath, + dvn.vpath, + w, + "", + "", + ip or "", + at or 0, + ) curs.add(c2) else: self.log("not found in src db: [{}]".format(svp)) if xar: - runhook(self.log, xar, dabs, dvp, "", uname, "", 0, 0, "") + runhook(self.log, xar, dabs, dvp, "", uname, 0, 0, "", 0, "") return "k" @@ -3131,9 +3324,10 @@ class Up2k(object): vp_chk, job["host"], job["user"], - job["addr"], - job["t0"], + int(job["lmod"]), job["size"], + job["addr"], + int(job["t0"]), "", ): t = "upload blocked by xbu: {}".format(vp_chk) @@ -3373,7 +3567,7 @@ class Up2k(object): self.n_hashq -= 1 # self.log("hashq {}".format(self.n_hashq)) - ptop, rd, fn, ip, at = self.hashq.get() + ptop, vtop, rd, fn, ip, at, usr = self.hashq.get() # self.log("hashq {} pop {}/{}/{}".format(self.n_hashq, ptop, rd, fn)) if "e2d" not in self.flags[ptop]: continue @@ -3393,18 +3587,39 @@ class Up2k(object): wark = up2k_wark_from_hashlist(self.salt, inf.st_size, hashes) with self.mutex: - self.idx_wark(ptop, wark, rd, fn, inf.st_mtime, inf.st_size, ip, at) + self.idx_wark( + self.flags[ptop], + rd, + fn, + inf.st_mtime, + inf.st_size, + ptop, + vtop, + wark, + "", + usr, + ip, + at, + ) if at and time.time() - at > 30: with self.rescan_cond: self.rescan_cond.notify_all() def hash_file( - self, ptop: str, flags: dict[str, Any], rd: str, fn: str, ip: str, at: float + self, + ptop: str, + vtop: str, + flags: dict[str, Any], + rd: str, + fn: str, + ip: str, + at: float, + usr: str, ) -> None: with self.mutex: self.register_vpath(ptop, flags) - self.hashq.put((ptop, rd, fn, ip, at)) + self.hashq.put((ptop, vtop, rd, fn, ip, at, usr)) self.n_hashq += 1 # self.log("hashq {} push {}/{}/{}".format(self.n_hashq, ptop, rd, fn)) diff --git a/copyparty/util.py b/copyparty/util.py index f048bc56..ab193183 100644 --- a/copyparty/util.py +++ b/copyparty/util.py @@ -1849,6 +1849,14 @@ def _msaenc(txt: str) -> bytes: return txt.replace("/", "\\").encode(FS_ENCODING, "surrogateescape") +def _uncify(txt: str) -> str: + txt = txt.replace("/", "\\") + if ":" not in txt and not txt.startswith("\\\\"): + txt = absreal(txt) + + return txt if txt.startswith("\\\\") else "\\\\?\\" + txt + + def _msenc(txt: str) -> bytes: txt = txt.replace("/", "\\") if ":" not in txt and not txt.startswith("\\\\"): @@ -1877,9 +1885,11 @@ if not PY2 and WINDOWS: afsenc = _msaenc fsenc = _msenc fsdec = _msdec + uncify = _uncify elif not PY2 or not WINDOWS: fsenc = afsenc = sfsenc = w8enc fsdec = w8dec + uncify = str else: # moonrunes become \x3f with bytestrings, # losing mojibake support is worth @@ -1891,6 +1901,7 @@ else: fsenc = afsenc = sfsenc = _not_actually_mbcs_enc fsdec = _not_actually_mbcs_dec + uncify = str def s3enc(mem_cur: "sqlite3.Cursor", rd: str, fn: str) -> tuple[str, str]: @@ -2512,23 +2523,14 @@ def retchk( raise Exception(t) -def _runhook( - log: "NamedLogger", - cmd: str, - ap: str, - vp: str, - host: str, - uname: str, - ip: str, - at: float, - sz: int, - txt: str, -) -> bool: +def _parsehook( + log: Optional["NamedLogger"], cmd: str +) -> tuple[bool, bool, bool, float, dict[str, Any], str]: chk = False fork = False jtxt = False - wait = 0 - tout = 0 + wait = 0.0 + tout = 0.0 kill = "t" cap = 0 ocmd = cmd @@ -2548,9 +2550,11 @@ def _runhook( cap = int(arg[1:]) # 0=none 1=stdout 2=stderr 3=both elif arg.startswith("k"): kill = arg[1:] # [t]ree [m]ain [n]one + elif arg.startswith("i"): + pass else: t = "hook: invalid flag {} in {}" - log(t.format(arg, ocmd)) + (log or print)(t.format(arg, ocmd)) env = os.environ.copy() try: @@ -2565,22 +2569,92 @@ def _runhook( if not EXE: raise - ka = { + sp_ka = { "env": env, "timeout": tout, "kill": kill, "capture": cap, } + if cmd.startswith("~"): + cmd = os.path.expanduser(cmd) + + return chk, fork, jtxt, wait, sp_ka, cmd + + +def runihook( + log: Optional["NamedLogger"], + cmd: str, + vol: "VFS", + ups: list[tuple[str, int, int, str, str, str, int]], +) -> bool: + ocmd = cmd + chk, fork, jtxt, wait, sp_ka, cmd = _parsehook(log, cmd) + bcmd = [sfsenc(cmd)] + if cmd.endswith(".py"): + bcmd = [sfsenc(pybin)] + bcmd + + vps = [vjoin(*list(s3dec(x[3], x[4]))) for x in ups] + aps = [djoin(vol.realpath, x) for x in vps] + if jtxt: + # 0w 1mt 2sz 3rd 4fn 5ip 6at + ja = [ + { + "ap": uncify(ap), # utf8 for json + "vp": vp, + "wark": x[0][:16], + "mt": x[1], + "sz": x[2], + "ip": x[5], + "at": x[6], + } + for x, vp, ap in zip(ups, vps, aps) + ] + sp_ka["sin"] = json.dumps(ja).encode("utf-8", "replace") + else: + sp_ka["sin"] = b"\n".join(fsenc(x) for x in aps) + + t0 = time.time() + if fork: + Daemon(runcmd, ocmd, [bcmd], ka=sp_ka) + else: + rc, v, err = runcmd(bcmd, **sp_ka) # type: ignore + if chk and rc: + retchk(rc, bcmd, err, log, 5) + return False + + wait -= time.time() - t0 + if wait > 0: + time.sleep(wait) + + return True + + +def _runhook( + log: Optional["NamedLogger"], + cmd: str, + ap: str, + vp: str, + host: str, + uname: str, + mt: float, + sz: int, + ip: str, + at: float, + txt: str, +) -> bool: + ocmd = cmd + chk, fork, jtxt, wait, sp_ka, cmd = _parsehook(log, cmd) if jtxt: ja = { "ap": ap, "vp": vp, + "mt": mt, + "sz": sz, "ip": ip, + "at": at or time.time(), "host": host, "user": uname, - "at": at or time.time(), - "sz": sz, "txt": txt, } arg = json.dumps(ja) @@ -2595,9 +2669,9 @@ def _runhook( t0 = time.time() if fork: - Daemon(runcmd, ocmd, [acmd], ka=ka) + Daemon(runcmd, ocmd, [bcmd], ka=sp_ka) else: - rc, v, err = runcmd(bcmd, **ka) # type: ignore + rc, v, err = runcmd(bcmd, **sp_ka) # type: ignore if chk and rc: retchk(rc, bcmd, err, log, 5) return False @@ -2610,24 +2684,25 @@ def _runhook( def runhook( - log: "NamedLogger", + log: Optional["NamedLogger"], cmds: list[str], ap: str, vp: str, host: str, uname: str, + mt: float, + sz: int, ip: str, at: float, - sz: int, txt: str, ) -> bool: vp = vp.replace("\\", "/") for cmd in cmds: try: - if not _runhook(log, cmd, ap, vp, host, uname, ip, at, sz, txt): + if not _runhook(log, cmd, ap, vp, host, uname, mt, sz, ip, at, txt): return False except Exception as ex: - log("hook: {}".format(ex)) + (log or print)("hook: {}".format(ex)) if ",c," in "," + cmd: return False break diff --git a/tests/util.py b/tests/util.py index 45781501..ad88103a 100644 --- a/tests/util.py +++ b/tests/util.py @@ -113,7 +113,7 @@ class Cfg(Namespace): ex = "doctitle favico html_head lg_sbf log_fk md_sbf mth textfiles R RS SR" ka.update(**{k: "" for k in ex.split()}) - ex = "xad xar xau xbd xbr xbu xm" + ex = "xad xar xau xbd xbr xbu xiu xm" ka.update(**{k: [] for k in ex.split()}) super(Cfg, self).__init__(