add xiu (batching hook; runs on idle after uploads) +

bunch of tweaks/fixes for hooks
This commit is contained in:
ed 2023-02-26 18:23:32 +00:00
parent 76bd005bdc
commit 05e0c2ec9e
12 changed files with 688 additions and 116 deletions

68
bin/hooks/notify2.py Executable file
View file

@ -0,0 +1,68 @@
#!/usr/bin/env python3
import json
import os
import sys
import subprocess as sp
from datetime import datetime
from plyer import notification
_ = r"""
same as notify.py but with additional info (uploader, ...)
and also supports --xm (notify on 📟 message)
example usages; either as global config (all volumes) or as volflag:
--xm f,j,bin/hooks/notify2.py
--xau f,j,bin/hooks/notify2.py
-v srv/inc:inc:c,xm=f,j,bin/hooks/notify2.py
-v srv/inc:inc:c,xau=f,j,bin/hooks/notify2.py
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
parameters explained,
xau = execute after upload
f = fork so it doesn't block uploads
j = provide json instead of filepath list
"""
try:
from copyparty.util import humansize
except:
def humansize(n):
return n
def main():
inf = json.loads(sys.argv[1])
fp = inf["ap"]
sz = humansize(inf["sz"])
dp, fn = os.path.split(fp)
mt = datetime.utcfromtimestamp(inf["mt"]).strftime("%Y-%m-%d %H:%M:%S")
msg = f"{fn} ({sz})\n📁 {dp}"
title = "File received"
icon = "emblem-documents-symbolic" if sys.platform == "linux" else ""
if inf.get("txt"):
msg = inf["txt"]
title = "Message received"
icon = "mail-unread-symbolic" if sys.platform == "linux" else ""
msg += f"\n👤 {inf['user']} ({inf['ip']})\n🕒 {mt}"
if "com.termux" in sys.executable:
sp.run(["termux-notification", "-t", title, "-c", msg])
return
notification.notify(
title=title,
message=msg,
app_icon=icon,
timeout=10,
)
if __name__ == "__main__":
main()

103
bin/hooks/xiu-sha.py Executable file
View file

@ -0,0 +1,103 @@
#!/usr/bin/env python3
import hashlib
import json
import sys
from datetime import datetime
_ = r"""
this hook will produce a single sha512 file which
covers all recent uploads (plus metadata comments)
use this with --xiu, which makes copyparty buffer
uploads until server is idle, providing file infos
on stdin (filepaths or json)
example usage as global config:
--xiu i5,j,bin/hooks/xiu-sha.py
example usage as a volflag (per-volume config):
-v srv/inc:inc:c,xiu=i5,j,bin/hooks/xiu-sha.py
parameters explained,
xiu = execute after uploads...
i5 = ...after volume has been idle for 5sec
j = provide json instead of filepath list
note the "f" (fork) flag is not set, so this xiu
will block other xiu hooks while it's running
"""
try:
from copyparty.util import fsenc
except:
def fsenc(p):
return p
def humantime(ts):
return datetime.utcfromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S")
def find_files_root(inf):
di = 9000
for f1, f2 in zip(inf, inf[1:]):
p1 = f1["ap"].replace("\\", "/").rsplit("/", 1)[0]
p2 = f2["ap"].replace("\\", "/").rsplit("/", 1)[0]
di = min(len(p1), len(p2), di)
di = next((i for i in range(di) if p1[i] != p2[i]), di)
return di + 1
def find_vol_root(inf):
return len(inf[0]["ap"][: -len(inf[0]["vp"])])
def main():
zb = sys.stdin.buffer.read()
zs = zb.decode("utf-8", "replace")
inf = json.loads(zs)
# root directory (where to put the sha512 file);
# di = find_files_root(inf) # next to the file closest to volume root
di = find_vol_root(inf) # top of the entire volume
ret = []
total_sz = 0
for md in inf:
ap = md["ap"]
rp = ap[di:]
total_sz += md["sz"]
fsize = "{:,}".format(md["sz"])
mtime = humantime(md["mt"])
up_ts = humantime(md["at"])
h = hashlib.sha512()
with open(fsenc(md["ap"]), "rb", 512 * 1024) as f:
while True:
buf = f.read(512 * 1024)
if not buf:
break
h.update(buf)
cksum = h.hexdigest()
meta = " | ".join([md["wark"], up_ts, mtime, fsize, md["ip"]])
ret.append("# {}\n{} *{}".format(meta, cksum, rp))
ret.append("# {} files, {} bytes total".format(len(inf), total_sz))
ret.append("")
ftime = datetime.utcnow().strftime("%Y-%m%d-%H%M%S.%f")
fp = "{}xfer-{}.sha512".format(inf[0]["ap"][:di], ftime)
with open(fsenc(fp), "wb") as f:
f.write("\n".join(ret).encode("utf-8", "replace"))
print("wrote checksums to {}".format(fp))
if __name__ == "__main__":
main()

45
bin/hooks/xiu.py Executable file
View file

@ -0,0 +1,45 @@
#!/usr/bin/env python3
import json
import sys
_ = r"""
this hook prints absolute filepaths + total size
use this with --xiu, which makes copyparty buffer
uploads until server is idle, providing file infos
on stdin (filepaths or json)
example usage as global config:
--xiu i1,j,bin/hooks/xiu.py
example usage as a volflag (per-volume config):
-v srv/inc:inc:c,xiu=i1,j,bin/hooks/xiu.py
parameters explained,
xiu = execute after uploads...
i1 = ...after volume has been idle for 1sec
j = provide json instead of filepath list
note the "f" (fork) flag is not set, so this xiu
will block other xiu hooks while it's running
"""
def main():
zb = sys.stdin.buffer.read()
zs = zb.decode("utf-8", "replace")
inf = json.loads(zs)
total_sz = 0
for upload in inf:
sz = upload["sz"]
total_sz += sz
print("{:9} {}".format(sz, upload["ap"]))
print("{} files, {} bytes total".format(len(inf), total_sz))
if __name__ == "__main__":
main()

View file

@ -511,6 +511,7 @@ def get_sects():
execute a command (a program or script) before or after various events;
\033[36mxbu\033[35m executes CMD before a file upload starts
\033[36mxau\033[35m executes CMD after a file upload finishes
\033[36mxiu\033[35m executes CMD after all uploads finish and volume is idle
\033[36mxbr\033[35m executes CMD before a file rename/move
\033[36mxar\033[35m executes CMD after a file rename/move
\033[36mxbd\033[35m executes CMD before a file delete
@ -532,6 +533,7 @@ def get_sects():
\033[36mj\033[35m provides json with info as 1st arg instead of filepath
\033[36mwN\033[35m waits N sec after command has been started before continuing
\033[36mtN\033[35m sets an N sec timeout before the command is abandoned
\033[36miN\033[35m xiu only: volume must be idle for N sec (default = 5)
\033[36mkt\033[35m kills the entire process tree on timeout (default),
\033[36mkm\033[35m kills just the main process
@ -542,6 +544,14 @@ def get_sects():
\033[36mc2\033[35m show only stdout
\033[36mc3\033[35m mute all process otput
\033[0m
each hook is executed once for each event, except for \033[36mxiu\033[0m
which builds up a backlog of uploads, running the hook just once
as soon as the volume has been idle for iN seconds (5 by default)
\033[36mxiu\033[0m is also unique in that it will pass the metadata to the
executed program on STDIN instead of as argv arguments, and
it also includes the wark (file-id/hash) as a json property
except for \033[36mxm\033[0m, only one hook / one action can run at a time,
so it's recommended to use the \033[36mf\033[0m flag unless you really need
to wait for the hook to finish before continuing (without \033[36mf\033[0m
@ -769,6 +779,7 @@ def add_hooks(ap):
ap2 = ap.add_argument_group('event hooks (see --help-hooks)')
ap2.add_argument("--xbu", metavar="CMD", type=u, action="append", help="execute CMD before a file upload starts")
ap2.add_argument("--xau", metavar="CMD", type=u, action="append", help="execute CMD after a file upload finishes")
ap2.add_argument("--xiu", metavar="CMD", type=u, action="append", help="execute CMD after all uploads finish and volume is idle")
ap2.add_argument("--xbr", metavar="CMD", type=u, action="append", help="execute CMD before a file move/rename")
ap2.add_argument("--xar", metavar="CMD", type=u, action="append", help="execute CMD after a file move/rename")
ap2.add_argument("--xbd", metavar="CMD", type=u, action="append", help="execute CMD before a file delete")

View file

@ -934,7 +934,7 @@ class AuthSrv(object):
is_list: bool,
) -> None:
desc = flagdescs.get(name, "?").replace("\n", " ")
if name not in ["mtp", "xbu", "xau", "xbr", "xar", "xbd", "xad", "xm"]:
if name not in "mtp xbu xau xiu xbr xar xbd xad xm".split():
if value is True:
t = "└─add volflag [{}] = {} ({})"
else:
@ -1303,7 +1303,7 @@ class AuthSrv(object):
vol.flags["mth"] = self.args.mth
# append additive args from argv to volflags
hooks = "xbu xau xbr xar xbd xad xm".split()
hooks = "xbu xau xiu xbr xar xbd xad xm".split()
for name in ["mtp"] + hooks:
self._read_volflag(vol.flags, name, getattr(self.args, name), True)
@ -1363,11 +1363,20 @@ class AuthSrv(object):
if k in ints:
vol.flags[k] = int(vol.flags[k])
if "lifetime" in vol.flags and "e2d" not in vol.flags:
if "e2d" not in vol.flags:
if "lifetime" in vol.flags:
t = 'removing lifetime config from volume "/{}" because e2d is disabled'
self.log(t.format(vol.vpath), 1)
del vol.flags["lifetime"]
needs_e2d = [x for x in hooks if x != "xm"]
drop = [x for x in needs_e2d if vol.flags.get(x)]
if drop:
t = 'removing [{}] from volume "/{}" because e2d is disabled'
self.log(t.format(", ".join(drop), vol.vpath), 1)
for x in drop:
vol.flags.pop(x)
if vol.flags.get("neversymlink") and not vol.flags.get("hardlink"):
vol.flags["copydupes"] = True
@ -1624,7 +1633,7 @@ class AuthSrv(object):
]
csv = set("i p".split())
lst = set("c ihead mtm mtp xad xar xau xbd xbr xbu xm".split())
lst = set("c ihead mtm mtp xad xar xau xiu xbd xbr xbu xm".split())
askip = set("a v c vc cgen theme".split())
# keymap from argv to vflag

View file

@ -123,6 +123,7 @@ flagcats = {
"event hooks\n(better explained in --help-hooks)": {
"xbu=CMD": "execute CMD before a file upload starts",
"xau=CMD": "execute CMD after a file upload finishes",
"xiu=CMD": "execute CMD after all uploads finish and volume is idle",
"xbr=CMD": "execute CMD before a file rename/move",
"xar=CMD": "execute CMD after a file rename/move",
"xbd=CMD": "execute CMD before a file delete",

View file

@ -15,6 +15,7 @@ from pyftpdlib.servers import FTPServer
from .__init__ import ANYWIN, PY2, TYPE_CHECKING, E
from .bos import bos
from .authsrv import VFS
from .util import (
Daemon,
Pebkac,
@ -23,6 +24,7 @@ from .util import (
ipnorm,
pybin,
relchk,
runhook,
sanitize_fn,
vjoin,
)
@ -132,7 +134,7 @@ class FtpFs(AbstractedFS):
w: bool = False,
m: bool = False,
d: bool = False,
) -> str:
) -> tuple[str, VFS, str]:
try:
vpath = vpath.replace("\\", "/").lstrip("/")
rd, fn = os.path.split(vpath)
@ -146,7 +148,7 @@ class FtpFs(AbstractedFS):
if not vfs.realpath:
raise FilesystemError("no filesystem mounted at this path")
return os.path.join(vfs.realpath, rem)
return os.path.join(vfs.realpath, rem), vfs, rem
except Pebkac as ex:
raise FilesystemError(str(ex))
@ -157,7 +159,7 @@ class FtpFs(AbstractedFS):
w: bool = False,
m: bool = False,
d: bool = False,
) -> str:
) -> tuple[str, VFS, str]:
return self.v2a(os.path.join(self.cwd, vpath), r, w, m, d)
def ftp2fs(self, ftppath: str) -> str:
@ -179,7 +181,7 @@ class FtpFs(AbstractedFS):
r = "r" in mode
w = "w" in mode or "a" in mode or "+" in mode
ap = self.rv2a(filename, r, w)
ap = self.rv2a(filename, r, w)[0]
if w:
try:
st = bos.stat(ap)
@ -212,7 +214,7 @@ class FtpFs(AbstractedFS):
) = self.hub.asrv.vfs.can_access(self.cwd.lstrip("/"), self.h.username)
def mkdir(self, path: str) -> None:
ap = self.rv2a(path, w=True)
ap = self.rv2a(path, w=True)[0]
bos.mkdir(ap)
def listdir(self, path: str) -> list[str]:
@ -244,7 +246,7 @@ class FtpFs(AbstractedFS):
return list(sorted(list(r.keys())))
def rmdir(self, path: str) -> None:
ap = self.rv2a(path, d=True)
ap = self.rv2a(path, d=True)[0]
bos.rmdir(ap)
def remove(self, path: str) -> None:
@ -277,10 +279,10 @@ class FtpFs(AbstractedFS):
def stat(self, path: str) -> os.stat_result:
try:
ap = self.rv2a(path, r=True)
ap = self.rv2a(path, r=True)[0]
return bos.stat(ap)
except:
ap = self.rv2a(path)
ap = self.rv2a(path)[0]
st = bos.stat(ap)
if not stat.S_ISDIR(st.st_mode):
raise
@ -288,11 +290,11 @@ class FtpFs(AbstractedFS):
return st
def utime(self, path: str, timeval: float) -> None:
ap = self.rv2a(path, w=True)
ap = self.rv2a(path, w=True)[0]
return bos.utime(ap, (timeval, timeval))
def lstat(self, path: str) -> os.stat_result:
ap = self.rv2a(path)
ap = self.rv2a(path)[0]
return bos.stat(ap)
def isfile(self, path: str) -> bool:
@ -303,7 +305,7 @@ class FtpFs(AbstractedFS):
return False # expected for mojibake in ftp_SIZE()
def islink(self, path: str) -> bool:
ap = self.rv2a(path)
ap = self.rv2a(path)[0]
return bos.path.islink(ap)
def isdir(self, path: str) -> bool:
@ -314,18 +316,18 @@ class FtpFs(AbstractedFS):
return True
def getsize(self, path: str) -> int:
ap = self.rv2a(path)
ap = self.rv2a(path)[0]
return bos.path.getsize(ap)
def getmtime(self, path: str) -> float:
ap = self.rv2a(path)
ap = self.rv2a(path)[0]
return bos.path.getmtime(ap)
def realpath(self, path: str) -> str:
return path
def lexists(self, path: str) -> bool:
ap = self.rv2a(path)
ap = self.rv2a(path)[0]
return bos.path.lexists(ap)
def get_user_by_uid(self, uid: int) -> str:
@ -355,11 +357,31 @@ class FtpHandler(FTPHandler):
# reduce non-debug logging
self.log_cmds_list = [x for x in self.log_cmds_list if x not in ("CWD", "XCWD")]
def die(self, msg):
self.respond("550 {}".format(msg))
raise FilesystemError(msg)
def ftp_STOR(self, file: str, mode: str = "w") -> Any:
# Optional[str]
vp = join(self.fs.cwd, file).lstrip("/")
ap = self.fs.v2a(vp)
ap, vfs, rem = self.fs.v2a(vp)
self.vfs_map[ap] = vp
xbu = vfs.flags.get("xbu")
if xbu and not runhook(
None,
xbu,
ap,
vfs.canonical(rem),
"",
self.username,
0,
0,
self.cli_ip,
0,
"",
):
self.die("Upload blocked by xbu server config")
# print("ftp_STOR: {} {} => {}".format(vp, mode, ap))
ret = FTPHandler.ftp_STOR(self, file, mode)
# print("ftp_STOR: {} {} OK".format(vp, mode))
@ -384,11 +406,13 @@ class FtpHandler(FTPHandler):
vfs, rem = vfs.get_dbv(rem)
self.hub.up2k.hash_file(
vfs.realpath,
vfs.vpath,
vfs.flags,
rem,
fn,
self.remote_ip,
self.cli_ip,
time.time(),
self.username,
)
return FTPHandler.log_transfer(

View file

@ -1272,9 +1272,10 @@ class HttpCli(object):
self.vpath,
self.host,
self.uname,
self.ip,
time.time(),
len(xm),
self.ip,
time.time(),
plain,
)
@ -1415,9 +1416,10 @@ class HttpCli(object):
self.vpath,
self.host,
self.uname,
self.ip,
at,
remains,
self.ip,
at,
"",
):
t = "upload denied by xbu"
@ -1491,9 +1493,10 @@ class HttpCli(object):
self.vpath,
self.host,
self.uname,
self.ip,
at,
post_sz,
self.ip,
at,
"",
):
t = "upload denied by xau"
@ -1505,11 +1508,13 @@ class HttpCli(object):
self.conn.hsrv.broker.say(
"up2k.hash_file",
vfs.realpath,
vfs.vpath,
vfs.flags,
rem,
fn,
self.ip,
at,
self.uname,
)
vsuf = ""
@ -2102,9 +2107,10 @@ class HttpCli(object):
self.vpath,
self.host,
self.uname,
self.ip,
at,
0,
self.ip,
at,
"",
):
t = "upload denied by xbu"
@ -2161,9 +2167,10 @@ class HttpCli(object):
self.vpath,
self.host,
self.uname,
self.ip,
at,
sz,
self.ip,
at,
"",
):
t = "upload denied by xau"
@ -2175,11 +2182,13 @@ class HttpCli(object):
self.conn.hsrv.broker.say(
"up2k.hash_file",
dbv.realpath,
vfs.vpath,
dbv.flags,
vrem,
fname,
self.ip,
at,
self.uname,
)
self.conn.nbyte += sz

View file

@ -12,10 +12,10 @@ from types import SimpleNamespace
from .__init__ import ANYWIN, EXE, TYPE_CHECKING
from .authsrv import LEELOO_DALLAS, VFS
from .bos import bos
from .util import Daemon, min_ex, pybin
from .util import Daemon, min_ex, pybin, runhook
if True: # pylint: disable=using-constant-test
from typing import Any
from typing import Any, Union
if TYPE_CHECKING:
from .svchub import SvcHub
@ -113,6 +113,9 @@ class SMB(object):
self.stop = srv.stop
self.log("smb", "listening @ {}:{}".format(ip, port))
def nlog(self, msg: str, c: Union[int, str] = 0) -> None:
self.log("smb", msg, c)
def start(self) -> None:
Daemon(self.srv.start)
@ -169,9 +172,16 @@ class SMB(object):
yeet("blocked write (no --smbw): " + vpath)
vfs, ap = self._v2a("open", vpath, *a)
if wr and not vfs.axs.uwrite:
if wr:
if not vfs.axs.uwrite:
yeet("blocked write (no-write-acc): " + vpath)
xbu = vfs.flags.get("xbu")
if xbu and not runhook(
self.nlog, xbu, ap, vpath, "", "", 0, 0, "1.7.6.2", 0, ""
):
yeet("blocked by xbu server config: " + vpath)
ret = bos.open(ap, flags, *a, mode=chmod, **ka)
if wr:
now = time.time()
@ -198,11 +208,13 @@ class SMB(object):
vfs, rem = vfs.get_dbv(rem)
self.hub.up2k.hash_file(
vfs.realpath,
vfs.vpath,
vfs.flags,
rem,
fn,
"1.7.6.2",
time.time(),
"",
)
def _rename(self, vp1: str, vp2: str) -> None:

View file

@ -48,6 +48,7 @@ from .util import (
rmdirs,
rmdirs_up,
runhook,
runihook,
s2hms,
s3dec,
s3enc,
@ -122,6 +123,7 @@ class Up2k(object):
self.flags: dict[str, dict[str, Any]] = {}
self.droppable: dict[str, list[str]] = {}
self.volstate: dict[str, str] = {}
self.vol_act: dict[str, float] = {}
self.dupesched: dict[str, list[tuple[str, str, float]]] = {}
self.snap_persist_interval = 300 # persist unfinished index every 5 min
self.snap_discard_interval = 21600 # drop unfinished after 6 hours inactivity
@ -131,13 +133,17 @@ class Up2k(object):
self.entags: dict[str, set[str]] = {}
self.mtp_parsers: dict[str, dict[str, MParser]] = {}
self.pending_tags: list[tuple[set[str], str, str, dict[str, Any]]] = []
self.hashq: Queue[tuple[str, str, str, str, float]] = Queue()
self.hashq: Queue[tuple[str, str, str, str, str, float, str]] = Queue()
self.tagq: Queue[tuple[str, str, str, str, str, float]] = Queue()
self.tag_event = threading.Condition()
self.n_hashq = 0
self.n_tagq = 0
self.mpool_used = False
self.xiu_ptn = re.compile(r"(?:^|,)i([0-9]+)")
self.xiu_busy = False # currently running hook
self.xiu_asleep = True # needs rescan_cond poke to schedule self
self.cur: dict[str, "sqlite3.Cursor"] = {}
self.mem_cur = None
self.sqlite_ver = None
@ -291,7 +297,7 @@ class Up2k(object):
cooldown = now + 1
continue
cooldown = now + 5
cooldown = now + 3
# self.log("SR", 5)
if self.args.no_lifetime:
@ -300,6 +306,8 @@ class Up2k(object):
# important; not deferred by db_act
timeout = self._check_lifetimes()
timeout = min(timeout, now + self._check_xiu())
with self.mutex:
for vp, vol in sorted(self.asrv.vfs.all_vols.items()):
maxage = vol.flags.get("scan")
@ -394,6 +402,85 @@ class Up2k(object):
return timeout
def _check_xiu(self) -> float:
if self.xiu_busy:
return 2
ret = 9001
for _, vol in sorted(self.asrv.vfs.all_vols.items()):
rp = vol.realpath
cur = self.cur.get(rp)
if not cur:
continue
with self.mutex:
q = "select distinct c from iu"
cds = cur.execute(q).fetchall()
if not cds:
continue
run_cds: list[int] = []
for cd in sorted([x[0] for x in cds]):
delta = cd - (time.time() - self.vol_act[rp])
if delta > 0:
ret = min(ret, delta)
break
run_cds.append(cd)
if run_cds:
self.xiu_busy = True
Daemon(self._run_xius, "xiu", (vol, run_cds))
return 2
return ret
def _run_xius(self, vol: VFS, cds: list[int]):
for cd in cds:
self._run_xiu(vol, cd)
self.xiu_busy = False
self.xiu_asleep = True
def _run_xiu(self, vol: VFS, cd: int):
rp = vol.realpath
cur = self.cur[rp]
# t0 = time.time()
with self.mutex:
q = "select w,rd,fn from iu where c={} limit 80386"
wrfs = cur.execute(q.format(cd)).fetchall()
if not wrfs:
return
# dont wanna rebox so use format instead of prepared
q = "delete from iu where w=? and +rd=? and +fn=? and +c={}"
cur.executemany(q.format(cd), wrfs)
cur.connection.commit()
q = "select * from up where substr(w,1,16)=? and +rd=? and +fn=?"
ups = []
for wrf in wrfs:
try:
# almost definitely exists; don't care if it doesn't
ups.append(cur.execute(q, wrf).fetchone())
except:
pass
# t1 = time.time()
# self.log("mapped {} warks in {:.3f} sec".format(len(wrfs), t1 - t0))
# "mapped 10989 warks in 0.126 sec"
cmds = self.flags[rp]["xiu"]
for cmd in cmds:
m = self.xiu_ptn.search(cmd)
ccd = int(m.group(1)) if m else 5
if ccd != cd:
continue
self.log("xiu: {}# {}".format(len(wrfs), cmd))
runihook(self.log, cmd, vol, ups)
def _vis_job_progress(self, job: dict[str, Any]) -> str:
perc = 100 - (len(job["need"]) * 100.0 / len(job["hash"]))
path = djoin(job["ptop"], job["prel"], job["name"])
@ -710,6 +797,7 @@ class Up2k(object):
self.log("\n".join(ta))
self.flags[ptop] = flags
self.vol_act[ptop] = 0.0
self.registry[ptop] = reg
self.droppable[ptop] = drp or []
self.regdrop(ptop, "")
@ -1010,7 +1098,8 @@ class Up2k(object):
wark = up2k_wark_from_hashlist(self.salt, sz, hashes)
self.db_add(db.c, wark, rd, fn, lmod, sz, "", 0)
# skip upload hooks by not providing vflags
self.db_add(db.c, {}, rd, fn, lmod, sz, "", "", wark, "", "", "", 0)
db.n += 1
ret += 1
td = time.time() - db.t
@ -1872,6 +1961,7 @@ class Up2k(object):
if ver == DB_VER:
try:
self._add_xiu_tab(cur)
self._add_dhash_tab(cur)
except:
pass
@ -1965,6 +2055,7 @@ class Up2k(object):
cur.execute(cmd)
self._add_dhash_tab(cur)
self._add_xiu_tab(cur)
self.log("created DB at {}".format(db_path))
return cur
@ -1990,6 +2081,18 @@ class Up2k(object):
cur.connection.commit()
def _add_xiu_tab(self, cur: "sqlite3.Cursor") -> None:
# v5a -> v5b
# store rd+fn rather than warks to support nohash vols
for cmd in [
r"create table iu (c int, w text, rd text, fn text)",
r"create index iu_c on iu(c)",
r"create index iu_w on iu(w)",
]:
cur.execute(cmd)
cur.connection.commit()
def _job_volchk(self, cj: dict[str, Any]) -> None:
if not self.register_vpath(cj["ptop"], cj["vcfg"]):
if cj["ptop"] not in self.registry:
@ -2009,11 +2112,12 @@ class Up2k(object):
with self.mutex:
self._job_volchk(cj)
ptop = cj["ptop"]
cj["name"] = sanitize_fn(cj["name"], "", [".prologue.html", ".epilogue.html"])
cj["poke"] = now = self.db_act = time.time()
cj["poke"] = now = self.db_act = self.vol_act[ptop] = time.time()
wark = self._get_wark(cj)
job = None
pdir = djoin(cj["ptop"], cj["prel"])
pdir = djoin(ptop, cj["prel"])
try:
dev = bos.stat(pdir).st_dev
except:
@ -2024,7 +2128,6 @@ class Up2k(object):
sprs = self.fstab.get(pdir) != "ng"
with self.mutex:
ptop = cj["ptop"]
jcur = self.cur.get(ptop)
reg = self.registry[ptop]
vfs = self.asrv.vfs.all_vols[cj["vtop"]]
@ -2161,7 +2264,7 @@ class Up2k(object):
raise Pebkac(422, err)
elif "nodupe" in self.flags[cj["ptop"]]:
elif "nodupe" in vfs.flags:
self.log("dupe-reject:\n {0}\n {1}".format(src, dst))
err = "upload rejected, file already exists:\n"
err += "/" + quotep(vsrc) + " "
@ -2170,6 +2273,7 @@ class Up2k(object):
# symlink to the client-provided name,
# returning the previous upload info
job = deepcopy(job)
job["wark"] = wark
for k in "ptop vtop prel addr".split():
job[k] = cj[k]
@ -2182,6 +2286,24 @@ class Up2k(object):
job["name"] = self._untaken(pdir, cj, now)
dst = djoin(job["ptop"], job["prel"], job["name"])
xbu = vfs.flags.get("xbu")
if xbu and not runhook(
self.log,
xbu, # type: ignore
dst,
job["vtop"],
job.get("host") or "",
job.get("user") or "",
job["lmod"],
job["size"],
job["addr"],
job.get("at") or 0,
"",
):
t = "upload blocked by xbu server config: {}".format(dst)
self.log(t, 1)
raise Pebkac(403, t)
if not self.args.nw:
try:
dvf = self.flags[job["ptop"]]
@ -2193,9 +2315,10 @@ class Up2k(object):
raise
if cur:
a = [job[x] for x in "prel name lmod size addr".split()]
zs = "prel name lmod size ptop vtop wark host user addr"
a = [job[x] for x in zs.split()]
a += [job.get("at") or time.time()]
self.db_add(cur, wark, *a)
self.db_add(cur, vfs.flags, *a)
cur.connection.commit()
if not job:
@ -2376,7 +2499,7 @@ class Up2k(object):
self, ptop: str, wark: str, chash: str
) -> tuple[int, list[int], str, float, bool]:
with self.mutex:
self.db_act = time.time()
self.db_act = self.vol_act[ptop] = time.time()
job = self.registry[ptop].get(wark)
if not job:
known = " ".join([x for x in self.registry[ptop].keys()])
@ -2427,7 +2550,7 @@ class Up2k(object):
def confirm_chunk(self, ptop: str, wark: str, chash: str) -> tuple[int, str]:
with self.mutex:
self.db_act = time.time()
self.db_act = self.vol_act[ptop] = time.time()
try:
job = self.registry[ptop][wark]
pdir = djoin(job["ptop"], job["prel"])
@ -2462,7 +2585,6 @@ class Up2k(object):
self._finish_upload(ptop, wark)
def _finish_upload(self, ptop: str, wark: str) -> None:
self.db_act = time.time()
try:
job = self.registry[ptop][wark]
pdir = djoin(job["ptop"], job["prel"])
@ -2475,24 +2597,7 @@ class Up2k(object):
atomic_move(src, dst)
upt = job.get("at") or time.time()
xau = self.flags[ptop].get("xau")
if xau and not runhook(
self.log,
xau,
dst,
djoin(job["vtop"], job["prel"], job["name"]),
job["host"],
job["user"],
job["addr"],
upt,
job["size"],
"",
):
t = "upload blocked by xau"
self.log(t, 1)
bos.unlink(dst)
self.registry[ptop].pop(wark, None)
raise Pebkac(403, t)
vflags = self.flags[ptop]
times = (int(time.time()), int(job["lmod"]))
if ANYWIN:
@ -2504,7 +2609,8 @@ class Up2k(object):
except:
pass
z2 = [job[x] for x in "ptop wark prel name lmod size addr".split()]
zs = "prel name lmod size ptop vtop wark host user addr"
z2 = [job[x] for x in zs.split()]
wake_sr = False
try:
flt = job["life"]
@ -2519,7 +2625,7 @@ class Up2k(object):
pass
z2 += [upt]
if self.idx_wark(*z2):
if self.idx_wark(vflags, *z2):
del self.registry[ptop][wark]
else:
self.regdrop(ptop, wark)
@ -2541,7 +2647,7 @@ class Up2k(object):
self._symlink(dst, d2, self.flags[ptop], lmod=lmod)
if cur:
self.db_rm(cur, rd, fn)
self.db_add(cur, wark, rd, fn, *z2[-4:])
self.db_add(cur, vflags, rd, fn, lmod, *z2[3:])
if cur:
cur.connection.commit()
@ -2563,12 +2669,16 @@ class Up2k(object):
def idx_wark(
self,
ptop: str,
wark: str,
vflags: dict[str, Any],
rd: str,
fn: str,
lmod: float,
sz: int,
ptop: str,
vtop: str,
wark: str,
host: str,
usr: str,
ip: str,
at: float,
) -> bool:
@ -2576,9 +2686,12 @@ class Up2k(object):
if not cur:
return False
self.db_act = self.vol_act[ptop] = time.time()
try:
self.db_rm(cur, rd, fn)
self.db_add(cur, wark, rd, fn, lmod, sz, ip, at)
self.db_add(
cur, vflags, rd, fn, lmod, sz, ptop, vtop, wark, host, usr, ip, at
)
cur.connection.commit()
except Exception as ex:
x = self.register_vpath(ptop, {})
@ -2603,11 +2716,16 @@ class Up2k(object):
def db_add(
self,
db: "sqlite3.Cursor",
wark: str,
vflags: dict[str, Any],
rd: str,
fn: str,
ts: float,
sz: int,
ptop: str,
vtop: str,
wark: str,
host: str,
usr: str,
ip: str,
at: float,
) -> None:
@ -2621,6 +2739,49 @@ class Up2k(object):
v = (wark, int(ts), sz, rd, fn, ip or "", int(at or 0))
db.execute(sql, v)
xau = vflags.get("xau")
dst = djoin(ptop, rd, fn)
if xau and not runhook(
self.log,
xau,
dst,
djoin(vtop, rd, fn),
host,
usr,
int(ts),
sz,
ip,
at or time.time(),
"",
):
t = "upload blocked by xau server config"
self.log(t, 1)
bos.unlink(dst)
self.registry[ptop].pop(wark, None)
raise Pebkac(403, t)
xiu = vflags.get("xiu")
if xiu:
cds: set[int] = set()
for cmd in xiu:
m = self.xiu_ptn.search(cmd)
cds.add(int(m.group(1)) if m else 5)
q = "insert into iu values (?,?,?,?)"
for cd in cds:
# one for each unique cooldown duration
try:
db.execute(q, (cd, wark[:16], rd, fn))
except:
assert self.mem_cur
rd, fn = s3enc(self.mem_cur, rd, fn)
db.execute(q, (cd, wark[:16], rd, fn))
if self.xiu_asleep:
self.xiu_asleep = False
with self.rescan_cond:
self.rescan_cond.notify_all()
def handle_rm(self, uname: str, ip: str, vpaths: list[str], lim: list[int]) -> str:
n_files = 0
ok = {}
@ -2678,6 +2839,7 @@ class Up2k(object):
ptop = vn.realpath
atop = vn.canonical(rem, False)
self.vol_act[ptop] = self.db_act
adir, fn = os.path.split(atop)
try:
st = bos.lstat(atop)
@ -2711,18 +2873,31 @@ class Up2k(object):
self.log("hit delete limit of {} files".format(lim[1]), 3)
break
n_files += 1
abspath = djoin(adir, fn)
volpath = "{}/{}".format(vrem, fn).strip("/")
vpath = "{}/{}".format(dbv.vpath, volpath).strip("/")
self.log("rm {}\n {}".format(vpath, abspath))
_ = dbv.get(volpath, uname, *permsets[0])
if xbd and not runhook(
self.log, xbd, abspath, vpath, "", uname, "", 0, 0, ""
if xbd:
st = bos.stat(abspath)
if not runhook(
self.log,
xbd,
abspath,
vpath,
"",
uname,
st.st_mtime,
st.st_size,
ip,
0,
"",
):
self.log("delete blocked by xbd: {}".format(abspath), 1)
t = "delete blocked by xbd server config: {}"
self.log(t.format(abspath), 1)
continue
n_files += 1
with self.mutex:
cur = None
try:
@ -2735,7 +2910,7 @@ class Up2k(object):
bos.unlink(abspath)
if xad:
runhook(self.log, xad, abspath, vpath, "", uname, "", 0, 0, "")
runhook(self.log, xad, abspath, vpath, "", uname, 0, 0, ip, 0, "")
ok: list[str] = []
ng: list[str] = []
@ -2747,11 +2922,11 @@ class Up2k(object):
return n_files, ok + ok2, ng + ng2
def handle_mv(self, uname: str, svp: str, dvp: str) -> str:
self.db_act = time.time()
svn, srem = self.asrv.vfs.get(svp, uname, True, False, True)
svn, srem = svn.get_dbv(srem)
sabs = svn.canonical(srem, False)
curs: set["sqlite3.Cursor"] = set()
self.db_act = self.vol_act[svn.realpath] = time.time()
if not srem:
raise Pebkac(400, "mv: cannot move a mountpoint")
@ -2787,7 +2962,7 @@ class Up2k(object):
with self.mutex:
try:
for fn in files:
self.db_act = time.time()
self.db_act = self.vol_act[dbv.realpath] = time.time()
svpf = "/".join(x for x in [dbv.vpath, vrem, fn[0]] if x)
if not svpf.startswith(svp + "/"): # assert
raise Pebkac(500, "mv: bug at {}, top {}".format(svpf, svp))
@ -2830,8 +3005,12 @@ class Up2k(object):
xbr = svn.flags.get("xbr")
xar = dvn.flags.get("xar")
if xbr and not runhook(self.log, xbr, sabs, svp, "", uname, "", 0, 0, ""):
t = "move blocked by xbr: {}".format(svp)
if xbr:
st = bos.stat(sabs)
if not runhook(
self.log, xbr, sabs, svp, "", uname, st.st_mtime, st.st_size, "", 0, ""
):
t = "move blocked by xbr server config: {}".format(svp)
self.log(t, 1)
raise Pebkac(405, t)
@ -2852,7 +3031,7 @@ class Up2k(object):
self.rescan_cond.notify_all()
if xar:
runhook(self.log, xar, dabs, dvp, "", uname, "", 0, 0, "")
runhook(self.log, xar, dabs, dvp, "", uname, 0, 0, "", 0, "")
return "k"
@ -2893,13 +3072,27 @@ class Up2k(object):
curs.add(c1)
if c2:
self.db_add(c2, w, drd, dfn, ftime, fsize, ip or "", at or 0)
self.db_add(
c2,
{}, # skip upload hooks
drd,
dfn,
ftime,
fsize,
dvn.realpath,
dvn.vpath,
w,
"",
"",
ip or "",
at or 0,
)
curs.add(c2)
else:
self.log("not found in src db: [{}]".format(svp))
if xar:
runhook(self.log, xar, dabs, dvp, "", uname, "", 0, 0, "")
runhook(self.log, xar, dabs, dvp, "", uname, 0, 0, "", 0, "")
return "k"
@ -3131,9 +3324,10 @@ class Up2k(object):
vp_chk,
job["host"],
job["user"],
job["addr"],
job["t0"],
int(job["lmod"]),
job["size"],
job["addr"],
int(job["t0"]),
"",
):
t = "upload blocked by xbu: {}".format(vp_chk)
@ -3373,7 +3567,7 @@ class Up2k(object):
self.n_hashq -= 1
# self.log("hashq {}".format(self.n_hashq))
ptop, rd, fn, ip, at = self.hashq.get()
ptop, vtop, rd, fn, ip, at, usr = self.hashq.get()
# self.log("hashq {} pop {}/{}/{}".format(self.n_hashq, ptop, rd, fn))
if "e2d" not in self.flags[ptop]:
continue
@ -3393,18 +3587,39 @@ class Up2k(object):
wark = up2k_wark_from_hashlist(self.salt, inf.st_size, hashes)
with self.mutex:
self.idx_wark(ptop, wark, rd, fn, inf.st_mtime, inf.st_size, ip, at)
self.idx_wark(
self.flags[ptop],
rd,
fn,
inf.st_mtime,
inf.st_size,
ptop,
vtop,
wark,
"",
usr,
ip,
at,
)
if at and time.time() - at > 30:
with self.rescan_cond:
self.rescan_cond.notify_all()
def hash_file(
self, ptop: str, flags: dict[str, Any], rd: str, fn: str, ip: str, at: float
self,
ptop: str,
vtop: str,
flags: dict[str, Any],
rd: str,
fn: str,
ip: str,
at: float,
usr: str,
) -> None:
with self.mutex:
self.register_vpath(ptop, flags)
self.hashq.put((ptop, rd, fn, ip, at))
self.hashq.put((ptop, vtop, rd, fn, ip, at, usr))
self.n_hashq += 1
# self.log("hashq {} push {}/{}/{}".format(self.n_hashq, ptop, rd, fn))

View file

@ -1849,6 +1849,14 @@ def _msaenc(txt: str) -> bytes:
return txt.replace("/", "\\").encode(FS_ENCODING, "surrogateescape")
def _uncify(txt: str) -> str:
txt = txt.replace("/", "\\")
if ":" not in txt and not txt.startswith("\\\\"):
txt = absreal(txt)
return txt if txt.startswith("\\\\") else "\\\\?\\" + txt
def _msenc(txt: str) -> bytes:
txt = txt.replace("/", "\\")
if ":" not in txt and not txt.startswith("\\\\"):
@ -1877,9 +1885,11 @@ if not PY2 and WINDOWS:
afsenc = _msaenc
fsenc = _msenc
fsdec = _msdec
uncify = _uncify
elif not PY2 or not WINDOWS:
fsenc = afsenc = sfsenc = w8enc
fsdec = w8dec
uncify = str
else:
# moonrunes become \x3f with bytestrings,
# losing mojibake support is worth
@ -1891,6 +1901,7 @@ else:
fsenc = afsenc = sfsenc = _not_actually_mbcs_enc
fsdec = _not_actually_mbcs_dec
uncify = str
def s3enc(mem_cur: "sqlite3.Cursor", rd: str, fn: str) -> tuple[str, str]:
@ -2512,23 +2523,14 @@ def retchk(
raise Exception(t)
def _runhook(
log: "NamedLogger",
cmd: str,
ap: str,
vp: str,
host: str,
uname: str,
ip: str,
at: float,
sz: int,
txt: str,
) -> bool:
def _parsehook(
log: Optional["NamedLogger"], cmd: str
) -> tuple[bool, bool, bool, float, dict[str, Any], str]:
chk = False
fork = False
jtxt = False
wait = 0
tout = 0
wait = 0.0
tout = 0.0
kill = "t"
cap = 0
ocmd = cmd
@ -2548,9 +2550,11 @@ def _runhook(
cap = int(arg[1:]) # 0=none 1=stdout 2=stderr 3=both
elif arg.startswith("k"):
kill = arg[1:] # [t]ree [m]ain [n]one
elif arg.startswith("i"):
pass
else:
t = "hook: invalid flag {} in {}"
log(t.format(arg, ocmd))
(log or print)(t.format(arg, ocmd))
env = os.environ.copy()
try:
@ -2565,22 +2569,92 @@ def _runhook(
if not EXE:
raise
ka = {
sp_ka = {
"env": env,
"timeout": tout,
"kill": kill,
"capture": cap,
}
if cmd.startswith("~"):
cmd = os.path.expanduser(cmd)
return chk, fork, jtxt, wait, sp_ka, cmd
def runihook(
log: Optional["NamedLogger"],
cmd: str,
vol: "VFS",
ups: list[tuple[str, int, int, str, str, str, int]],
) -> bool:
ocmd = cmd
chk, fork, jtxt, wait, sp_ka, cmd = _parsehook(log, cmd)
bcmd = [sfsenc(cmd)]
if cmd.endswith(".py"):
bcmd = [sfsenc(pybin)] + bcmd
vps = [vjoin(*list(s3dec(x[3], x[4]))) for x in ups]
aps = [djoin(vol.realpath, x) for x in vps]
if jtxt:
# 0w 1mt 2sz 3rd 4fn 5ip 6at
ja = [
{
"ap": uncify(ap), # utf8 for json
"vp": vp,
"wark": x[0][:16],
"mt": x[1],
"sz": x[2],
"ip": x[5],
"at": x[6],
}
for x, vp, ap in zip(ups, vps, aps)
]
sp_ka["sin"] = json.dumps(ja).encode("utf-8", "replace")
else:
sp_ka["sin"] = b"\n".join(fsenc(x) for x in aps)
t0 = time.time()
if fork:
Daemon(runcmd, ocmd, [bcmd], ka=sp_ka)
else:
rc, v, err = runcmd(bcmd, **sp_ka) # type: ignore
if chk and rc:
retchk(rc, bcmd, err, log, 5)
return False
wait -= time.time() - t0
if wait > 0:
time.sleep(wait)
return True
def _runhook(
log: Optional["NamedLogger"],
cmd: str,
ap: str,
vp: str,
host: str,
uname: str,
mt: float,
sz: int,
ip: str,
at: float,
txt: str,
) -> bool:
ocmd = cmd
chk, fork, jtxt, wait, sp_ka, cmd = _parsehook(log, cmd)
if jtxt:
ja = {
"ap": ap,
"vp": vp,
"mt": mt,
"sz": sz,
"ip": ip,
"at": at or time.time(),
"host": host,
"user": uname,
"at": at or time.time(),
"sz": sz,
"txt": txt,
}
arg = json.dumps(ja)
@ -2595,9 +2669,9 @@ def _runhook(
t0 = time.time()
if fork:
Daemon(runcmd, ocmd, [acmd], ka=ka)
Daemon(runcmd, ocmd, [bcmd], ka=sp_ka)
else:
rc, v, err = runcmd(bcmd, **ka) # type: ignore
rc, v, err = runcmd(bcmd, **sp_ka) # type: ignore
if chk and rc:
retchk(rc, bcmd, err, log, 5)
return False
@ -2610,24 +2684,25 @@ def _runhook(
def runhook(
log: "NamedLogger",
log: Optional["NamedLogger"],
cmds: list[str],
ap: str,
vp: str,
host: str,
uname: str,
mt: float,
sz: int,
ip: str,
at: float,
sz: int,
txt: str,
) -> bool:
vp = vp.replace("\\", "/")
for cmd in cmds:
try:
if not _runhook(log, cmd, ap, vp, host, uname, ip, at, sz, txt):
if not _runhook(log, cmd, ap, vp, host, uname, mt, sz, ip, at, txt):
return False
except Exception as ex:
log("hook: {}".format(ex))
(log or print)("hook: {}".format(ex))
if ",c," in "," + cmd:
return False
break

View file

@ -113,7 +113,7 @@ class Cfg(Namespace):
ex = "doctitle favico html_head lg_sbf log_fk md_sbf mth textfiles R RS SR"
ka.update(**{k: "" for k in ex.split()})
ex = "xad xar xau xbd xbr xbu xm"
ex = "xad xar xau xbd xbr xbu xiu xm"
ka.update(**{k: [] for k in ex.split()})
super(Cfg, self).__init__(