From d9db1534b12bedd1227f4c7804452acb74922d71 Mon Sep 17 00:00:00 2001 From: ed Date: Wed, 22 Jan 2025 21:18:42 +0000 Subject: [PATCH] hooks: send zeromq/zmq/0mq messages adds an optional dependency on pyzmq --- README.md | 29 ++++- bin/zmq-recv.py | 71 +++++++++++ contrib/package/arch/PKGBUILD | 1 + contrib/package/nix/copyparty/default.nix | 6 +- copyparty/svchub.py | 2 + copyparty/util.py | 137 ++++++++++++++++++++-- docs/devnotes.md | 1 + pyproject.toml | 1 + scripts/docker/Dockerfile.ac | 2 +- scripts/docker/Dockerfile.dj | 3 +- scripts/docker/Dockerfile.iv | 3 +- setup.py | 1 + 12 files changed, 240 insertions(+), 17 deletions(-) create mode 100755 bin/zmq-recv.py diff --git a/README.md b/README.md index be859ba3..704d5624 100644 --- a/README.md +++ b/README.md @@ -80,6 +80,7 @@ turn almost any device into a file server with resumable uploads/downloads using * [metadata from audio files](#metadata-from-audio-files) - set `-e2t` to index tags on upload * [file parser plugins](#file-parser-plugins) - provide custom parsers to index additional tags * [event hooks](#event-hooks) - trigger a program on uploads, renames etc ([examples](./bin/hooks/)) + * [zeromq](#zeromq) - event-hooks can send zeromq messages * [upload events](#upload-events) - the older, more powerful approach ([examples](./bin/mtag/)) * [handlers](#handlers) - redefine behavior with plugins ([examples](./bin/handlers/)) * [ip auth](#ip-auth) - autologin based on IP range (CIDR) @@ -1458,6 +1459,23 @@ there's a bunch of flags and stuff, see `--help-hooks` if you want to write your own hooks, see [devnotes](./docs/devnotes.md#event-hooks) +### zeromq + +event-hooks can send zeromq messages instead of running programs + +to send a 0mq message every time a file is uploaded, + +* `--xau zmq:pub:tcp://*:5556` sends a PUB to any/all connected SUB clients +* `--xau t3,zmq:push:tcp://*:5557` sends a PUSH to exactly one connected PULL client +* `--xau t3,j,zmq:req:tcp://localhost:5555` sends a REQ to the connected REP client + +the PUSH and REQ examples have `t3` (timeout after 3 seconds) because they block if there's no clients to talk to + +* the REQ example does `t3,j` to send extended upload-info as json instead of just the filesystem-path + +see [zmq-recv.py](https://github.com/9001/copyparty/blob/hovudstraum/bin/zmq-recv.py) if you need something to receive the messages with + + ### upload events the older, more powerful approach ([examples](./bin/mtag/)): @@ -2308,13 +2326,13 @@ mandatory deps: install these to enable bonus features -enable hashed passwords in config: `argon2-cffi` +enable [hashed passwords](#password-hashing) in config: `argon2-cffi` -enable ftp-server: +enable [ftp-server](#ftp-server): * for just plaintext FTP, `pyftpdlib` (is built into the SFX) * with TLS encryption, `pyftpdlib pyopenssl` -enable music tags: +enable [music tags](#metadata-from-audio-files): * either `mutagen` (fast, pure-python, skips a few tags, makes copyparty GPL? idk) * or `ffprobe` (20x slower, more accurate, possibly dangerous depending on your distro and users) @@ -2325,8 +2343,9 @@ enable [thumbnails](#thumbnails) of... * **AVIF pictures:** `pyvips` or `ffmpeg` or `pillow-avif-plugin` * **JPEG XL pictures:** `pyvips` or `ffmpeg` -enable [smb](#smb-server) support (**not** recommended): -* `impacket==0.12.0` +enable sending [zeromq messages](#zeromq) from event-hooks: `pyzmq` + +enable [smb](#smb-server) support (**not** recommended): `impacket==0.12.0` `pyvips` gives higher quality thumbnails than `Pillow` and is 320% faster, using 270% more ram: `sudo apt install libvips42 && python3 -m pip install --user -U pyvips` diff --git a/bin/zmq-recv.py b/bin/zmq-recv.py new file mode 100755 index 00000000..cbf0b5cf --- /dev/null +++ b/bin/zmq-recv.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python3 + +import sys +import zmq + +""" +zmq-recv.py: demo zmq receiver +2025-01-22, v1.0, ed , MIT-Licensed +https://github.com/9001/copyparty/blob/hovudstraum/bin/zmq-recv.py + +basic zmq-server to receive events from copyparty; try one of +the below and then "send a message to serverlog" in the web-ui: + +1) dumb fire-and-forget to any and all listeners; +run this script with "sub" and run copyparty with this: + --xm zmq:pub:tcp://*:5556 + +2) one lucky listener gets the message, blocks if no listeners: +run this script with "pull" and run copyparty with this: + --xm t3,zmq:push:tcp://*:5557 + +3) blocking syn/ack mode, client must ack each message; +run this script with "rep" and run copyparty with this: + --xm t3,zmq:req:tcp://localhost:5555 + +""" + + +ctx = zmq.Context() + + +def sub_server(): + # PUB/SUB allows any number of servers/clients, and + # messages are fire-and-forget + sck = ctx.socket(zmq.SUB) + sck.connect("tcp://localhost:5556") + sck.setsockopt_string(zmq.SUBSCRIBE, "") + while True: + print("copyparty says %r" % (sck.recv_string(),)) + + +def pull_server(): + # PUSH/PULL allows any number of servers/clients, and + # each message is sent to a exactly one PULL client + sck = ctx.socket(zmq.PULL) + sck.connect("tcp://localhost:5557") + while True: + print("copyparty says %r" % (sck.recv_string(),)) + + +def rep_server(): + # REP/REQ is a server/client pair where each message must be + # acked by the other before another message can be sent, so + # copyparty will do a blocking-wait for the ack + sck = ctx.socket(zmq.REP) + sck.bind("tcp://*:5555") + while True: + print("copyparty says %r" % (sck.recv_string(),)) + sck.send(b"thx") + + +mode = sys.argv[1].lower() if len(sys.argv) > 1 else "" + +if mode == "sub": + sub_server() +elif mode == "pull": + pull_server() +elif mode == "rep": + rep_server() +else: + print("specify mode as first argument: SUB | PULL | REP") diff --git a/contrib/package/arch/PKGBUILD b/contrib/package/arch/PKGBUILD index a0cd6cb0..7621d19a 100644 --- a/contrib/package/arch/PKGBUILD +++ b/contrib/package/arch/PKGBUILD @@ -16,6 +16,7 @@ optdepends=("ffmpeg: thumbnails for videos, images (slower) and audio, music tag "libkeyfinder-git: detection of musical keys" "qm-vamp-plugins: BPM detection" "python-pyopenssl: ftps functionality" + "python-pyzmq: send zeromq messages from event-hooks" "python-argon2-cffi: hashed passwords in config" "python-impacket-git: smb support (bad idea)" ) diff --git a/contrib/package/nix/copyparty/default.nix b/contrib/package/nix/copyparty/default.nix index 9380a1e5..140fedfb 100644 --- a/contrib/package/nix/copyparty/default.nix +++ b/contrib/package/nix/copyparty/default.nix @@ -1,4 +1,4 @@ -{ lib, stdenv, makeWrapper, fetchurl, utillinux, python, jinja2, impacket, pyftpdlib, pyopenssl, argon2-cffi, pillow, pyvips, ffmpeg, mutagen, +{ lib, stdenv, makeWrapper, fetchurl, utillinux, python, jinja2, impacket, pyftpdlib, pyopenssl, argon2-cffi, pillow, pyvips, pyzmq, ffmpeg, mutagen, # use argon2id-hashed passwords in config files (sha2 is always available) withHashedPasswords ? true, @@ -21,6 +21,9 @@ withMediaProcessing ? true, # if MediaProcessing is not enabled, you probably want this instead (less accurate, but much safer and faster) withBasicAudioMetadata ? false, +# send ZeroMQ messages from event-hooks +withZeroMQ ? true, + # enable FTPS support in the FTP server withFTPS ? false, @@ -43,6 +46,7 @@ let ++ lib.optional withMediaProcessing ffmpeg ++ lib.optional withBasicAudioMetadata mutagen ++ lib.optional withHashedPasswords argon2-cffi + ++ lib.optional withZeroMQ pyzmq ); in stdenv.mkDerivation { pname = "copyparty"; diff --git a/copyparty/svchub.py b/copyparty/svchub.py index 01b8b781..3be52ce7 100644 --- a/copyparty/svchub.py +++ b/copyparty/svchub.py @@ -50,6 +50,7 @@ from .util import ( FFMPEG_URL, HAVE_PSUTIL, HAVE_SQLITE3, + HAVE_ZMQ, URL_BUG, UTC, VERSIONS, @@ -641,6 +642,7 @@ class SvcHub(object): (HAVE_FFPROBE, "ffprobe", t_ff + ", read audio/media tags"), (HAVE_MUTAGEN, "mutagen", "read audio tags (ffprobe is better but slower)"), (HAVE_ARGON2, "argon2", "secure password hashing (advanced users only)"), + (HAVE_ZMQ, "pyzmq", "send zeromq messages from event-hooks"), (HAVE_HEIF, "pillow-heif", "read .heif images with pillow (rarely useful)"), (HAVE_AVIF, "pillow-avif", "read .avif images with pillow (rarely useful)"), ] diff --git a/copyparty/util.py b/copyparty/util.py index aafe224b..68b18c1f 100644 --- a/copyparty/util.py +++ b/copyparty/util.py @@ -120,6 +120,13 @@ try: except: HAVE_SQLITE3 = False +try: + import importlib.util + + HAVE_ZMQ = bool(importlib.util.find_spec("zmq")) +except: + HAVE_ZMQ = False + try: if os.environ.get("PRTY_NO_PSUTIL"): raise Exception() @@ -3437,15 +3444,118 @@ def runihook( retchk(rc, bcmd, err, log, 5) return False - wait -= time.time() - t0 - if wait > 0: - time.sleep(wait) + if wait: + wait -= time.time() - t0 + if wait > 0: + time.sleep(wait) return True +ZMQ = {} +ZMQ_DESC = { + "pub": "fire-and-forget to all/any connected SUB-clients", + "push": "fire-and-forget to one of the connected PULL-clients", + "req": "send messages to a REP-server and blocking-wait for ack", +} + + +def _zmq_hook( + log: Optional["NamedLogger"], + verbose: bool, + src: str, + cmd: str, + msg: str, + wait: float, + sp_ka: dict[str, Any], +) -> str: + import zmq + + try: + mtx = ZMQ["mtx"] + except: + ZMQ["mtx"] = threading.Lock() + time.sleep(0.1) + mtx = ZMQ["mtx"] + + ret = "" + t0 = time.time() + if verbose and log: + log("hook(%s) %r entering zmq-main-lock" % (src, cmd), 6) + + with mtx: + try: + mode, sck, mtx = ZMQ[cmd] + except: + mode, uri = cmd.split(":", 1) + try: + desc = ZMQ_DESC[mode] + if log: + t = "libzmq(%s) pyzmq(%s) init(%s); %s" + log(t % (zmq.zmq_version(), zmq.__version__, cmd, desc)) + except: + raise Exception("the only supported ZMQ modes are REQ PUB PUSH") + + try: + ctx = ZMQ["ctx"] + except: + ctx = ZMQ["ctx"] = zmq.Context() + + timeout = sp_ka["timeout"] + + if mode == "pub": + sck = ctx.socket(zmq.PUB) + sck.bind(uri) + time.sleep(1) # give clients time to connect; avoids losing first msg + elif mode == "push": + sck = ctx.socket(zmq.PUSH) + sck.bind(uri) + if timeout: + sck.SNDTIMEO = int(timeout * 1000) + elif mode == "req": + sck = ctx.socket(zmq.REQ) + sck.connect(uri) + if timeout: + sck.RCVTIMEO = int(timeout * 1000) + else: + raise Exception() + + mtx = threading.Lock() + ZMQ[cmd] = (mode, sck, mtx) + + if verbose and log: + log("hook(%s) %r entering socket-lock" % (src, cmd), 6) + + with mtx: + if verbose and log: + log("hook(%s) %r sending |%d|" % (src, cmd, len(msg)), 6) + + sck.send_string(msg) # PUSH can safely timeout here + + if mode == "req": + if verbose and log: + log("hook(%s) %r awaiting ack from req" % (src, cmd), 6) + try: + ret = sck.recv().decode("utf-8", "replace") + except: + sck.close() + del ZMQ[cmd] # bad state; must reset + raise Exception("ack timeout; zmq socket killed") + + if ret and log: + log("hook(%s) %r ACK: %r" % (src, cmd, ret), 6) + + if wait: + wait -= time.time() - t0 + if wait > 0: + time.sleep(wait) + + return ret + + def _runhook( log: Optional["NamedLogger"], + verbose: bool, src: str, cmd: str, ap: str, @@ -3486,6 +3596,15 @@ def _runhook( else: arg = txt or ap + if acmd[0].startswith("zmq:"): + zs = "zmq-error" + try: + zs = _zmq_hook(log, verbose, src, acmd[0][4:].lower(), arg, wait, sp_ka) + except Exception as ex: + if log: + log("zeromq failed: %r" % (ex,)) + return {"rc": 0, "stdout": zs} + acmd += [arg] if acmd[0].endswith(".py"): acmd = [pybin] + acmd @@ -3514,9 +3633,10 @@ def _runhook( except: ret = {"rc": rc, "stdout": v} - wait -= time.time() - t0 - if wait > 0: - time.sleep(wait) + if wait: + wait -= time.time() - t0 + if wait > 0: + time.sleep(wait) return ret @@ -3540,14 +3660,15 @@ def runhook( ) -> dict[str, Any]: assert broker or up2k # !rm args = (broker or up2k).args + verbose = args.hook_v vp = vp.replace("\\", "/") ret = {"rc": 0} for cmd in cmds: try: hr = _runhook( - log, src, cmd, ap, vp, host, uname, perms, mt, sz, ip, at, txt + log, verbose, src, cmd, ap, vp, host, uname, perms, mt, sz, ip, at, txt ) - if log and args.hook_v: + if verbose and log: log("hook(%s) %r => \033[32m%s" % (src, cmd, hr), 6) if not hr: return {} diff --git a/docs/devnotes.md b/docs/devnotes.md index 2d9197ae..ffbdb8ca 100644 --- a/docs/devnotes.md +++ b/docs/devnotes.md @@ -342,6 +342,7 @@ python3 -m venv .venv . .venv/bin/activate pip install jinja2 strip_hints # MANDATORY pip install argon2-cffi # password hashing +pip install pyzmq # send 0mq from hooks pip install mutagen # audio metadata pip install pyftpdlib # ftp server pip install partftpy # tftp server diff --git a/pyproject.toml b/pyproject.toml index d673fa37..e16ed9e1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,6 +52,7 @@ ftpd = ["pyftpdlib"] ftps = ["pyftpdlib", "pyopenssl"] tftpd = ["partftpy>=0.4.0"] pwhash = ["argon2-cffi"] +zeromq = ["pyzmq"] [project.scripts] copyparty = "copyparty.__main__:main" diff --git a/scripts/docker/Dockerfile.ac b/scripts/docker/Dockerfile.ac index ba5fbbc4..e27bbd9f 100644 --- a/scripts/docker/Dockerfile.ac +++ b/scripts/docker/Dockerfile.ac @@ -9,7 +9,7 @@ ENV XDG_CONFIG_HOME=/cfg RUN apk --no-cache add !pyc \ tzdata wget \ - py3-jinja2 py3-argon2-cffi py3-pillow \ + py3-jinja2 py3-argon2-cffi py3-pyzmq py3-pillow \ ffmpeg COPY i/dist/copyparty-sfx.py innvikler.sh ./ diff --git a/scripts/docker/Dockerfile.dj b/scripts/docker/Dockerfile.dj index 9addcdf1..09e603b0 100644 --- a/scripts/docker/Dockerfile.dj +++ b/scripts/docker/Dockerfile.dj @@ -12,7 +12,8 @@ COPY i/bin/mtag/audio-bpm.py /mtag/ COPY i/bin/mtag/audio-key.py /mtag/ RUN apk add -U !pyc \ tzdata wget \ - py3-jinja2 py3-argon2-cffi py3-pillow py3-pip py3-cffi \ + py3-jinja2 py3-argon2-cffi py3-pyzmq py3-pillow \ + py3-pip py3-cffi \ ffmpeg \ vips-jxl vips-heif vips-poppler vips-magick \ py3-numpy fftw libsndfile \ diff --git a/scripts/docker/Dockerfile.iv b/scripts/docker/Dockerfile.iv index 6c8fdb2e..1accef7c 100644 --- a/scripts/docker/Dockerfile.iv +++ b/scripts/docker/Dockerfile.iv @@ -9,7 +9,8 @@ ENV XDG_CONFIG_HOME=/cfg RUN apk add -U !pyc \ tzdata wget \ - py3-jinja2 py3-argon2-cffi py3-pillow py3-pip py3-cffi \ + py3-jinja2 py3-argon2-cffi py3-pyzmq py3-pillow \ + py3-pip py3-cffi \ ffmpeg \ vips-jxl vips-heif vips-poppler vips-magick \ && apk add -t .bd \ diff --git a/setup.py b/setup.py index 1a2c4617..12b2aa48 100755 --- a/setup.py +++ b/setup.py @@ -144,6 +144,7 @@ args = { "ftps": ["pyftpdlib", "pyopenssl"], "tftpd": ["partftpy>=0.4.0"], "pwhash": ["argon2-cffi"], + "zeromq": ["pyzmq"], }, "entry_points": {"console_scripts": ["copyparty = copyparty.__main__:main"]}, "scripts": ["bin/partyfuse.py", "bin/u2c.py"],