list active downloads in controlpanel

This commit is contained in:
ed 2024-11-10 02:12:18 +00:00
parent 0ce7cf5e10
commit 8aba5aed4f
21 changed files with 271 additions and 31 deletions

View file

@ -1687,6 +1687,7 @@ scrape_configs:
currently the following metrics are available,
* `cpp_uptime_seconds` time since last copyparty restart
* `cpp_boot_unixtime_seconds` same but as an absolute timestamp
* `cpp_active_dl` number of active downloads
* `cpp_http_conns` number of open http(s) connections
* `cpp_http_reqs` number of http(s) requests handled
* `cpp_sus_reqs` number of 403/422/malicious requests

View file

@ -80,6 +80,7 @@ web/deps/prismd.css
web/deps/scp.woff2
web/deps/sha512.ac.js
web/deps/sha512.hw.js
web/iiam.gif
web/md.css
web/md.html
web/md.js

View file

@ -1321,6 +1321,7 @@ def add_admin(ap):
ap2.add_argument("--no-reload", action="store_true", help="disable ?reload=cfg (reload users/volumes/volflags from config file)")
ap2.add_argument("--no-rescan", action="store_true", help="disable ?scan (volume reindexing)")
ap2.add_argument("--no-stack", action="store_true", help="disable ?stack (list all stacks)")
ap2.add_argument("--dl-list", metavar="LVL", type=int, default=2, help="who can see active downloads in the controlpanel? [\033[32m0\033[0m]=nobody, [\033[32m1\033[0m]=admins, [\033[32m2\033[0m]=everyone")
def add_thumbnail(ap):

View file

@ -43,6 +43,9 @@ class BrokerMp(object):
self.procs = []
self.mutex = threading.Lock()
self.retpend: dict[int, Any] = {}
self.retpend_mutex = threading.Lock()
self.num_workers = self.args.j or CORES
self.log("broker", "booting {} subprocesses".format(self.num_workers))
for n in range(1, self.num_workers + 1):
@ -54,6 +57,8 @@ class BrokerMp(object):
self.procs.append(proc)
proc.start()
Daemon(self.periodic, "mp-periodic")
def shutdown(self) -> None:
self.log("broker", "shutting down")
for n, proc in enumerate(self.procs):
@ -90,8 +95,10 @@ class BrokerMp(object):
self.log(*args)
elif dest == "retq":
# response from previous ipc call
raise Exception("invalid broker_mp usage")
with self.retpend_mutex:
retq = self.retpend.pop(retq_id)
retq.put(args[0])
else:
# new ipc invoking managed service in hub
@ -109,7 +116,6 @@ class BrokerMp(object):
proc.q_pend.put((retq_id, "retq", rv))
def ask(self, dest: str, *args: Any) -> Union[ExceptionalQueue, NotExQueue]:
# new non-ipc invoking managed service in hub
obj = self.hub
for node in dest.split("."):
@ -121,17 +127,30 @@ class BrokerMp(object):
retq.put(rv)
return retq
def wask(self, dest: str, *args: Any) -> list[Union[ExceptionalQueue, NotExQueue]]:
# call from hub to workers
ret = []
for p in self.procs:
retq = ExceptionalQueue(1)
retq_id = id(retq)
with self.retpend_mutex:
self.retpend[retq_id] = retq
p.q_pend.put((retq_id, dest, list(args)))
ret.append(retq)
return ret
def say(self, dest: str, *args: Any) -> None:
"""
send message to non-hub component in other process,
returns a Queue object which eventually contains the response if want_retval
(not-impl here since nothing uses it yet)
"""
if dest == "listen":
if dest == "httpsrv.listen":
for p in self.procs:
p.q_pend.put((0, dest, [args[0], len(self.procs)]))
elif dest == "set_netdevs":
elif dest == "httpsrv.set_netdevs":
for p in self.procs:
p.q_pend.put((0, dest, list(args)))
@ -140,3 +159,19 @@ class BrokerMp(object):
else:
raise Exception("what is " + str(dest))
def periodic(self) -> None:
while True:
time.sleep(1)
tdli = {}
tdls = {}
qs = self.wask("httpsrv.read_dls")
for q in qs:
qr = q.get()
dli, dls = qr
tdli.update(dli)
tdls.update(dls)
tdl = (tdli, tdls)
for p in self.procs:
p.q_pend.put((0, "httpsrv.write_dls", tdl))

View file

@ -82,37 +82,38 @@ class MpWorker(BrokerCli):
while True:
retq_id, dest, args = self.q_pend.get()
# self.logw("work: [{}]".format(d[0]))
if dest == "retq":
# response from previous ipc call
with self.retpend_mutex:
retq = self.retpend.pop(retq_id)
retq.put(args)
continue
if dest == "shutdown":
self.httpsrv.shutdown()
self.logw("ok bye")
sys.exit(0)
return
elif dest == "reload":
if dest == "reload":
self.logw("mpw.asrv reloading")
self.asrv.reload()
self.logw("mpw.asrv reloaded")
continue
elif dest == "reload_sessions":
if dest == "reload_sessions":
with self.asrv.mutex:
self.asrv.load_sessions()
continue
elif dest == "listen":
self.httpsrv.listen(args[0], args[1])
obj = self
for node in dest.split("."):
obj = getattr(obj, node)
elif dest == "set_netdevs":
self.httpsrv.set_netdevs(args[0])
elif dest == "retq":
# response from previous ipc call
with self.retpend_mutex:
retq = self.retpend.pop(retq_id)
retq.put(args)
else:
raise Exception("what is " + str(dest))
rv = obj(*args) # type: ignore
if retq_id:
self.say("retq", rv, retq_id=retq_id)
def ask(self, dest: str, *args: Any) -> Union[ExceptionalQueue, NotExQueue]:
retq = ExceptionalQueue(1)
@ -123,5 +124,5 @@ class MpWorker(BrokerCli):
self.q_yield.put((retq_id, dest, list(args)))
return retq
def say(self, dest: str, *args: Any) -> None:
self.q_yield.put((0, dest, list(args)))
def say(self, dest: str, *args: Any, retq_id=0) -> None:
self.q_yield.put((retq_id, dest, list(args)))

View file

@ -53,11 +53,11 @@ class BrokerThr(BrokerCli):
return NotExQueue(obj(*args)) # type: ignore
def say(self, dest: str, *args: Any) -> None:
if dest == "listen":
if dest == "httpsrv.listen":
self.httpsrv.listen(args[0], 1)
return
if dest == "set_netdevs":
if dest == "httpsrv.set_netdevs":
self.httpsrv.set_netdevs(args[0])
return

View file

@ -186,6 +186,7 @@ class HttpCli(object):
self.rem = " "
self.vpath = " "
self.vpaths = " "
self.dl_id = ""
self.gctx = " " # additional context for garda
self.trailing_slash = True
self.uname = " "
@ -726,6 +727,11 @@ class HttpCli(object):
except Pebkac:
return False
finally:
if self.dl_id:
self.conn.hsrv.dli.pop(self.dl_id, None)
self.conn.hsrv.dls.pop(self.dl_id, None)
def dip(self) -> str:
if self.args.plain_ip:
return self.ip.replace(":", ".")
@ -1218,6 +1224,9 @@ class HttpCli(object):
if "shares" in self.uparam:
return self.tx_shares()
if "dls" in self.uparam:
return self.tx_dls()
if "h" in self.uparam:
return self.tx_mounts()
@ -3690,6 +3699,8 @@ class HttpCli(object):
self.args.s_wr_sz,
self.args.s_wr_slp,
not self.args.no_poll,
{},
"",
)
res.close()
@ -3736,6 +3747,7 @@ class HttpCli(object):
editions: dict[str, tuple[str, int]] = {}
for ext in ("", ".gz"):
if ptop is not None:
assert job and ap_data # type: ignore # !rm
sz = job["size"]
file_ts = job["lmod"]
editions["plain"] = (ap_data, sz)
@ -3904,7 +3916,21 @@ class HttpCli(object):
self.send_headers(length=upper - lower, status=status, mime=mime)
return True
dls = self.conn.hsrv.dls
if upper - lower > 0x400000: # 4m
now = time.time()
self.dl_id = "%s:%s" % (self.ip, self.addr[1])
dls[self.dl_id] = (now, 0)
self.conn.hsrv.dli[self.dl_id] = (
now,
upper - lower,
self.vn,
self.vpath,
self.uname,
)
if ptop is not None:
assert job and ap_data # type: ignore # !rm
return self.tx_pipe(
ptop, req_path, ap_data, job, lower, upper, status, mime, logmsg
)
@ -3923,6 +3949,8 @@ class HttpCli(object):
self.args.s_wr_sz,
self.args.s_wr_slp,
not self.args.no_poll,
dls,
self.dl_id,
)
if remains > 0:
@ -4073,6 +4101,8 @@ class HttpCli(object):
wr_sz,
wr_slp,
not self.args.no_poll,
self.conn.hsrv.dls,
self.dl_id,
)
spd = self._spd((upper - lower) - remains)
@ -4158,6 +4188,18 @@ class HttpCli(object):
self.log("transcoding to [{}]".format(cfmt))
fgen = gfilter(fgen, self.thumbcli, self.uname, vpath, cfmt)
now = time.time()
self.dl_id = "%s:%s" % (self.ip, self.addr[1])
self.conn.hsrv.dli[self.dl_id] = (
now,
0,
self.vn,
"%s :%s" % (self.vpath, ext),
self.uname,
)
dls = self.conn.hsrv.dls
dls[self.dl_id] = (time.time(), 0)
bgen = packer(
self.log,
self.asrv,
@ -4166,6 +4208,7 @@ class HttpCli(object):
pre_crc="crc" in uarg,
cmp=uarg if cancmp or uarg == "pax" else "",
)
n = 0
bsent = 0
for buf in bgen.gen():
if not buf:
@ -4179,6 +4222,11 @@ class HttpCli(object):
bgen.stop()
break
n += 1
if n >= 4:
n = 0
dls[self.dl_id] = (time.time(), bsent)
spd = self._spd(bsent)
self.log("{}, {}".format(logmsg, spd))
return True
@ -4436,6 +4484,32 @@ class HttpCli(object):
assert vstate.items and vs # type: ignore # !rm
dls = dl_list = []
if self.conn.hsrv.tdls:
zi = self.args.dl_list
if zi == 2 or (zi == 1 and self.avol):
dl_list = self.get_dls()
for t0, t1, sent, sz, vp, dl_id, uname in dl_list:
rem = sz - sent
td = max(0.1, now - t0)
rd, fn = vsplit(vp)
if not rd:
rd = "/"
erd = quotep(rd)
rds = rd.replace("/", " / ")
spd = humansize(sent / td, True) + "/s"
hsent = humansize(sent, True)
idle = s2hms(now - t1, True)
usr = "%s @%s" % (dl_id, uname) if dl_id else uname
if sz and sent and td:
eta = s2hms((sz - sent) / (sent / td), True)
perc = int(100 * sent / sz)
else:
eta = perc = "--"
fn = html_escape(fn) if fn else self.conn.hsrv.iiam
dls.append((perc, hsent, spd, eta, idle, usr, erd, rds, fn))
fmt = self.uparam.get("ls", "")
if not fmt and (self.ua.startswith("curl/") or self.ua.startswith("fetch")):
fmt = "v"
@ -4457,6 +4531,12 @@ class HttpCli(object):
txt += "\n%s" % (", ".join((str(x) for x in zt)),)
txt += "\n"
if dls:
txt += "\n\nactive downloads:"
for zt in dls:
txt += "\n%s" % (", ".join((str(x) for x in zt)),)
txt += "\n"
if rvol:
txt += "\nyou can browse:"
for v in rvol:
@ -4480,6 +4560,7 @@ class HttpCli(object):
avol=avol,
in_shr=self.args.shr and self.vpath.startswith(self.args.shr1),
vstate=vstate,
dls=dls,
ups=ups,
scanning=vs["scanning"],
hashq=vs["hashq"],
@ -4700,6 +4781,40 @@ class HttpCli(object):
ret["a"] = dirs
return ret
def get_dls(self) -> list[list[Any]]:
ret = []
dls = self.conn.hsrv.tdls
for dl_id, (t0, sz, vn, vp, uname) in self.conn.hsrv.tdli.items():
t1, sent = dls[dl_id]
if sent > 0x100000: # 1m; buffers 2~4
sent -= 0x100000
if self.uname not in vn.axs.uread:
vp = ""
elif self.uname not in vn.axs.udot and (vp.startswith(".") or "/." in vp):
vp = ""
if self.uname not in vn.axs.uadmin:
dl_id = uname = ""
ret.append([t0, t1, sent, sz, vp, dl_id, uname])
return ret
def tx_dls(self) -> bool:
ret = [
{
"t0": x[0],
"t1": x[1],
"sent": x[2],
"size": x[3],
"path": x[4],
"conn": x[5],
"uname": x[6],
}
for x in self.get_dls()
]
zs = json.dumps(ret, separators=(",\n", ": "))
self.reply(zs.encode("utf-8", "replace"), mime="application/json")
return True
def tx_ups(self) -> bool:
idx = self.conn.get_u2idx()
if not idx or not hasattr(idx, "p_end"):

View file

@ -81,6 +81,7 @@ from .util import (
)
if TYPE_CHECKING:
from .authsrv import VFS
from .broker_util import BrokerCli
from .ssdp import SSDPr
@ -130,6 +131,12 @@ class HttpSrv(object):
self.bans: dict[str, int] = {}
self.aclose: dict[str, int] = {}
dli: dict[str, tuple[float, int, "VFS", str, str]] = {} # info
dls: dict[str, tuple[float, int]] = {} # state
self.dli = self.tdli = dli
self.dls = self.tdls = dls
self.iiam = '<img src="%s.cpr/iiam.gif" />' % (self.args.SRS,)
self.bound: set[tuple[str, int]] = set()
self.name = "hsrv" + nsuf
self.mutex = threading.Lock()
@ -205,6 +212,9 @@ class HttpSrv(object):
self.start_threads(4)
if nid:
self.tdli = {}
self.tdls = {}
if self.args.stackmon:
start_stackmon(self.args.stackmon, nid)
@ -579,3 +589,32 @@ class HttpSrv(object):
ident += "a"
self.u2idx_free[ident] = u2idx
def read_dls(
self,
) -> tuple[
dict[str, tuple[float, int, str, str, str]], dict[str, tuple[float, int]]
]:
"""
mp-broker asking for local dl-info + dl-state;
reduce overhead by sending just the vfs vpath
"""
dli = {k: (a, b, c.vpath, d, e) for k, (a, b, c, d, e) in self.dli.items()}
return (dli, self.dls)
def write_dls(
self,
sdli: dict[str, tuple[float, int, str, str, str]],
dls: dict[str, tuple[float, int]],
) -> None:
"""
mp-broker pushing total dl-info + dl-state;
swap out the vfs vpath with the vfs node
"""
dli: dict[str, tuple[float, int, "VFS", str, str]] = {}
for k, (a, b, c, d, e) in sdli.items():
vn = self.asrv.vfs.all_vols[c]
dli[k] = (a, b, vn, d, e)
self.tdli = dli
self.tdls = dls

View file

@ -72,6 +72,9 @@ class Metrics(object):
v = "{:.3f}".format(self.hsrv.t0)
addug("cpp_boot_unixtime", "seconds", v, t)
t = "number of active downloads"
addg("cpp_active_dl", str(len(self.hsrv.tdls)), t)
t = "number of open http(s) client connections"
addg("cpp_http_conns", str(self.hsrv.ncli), t)

View file

@ -1004,15 +1004,18 @@ class SvcHub(object):
except:
self.log("root", "ssdp startup failed;\n" + min_ex(), 3)
def reload(self, rescan_all_vols: bool, up2k: bool) -> None:
def reload(self, rescan_all_vols: bool, up2k: bool) -> str:
t = "config has been reloaded"
with self.reload_mutex:
self.log("root", "reloading config")
self.asrv.reload(9 if up2k else 4)
if up2k:
self.up2k.reload(rescan_all_vols)
t += "; volumes are now reinitializing"
else:
self.log("root", "reload done")
self.broker.reload()
return t
def _reload_sessions(self) -> None:
with self.asrv.mutex:

View file

@ -371,7 +371,7 @@ class TcpSrv(object):
if self.args.q:
print(msg)
self.hub.broker.say("listen", srv)
self.hub.broker.say("httpsrv.listen", srv)
self.srv = srvs
self.bound = bound
@ -379,7 +379,7 @@ class TcpSrv(object):
self._distribute_netdevs()
def _distribute_netdevs(self):
self.hub.broker.say("set_netdevs", self.netdevs)
self.hub.broker.say("httpsrv.set_netdevs", self.netdevs)
self.hub.start_zeroconf()
gencert(self.log, self.args, self.netdevs)
self.hub.restart_ftpd()

View file

@ -2791,7 +2791,10 @@ def sendfile_py(
bufsz: int,
slp: float,
use_poll: bool,
dls: dict[str, tuple[float, int]],
dl_id: str,
) -> int:
sent = 0
remains = upper - lower
f.seek(lower)
while remains > 0:
@ -2808,6 +2811,10 @@ def sendfile_py(
except:
return remains
if dl_id:
sent += len(buf)
dls[dl_id] = (time.time(), sent)
return 0
@ -2820,6 +2827,8 @@ def sendfile_kern(
bufsz: int,
slp: float,
use_poll: bool,
dls: dict[str, tuple[float, int]],
dl_id: str,
) -> int:
out_fd = s.fileno()
in_fd = f.fileno()
@ -2832,7 +2841,7 @@ def sendfile_kern(
while ofs < upper:
stuck = stuck or time.time()
try:
req = min(2 ** 30, upper - ofs)
req = min(0x2000000, upper - ofs) # 32 MiB
if use_poll:
poll.poll(10000)
else:
@ -2856,6 +2865,9 @@ def sendfile_kern(
return upper - ofs
ofs += n
if dl_id:
dls[dl_id] = (time.time(), ofs - lower)
# print("sendfile: ok, sent {} now, {} total, {} remains".format(n, ofs - lower, upper - ofs))
return 0

BIN
copyparty/web/iiam.gif Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 230 B

View file

@ -90,6 +90,10 @@ table {
text-align: left;
white-space: nowrap;
}
.vols td:empty,
.vols th:empty {
padding: 0;
}
.num {
border-right: 1px solid #bbb;
}
@ -222,3 +226,6 @@ html.bz {
color: #bbd;
background: #11121d;
}
html.bz .vols img {
filter: sepia(0.8) hue-rotate(180deg);
}

View file

@ -44,6 +44,18 @@
</table>
{%- endif %}
{%- if dls %}
<h1 id="ae">active downloads:</h1>
<table class="vols">
<thead><tr><th>%</th><th>sent</th><th>speed</th><th>eta</th><th>idle</th><th></th><th>dir</th><th>file</th></tr></thead>
<tbody>
{% for u in dls %}
<tr><td>{{ u[0] }}</td><td>{{ u[1] }}</td><td>{{ u[2] }}</td><td>{{ u[3] }}</td><td>{{ u[4] }}</td><td>{{ u[5] }}</td><td><a href="{{ u[6] }}">{{ u[7]|e }}</a></td><td>{{ u[8] }}</td></tr>
{% endfor %}
</tbody>
</table>
{%- endif %}
{%- if avol %}
<h1>admin panel:</h1>
<table><tr><td> <!-- hehehe -->

View file

@ -37,6 +37,7 @@ var Ls = {
"ab1": "skru av no304",
"ac1": "skru på no304",
"ad1": "no304 stopper all bruk av cache. Hvis ikke k304 var nok, prøv denne. Vil mangedoble dataforbruk!",
"ae1": "utgående:",
},
"eng": {
"d2": "shows the state of all active threads",
@ -86,6 +87,7 @@ var Ls = {
"ab1": "关闭 k304",
"ac1": "开启 k304",
"ad1": "启用 no304 将禁用所有缓存;如果 k304 不够,可以尝试此选项。这将消耗大量的网络流量!", //m
"ae1": "正在下载:", //m
}
};

View file

@ -140,6 +140,7 @@ authenticate using header `Cookie: cppwd=foo` or url param `&pw=foo`
| GET | `?tar&j` | pregenerate jpg thumbnails |
| GET | `?tar&p` | pregenerate audio waveforms |
| GET | `?shares` | list your shared files/folders |
| GET | `?dls` | show active downloads (do this as admin) |
| GET | `?ups` | show recent uploads from your IP |
| GET | `?ups&filter=f` | ...where URL contains `f` |
| GET | `?mime=foo` | specify return mimetype `foo` |

View file

@ -94,6 +94,7 @@ copyparty/web/deps/prismd.css,
copyparty/web/deps/scp.woff2,
copyparty/web/deps/sha512.ac.js,
copyparty/web/deps/sha512.hw.js,
copyparty/web/iiam.gif,
copyparty/web/md.css,
copyparty/web/md.html,
copyparty/web/md.js,

View file

@ -79,6 +79,7 @@ var tl_cpanel = {
"ab1": "disable no304",
"ac1": "enable no304",
"ad1": "enabling no304 will disable all caching; try this if k304 wasn't enough. This will waste a huge amount of network traffic!",
"ae1": "active downloads:",
},
};

View file

@ -57,6 +57,7 @@ class TestMetrics(unittest.TestCase):
ptns = r"""
cpp_uptime_seconds [0-9]\.[0-9]{3}$
cpp_boot_unixtime_seconds [0-9]{7,10}\.[0-9]{3}$
cpp_active_dl 0$
cpp_http_reqs_created [0-9]{7,10}$
cpp_http_reqs_total -1$
cpp_http_conns 9$

View file

@ -134,7 +134,7 @@ class Cfg(Namespace):
ex = "hash_mt safe_dedup srch_time u2abort u2j u2sz"
ka.update(**{k: 1 for k in ex.split()})
ex = "au_vol mtab_age reg_cap s_thead s_tbody th_convt"
ex = "au_vol dl_list mtab_age reg_cap s_thead s_tbody th_convt"
ka.update(**{k: 9 for k in ex.split()})
ex = "db_act k304 loris no304 re_maxage rproxy rsp_jtr rsp_slp s_wr_slp snap_wri theme themes turbo"
@ -254,6 +254,8 @@ class VHttpSrv(object):
self.broker = NullBroker(args, asrv)
self.prism = None
self.bans = {}
self.tdls = self.dls = {}
self.tdli = self.dli = {}
self.nreq = 0
self.nsus = 0
@ -292,6 +294,8 @@ class VHttpConn(object):
self.args = args
self.asrv = asrv
self.bans = {}
self.tdls = self.dls = {}
self.tdli = self.dli = {}
self.freshen_pwd = 0.0
Ctor = VHttpSrvUp2k if use_up2k else VHttpSrv