hooks: send zeromq/zmq/0mq messages

adds an optional dependency on pyzmq
This commit is contained in:
ed 2025-01-22 21:18:42 +00:00
parent 6a0aaaf069
commit d9db1534b1
12 changed files with 240 additions and 17 deletions

View file

@ -80,6 +80,7 @@ turn almost any device into a file server with resumable uploads/downloads using
* [metadata from audio files](#metadata-from-audio-files) - set `-e2t` to index tags on upload
* [file parser plugins](#file-parser-plugins) - provide custom parsers to index additional tags
* [event hooks](#event-hooks) - trigger a program on uploads, renames etc ([examples](./bin/hooks/))
* [zeromq](#zeromq) - event-hooks can send zeromq messages
* [upload events](#upload-events) - the older, more powerful approach ([examples](./bin/mtag/))
* [handlers](#handlers) - redefine behavior with plugins ([examples](./bin/handlers/))
* [ip auth](#ip-auth) - autologin based on IP range (CIDR)
@ -1458,6 +1459,23 @@ there's a bunch of flags and stuff, see `--help-hooks`
if you want to write your own hooks, see [devnotes](./docs/devnotes.md#event-hooks)
### zeromq
event-hooks can send zeromq messages instead of running programs
to send a 0mq message every time a file is uploaded,
* `--xau zmq:pub:tcp://*:5556` sends a PUB to any/all connected SUB clients
* `--xau t3,zmq:push:tcp://*:5557` sends a PUSH to exactly one connected PULL client
* `--xau t3,j,zmq:req:tcp://localhost:5555` sends a REQ to the connected REP client
the PUSH and REQ examples have `t3` (timeout after 3 seconds) because they block if there's no clients to talk to
* the REQ example does `t3,j` to send extended upload-info as json instead of just the filesystem-path
see [zmq-recv.py](https://github.com/9001/copyparty/blob/hovudstraum/bin/zmq-recv.py) if you need something to receive the messages with
### upload events
the older, more powerful approach ([examples](./bin/mtag/)):
@ -2308,13 +2326,13 @@ mandatory deps:
install these to enable bonus features
enable hashed passwords in config: `argon2-cffi`
enable [hashed passwords](#password-hashing) in config: `argon2-cffi`
enable ftp-server:
enable [ftp-server](#ftp-server):
* for just plaintext FTP, `pyftpdlib` (is built into the SFX)
* with TLS encryption, `pyftpdlib pyopenssl`
enable music tags:
enable [music tags](#metadata-from-audio-files):
* either `mutagen` (fast, pure-python, skips a few tags, makes copyparty GPL? idk)
* or `ffprobe` (20x slower, more accurate, possibly dangerous depending on your distro and users)
@ -2325,8 +2343,9 @@ enable [thumbnails](#thumbnails) of...
* **AVIF pictures:** `pyvips` or `ffmpeg` or `pillow-avif-plugin`
* **JPEG XL pictures:** `pyvips` or `ffmpeg`
enable [smb](#smb-server) support (**not** recommended):
* `impacket==0.12.0`
enable sending [zeromq messages](#zeromq) from event-hooks: `pyzmq`
enable [smb](#smb-server) support (**not** recommended): `impacket==0.12.0`
`pyvips` gives higher quality thumbnails than `Pillow` and is 320% faster, using 270% more ram: `sudo apt install libvips42 && python3 -m pip install --user -U pyvips`

71
bin/zmq-recv.py Executable file
View file

@ -0,0 +1,71 @@
#!/usr/bin/env python3
import sys
import zmq
"""
zmq-recv.py: demo zmq receiver
2025-01-22, v1.0, ed <irc.rizon.net>, MIT-Licensed
https://github.com/9001/copyparty/blob/hovudstraum/bin/zmq-recv.py
basic zmq-server to receive events from copyparty; try one of
the below and then "send a message to serverlog" in the web-ui:
1) dumb fire-and-forget to any and all listeners;
run this script with "sub" and run copyparty with this:
--xm zmq:pub:tcp://*:5556
2) one lucky listener gets the message, blocks if no listeners:
run this script with "pull" and run copyparty with this:
--xm t3,zmq:push:tcp://*:5557
3) blocking syn/ack mode, client must ack each message;
run this script with "rep" and run copyparty with this:
--xm t3,zmq:req:tcp://localhost:5555
"""
ctx = zmq.Context()
def sub_server():
# PUB/SUB allows any number of servers/clients, and
# messages are fire-and-forget
sck = ctx.socket(zmq.SUB)
sck.connect("tcp://localhost:5556")
sck.setsockopt_string(zmq.SUBSCRIBE, "")
while True:
print("copyparty says %r" % (sck.recv_string(),))
def pull_server():
# PUSH/PULL allows any number of servers/clients, and
# each message is sent to a exactly one PULL client
sck = ctx.socket(zmq.PULL)
sck.connect("tcp://localhost:5557")
while True:
print("copyparty says %r" % (sck.recv_string(),))
def rep_server():
# REP/REQ is a server/client pair where each message must be
# acked by the other before another message can be sent, so
# copyparty will do a blocking-wait for the ack
sck = ctx.socket(zmq.REP)
sck.bind("tcp://*:5555")
while True:
print("copyparty says %r" % (sck.recv_string(),))
sck.send(b"thx")
mode = sys.argv[1].lower() if len(sys.argv) > 1 else ""
if mode == "sub":
sub_server()
elif mode == "pull":
pull_server()
elif mode == "rep":
rep_server()
else:
print("specify mode as first argument: SUB | PULL | REP")

View file

@ -16,6 +16,7 @@ optdepends=("ffmpeg: thumbnails for videos, images (slower) and audio, music tag
"libkeyfinder-git: detection of musical keys"
"qm-vamp-plugins: BPM detection"
"python-pyopenssl: ftps functionality"
"python-pyzmq: send zeromq messages from event-hooks"
"python-argon2-cffi: hashed passwords in config"
"python-impacket-git: smb support (bad idea)"
)

View file

@ -1,4 +1,4 @@
{ lib, stdenv, makeWrapper, fetchurl, utillinux, python, jinja2, impacket, pyftpdlib, pyopenssl, argon2-cffi, pillow, pyvips, ffmpeg, mutagen,
{ lib, stdenv, makeWrapper, fetchurl, utillinux, python, jinja2, impacket, pyftpdlib, pyopenssl, argon2-cffi, pillow, pyvips, pyzmq, ffmpeg, mutagen,
# use argon2id-hashed passwords in config files (sha2 is always available)
withHashedPasswords ? true,
@ -21,6 +21,9 @@ withMediaProcessing ? true,
# if MediaProcessing is not enabled, you probably want this instead (less accurate, but much safer and faster)
withBasicAudioMetadata ? false,
# send ZeroMQ messages from event-hooks
withZeroMQ ? true,
# enable FTPS support in the FTP server
withFTPS ? false,
@ -43,6 +46,7 @@ let
++ lib.optional withMediaProcessing ffmpeg
++ lib.optional withBasicAudioMetadata mutagen
++ lib.optional withHashedPasswords argon2-cffi
++ lib.optional withZeroMQ pyzmq
);
in stdenv.mkDerivation {
pname = "copyparty";

View file

@ -50,6 +50,7 @@ from .util import (
FFMPEG_URL,
HAVE_PSUTIL,
HAVE_SQLITE3,
HAVE_ZMQ,
URL_BUG,
UTC,
VERSIONS,
@ -641,6 +642,7 @@ class SvcHub(object):
(HAVE_FFPROBE, "ffprobe", t_ff + ", read audio/media tags"),
(HAVE_MUTAGEN, "mutagen", "read audio tags (ffprobe is better but slower)"),
(HAVE_ARGON2, "argon2", "secure password hashing (advanced users only)"),
(HAVE_ZMQ, "pyzmq", "send zeromq messages from event-hooks"),
(HAVE_HEIF, "pillow-heif", "read .heif images with pillow (rarely useful)"),
(HAVE_AVIF, "pillow-avif", "read .avif images with pillow (rarely useful)"),
]

View file

@ -120,6 +120,13 @@ try:
except:
HAVE_SQLITE3 = False
try:
import importlib.util
HAVE_ZMQ = bool(importlib.util.find_spec("zmq"))
except:
HAVE_ZMQ = False
try:
if os.environ.get("PRTY_NO_PSUTIL"):
raise Exception()
@ -3437,6 +3444,7 @@ def runihook(
retchk(rc, bcmd, err, log, 5)
return False
if wait:
wait -= time.time() - t0
if wait > 0:
time.sleep(wait)
@ -3444,8 +3452,110 @@ def runihook(
return True
ZMQ = {}
ZMQ_DESC = {
"pub": "fire-and-forget to all/any connected SUB-clients",
"push": "fire-and-forget to one of the connected PULL-clients",
"req": "send messages to a REP-server and blocking-wait for ack",
}
def _zmq_hook(
log: Optional["NamedLogger"],
verbose: bool,
src: str,
cmd: str,
msg: str,
wait: float,
sp_ka: dict[str, Any],
) -> str:
import zmq
try:
mtx = ZMQ["mtx"]
except:
ZMQ["mtx"] = threading.Lock()
time.sleep(0.1)
mtx = ZMQ["mtx"]
ret = ""
t0 = time.time()
if verbose and log:
log("hook(%s) %r entering zmq-main-lock" % (src, cmd), 6)
with mtx:
try:
mode, sck, mtx = ZMQ[cmd]
except:
mode, uri = cmd.split(":", 1)
try:
desc = ZMQ_DESC[mode]
if log:
t = "libzmq(%s) pyzmq(%s) init(%s); %s"
log(t % (zmq.zmq_version(), zmq.__version__, cmd, desc))
except:
raise Exception("the only supported ZMQ modes are REQ PUB PUSH")
try:
ctx = ZMQ["ctx"]
except:
ctx = ZMQ["ctx"] = zmq.Context()
timeout = sp_ka["timeout"]
if mode == "pub":
sck = ctx.socket(zmq.PUB)
sck.bind(uri)
time.sleep(1) # give clients time to connect; avoids losing first msg
elif mode == "push":
sck = ctx.socket(zmq.PUSH)
sck.bind(uri)
if timeout:
sck.SNDTIMEO = int(timeout * 1000)
elif mode == "req":
sck = ctx.socket(zmq.REQ)
sck.connect(uri)
if timeout:
sck.RCVTIMEO = int(timeout * 1000)
else:
raise Exception()
mtx = threading.Lock()
ZMQ[cmd] = (mode, sck, mtx)
if verbose and log:
log("hook(%s) %r entering socket-lock" % (src, cmd), 6)
with mtx:
if verbose and log:
log("hook(%s) %r sending |%d|" % (src, cmd, len(msg)), 6)
sck.send_string(msg) # PUSH can safely timeout here
if mode == "req":
if verbose and log:
log("hook(%s) %r awaiting ack from req" % (src, cmd), 6)
try:
ret = sck.recv().decode("utf-8", "replace")
except:
sck.close()
del ZMQ[cmd] # bad state; must reset
raise Exception("ack timeout; zmq socket killed")
if ret and log:
log("hook(%s) %r ACK: %r" % (src, cmd, ret), 6)
if wait:
wait -= time.time() - t0
if wait > 0:
time.sleep(wait)
return ret
def _runhook(
log: Optional["NamedLogger"],
verbose: bool,
src: str,
cmd: str,
ap: str,
@ -3486,6 +3596,15 @@ def _runhook(
else:
arg = txt or ap
if acmd[0].startswith("zmq:"):
zs = "zmq-error"
try:
zs = _zmq_hook(log, verbose, src, acmd[0][4:].lower(), arg, wait, sp_ka)
except Exception as ex:
if log:
log("zeromq failed: %r" % (ex,))
return {"rc": 0, "stdout": zs}
acmd += [arg]
if acmd[0].endswith(".py"):
acmd = [pybin] + acmd
@ -3514,6 +3633,7 @@ def _runhook(
except:
ret = {"rc": rc, "stdout": v}
if wait:
wait -= time.time() - t0
if wait > 0:
time.sleep(wait)
@ -3540,14 +3660,15 @@ def runhook(
) -> dict[str, Any]:
assert broker or up2k # !rm
args = (broker or up2k).args
verbose = args.hook_v
vp = vp.replace("\\", "/")
ret = {"rc": 0}
for cmd in cmds:
try:
hr = _runhook(
log, src, cmd, ap, vp, host, uname, perms, mt, sz, ip, at, txt
log, verbose, src, cmd, ap, vp, host, uname, perms, mt, sz, ip, at, txt
)
if log and args.hook_v:
if verbose and log:
log("hook(%s) %r => \033[32m%s" % (src, cmd, hr), 6)
if not hr:
return {}

View file

@ -342,6 +342,7 @@ python3 -m venv .venv
. .venv/bin/activate
pip install jinja2 strip_hints # MANDATORY
pip install argon2-cffi # password hashing
pip install pyzmq # send 0mq from hooks
pip install mutagen # audio metadata
pip install pyftpdlib # ftp server
pip install partftpy # tftp server

View file

@ -52,6 +52,7 @@ ftpd = ["pyftpdlib"]
ftps = ["pyftpdlib", "pyopenssl"]
tftpd = ["partftpy>=0.4.0"]
pwhash = ["argon2-cffi"]
zeromq = ["pyzmq"]
[project.scripts]
copyparty = "copyparty.__main__:main"

View file

@ -9,7 +9,7 @@ ENV XDG_CONFIG_HOME=/cfg
RUN apk --no-cache add !pyc \
tzdata wget \
py3-jinja2 py3-argon2-cffi py3-pillow \
py3-jinja2 py3-argon2-cffi py3-pyzmq py3-pillow \
ffmpeg
COPY i/dist/copyparty-sfx.py innvikler.sh ./

View file

@ -12,7 +12,8 @@ COPY i/bin/mtag/audio-bpm.py /mtag/
COPY i/bin/mtag/audio-key.py /mtag/
RUN apk add -U !pyc \
tzdata wget \
py3-jinja2 py3-argon2-cffi py3-pillow py3-pip py3-cffi \
py3-jinja2 py3-argon2-cffi py3-pyzmq py3-pillow \
py3-pip py3-cffi \
ffmpeg \
vips-jxl vips-heif vips-poppler vips-magick \
py3-numpy fftw libsndfile \

View file

@ -9,7 +9,8 @@ ENV XDG_CONFIG_HOME=/cfg
RUN apk add -U !pyc \
tzdata wget \
py3-jinja2 py3-argon2-cffi py3-pillow py3-pip py3-cffi \
py3-jinja2 py3-argon2-cffi py3-pyzmq py3-pillow \
py3-pip py3-cffi \
ffmpeg \
vips-jxl vips-heif vips-poppler vips-magick \
&& apk add -t .bd \

View file

@ -144,6 +144,7 @@ args = {
"ftps": ["pyftpdlib", "pyopenssl"],
"tftpd": ["partftpy>=0.4.0"],
"pwhash": ["argon2-cffi"],
"zeromq": ["pyzmq"],
},
"entry_points": {"console_scripts": ["copyparty = copyparty.__main__:main"]},
"scripts": ["bin/partyfuse.py", "bin/u2c.py"],