# coding: utf-8
from __future__ import print_function, unicode_literals

import argparse  # typechk
import base64
import calendar
import copy
import gzip
import json
import os
import re
import socket
import stat
import string
import threading  # typechk
import time
from datetime import datetime
from operator import itemgetter

import jinja2  # typechk

try:
    import lzma
except:
    pass

try:
    import ctypes
except:
    pass

from .__init__ import ANYWIN, PY2, TYPE_CHECKING, WINDOWS, E, unicode
from .authsrv import VFS  # typechk
from .bos import bos
from .star import StreamTar
from .sutil import StreamArc  # typechk
from .szip import StreamZip
from .util import (
    HTTP_TS_FMT,
    HTTPCODE,
    META_NOBOTS,
    MultipartParser,
    Pebkac,
    UnrecvEOF,
    alltrace,
    atomic_move,
    exclude_dotfiles,
    fsenc,
    gen_filekey,
    gencookie,
    get_spd,
    guess_mime,
    gzip_orig_sz,
    hashcopy,
    html_bescape,
    html_escape,
    http_ts,
    humansize,
    min_ex,
    quotep,
    read_header,
    read_socket,
    read_socket_chunked,
    read_socket_unbounded,
    relchk,
    ren_open,
    s3enc,
    sanitize_fn,
    sendfile_kern,
    sendfile_py,
    undot,
    unescape_cookie,
    unquote,
    unquotep,
    vol_san,
    vsplit,
    yieldfile,
)

try:
    from typing import Any, Generator, Match, Optional, Pattern, Type, Union
except:
    pass

if TYPE_CHECKING:
    from .httpconn import HttpConn

_ = (argparse, threading)

NO_CACHE = {"Cache-Control": "no-cache"}


class HttpCli(object):
    """
    Spawned by HttpConn to process one http transaction
    """

    def __init__(self, conn: "HttpConn") -> None:
        assert conn.sr

        self.t0 = time.time()
        self.conn = conn
        self.mutex = conn.mutex  # mypy404
        self.s = conn.s
        self.sr = conn.sr
        self.ip = conn.addr[0]
        self.addr: tuple[str, int] = conn.addr
        self.args = conn.args  # mypy404
        self.asrv = conn.asrv  # mypy404
        self.ico = conn.ico  # mypy404
        self.thumbcli = conn.thumbcli  # mypy404
        self.u2fh = conn.u2fh  # mypy404
        self.log_func = conn.log_func  # mypy404
        self.log_src = conn.log_src  # mypy404
        self.tls: bool = hasattr(self.s, "cipher")

        # placeholders; assigned by run()
        self.keepalive = False
        self.is_https = False
        self.headers: dict[str, str] = {}
        self.mode = " "
        self.req = " "
        self.http_ver = " "
        self.ua = " "
        self.is_rclone = False
        self.is_ancient = False
        self.dip = " "
        self.ouparam: dict[str, str] = {}
        self.uparam: dict[str, str] = {}
        self.cookies: dict[str, str] = {}
        self.vpath = " "
        self.uname = " "
        self.rvol = [" "]
        self.wvol = [" "]
        self.mvol = [" "]
        self.dvol = [" "]
        self.gvol = [" "]
        self.do_log = True
        self.can_read = False
        self.can_write = False
        self.can_move = False
        self.can_delete = False
        self.can_get = False
        # post
        self.parser: Optional[MultipartParser] = None
        # end placeholders

        self.bufsz = 1024 * 32
        self.hint = ""
        self.trailing_slash = True
        self.out_headerlist: list[tuple[str, str]] = []
        self.out_headers = {
            "Access-Control-Allow-Origin": "*",
            "Cache-Control": "no-store; max-age=0",
        }
        h = self.args.html_head
        if self.args.no_robots:
            h = META_NOBOTS + (("\n" + h) if h else "")
            self.out_headers["X-Robots-Tag"] = "noindex, nofollow"
        self.html_head = h

    def log(self, msg: str, c: Union[int, str] = 0) -> None:
        ptn = self.asrv.re_pwd
        if ptn and ptn.search(msg):
            msg = ptn.sub(self.unpwd, msg)

        self.log_func(self.log_src, msg, c)

    def unpwd(self, m: Match[str]) -> str:
        a, b = m.groups()
        return "=\033[7m {} \033[27m{}".format(self.asrv.iacct[a], b)

    def _check_nonfatal(self, ex: Pebkac, post: bool) -> bool:
        if post:
            return ex.code < 300

        return ex.code < 400 or ex.code in [404, 429]

    def _assert_safe_rem(self, rem: str) -> None:
        # sanity check to prevent any disasters
        if rem.startswith("/") or rem.startswith("../") or "/../" in rem:
            raise Exception("that was close")

    def j2s(self, name: str, **ka: Any) -> str:
        tpl = self.conn.hsrv.j2[name]
        ka["ts"] = self.conn.hsrv.cachebuster()
        ka["svcname"] = self.args.doctitle
        ka["html_head"] = self.html_head
        return tpl.render(**ka)  # type: ignore

    def j2j(self, name: str) -> jinja2.Template:
        return self.conn.hsrv.j2[name]

    def run(self) -> bool:
        """returns true if connection can be reused"""
        self.keepalive = False
        self.is_https = False
        self.headers = {}
        self.hint = ""
        try:
            headerlines = read_header(self.sr)
            if not headerlines:
                return False

            if not headerlines[0]:
                # seen after login with IE6.0.2900.5512.xpsp.080413-2111 (xp-sp3)
                self.log("BUG: trailing newline from previous request", c="1;31")
                headerlines.pop(0)

            try:
                self.mode, self.req, self.http_ver = headerlines[0].split(" ")

                # normalize incoming headers to lowercase;
                # outgoing headers however are Correct-Case
                for header_line in headerlines[1:]:
                    k, zs = header_line.split(":", 1)
                    self.headers[k.lower()] = zs.strip()
            except:
                msg = " ]\n#[ ".join(headerlines)
                raise Pebkac(400, "bad headers:\n#[ " + msg + " ]")

        except Pebkac as ex:
            self.mode = "GET"
            self.req = "[junk]"
            self.http_ver = "HTTP/1.1"
            # self.log("pebkac at httpcli.run #1: " + repr(ex))
            self.keepalive = False
            self.loud_reply(unicode(ex), status=ex.code, volsan=True)
            return self.keepalive

        if self.args.rsp_slp:
            time.sleep(self.args.rsp_slp)

        self.ua = self.headers.get("user-agent", "")
        self.is_rclone = self.ua.startswith("rclone/")
        self.is_ancient = self.ua.startswith("Mozilla/4.")

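        # connection reuse: HTTP/1.1 defaults to keep-alive unless the
        # client sends "Connection: close"; HTTP/1.0 defaults to close
        # unless it explicitly asks for "keep-alive"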
        zs = self.headers.get("connection", "").lower()
        self.keepalive = not zs.startswith("close") and (
            self.http_ver != "HTTP/1.0" or zs == "keep-alive"
        )
        self.is_https = (
            self.headers.get("x-forwarded-proto", "").lower() == "https" or self.tls
        )

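        # reverse-proxy support: when --rproxy is set and the TCP peer is
        # localhost, take the client IP from X-Forwarded-For instead;
        # the argument selects which hop in the comma-separated list to
        # trust, falling back to the first entry if out of bounds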
        n = self.args.rproxy
        if n:
            zso = self.headers.get("x-forwarded-for")
            if zso and self.conn.addr[0] in ["127.0.0.1", "::1"]:
                if n > 0:
                    n -= 1

                zsl = zso.split(",")
                try:
                    self.ip = zsl[n].strip()
                except:
                    self.ip = zsl[0].strip()
                    t = "rproxy={} oob x-fwd {}"
                    self.log(t.format(self.args.rproxy, zso), c=3)

                self.log_src = self.conn.set_rproxy(self.ip)

        self.dip = self.ip.replace(":", ".")

        if self.args.ihead:
            keys = self.args.ihead
            if "*" in keys:
                keys = list(sorted(self.headers.keys()))

            for k in keys:
                zso = self.headers.get(k)
                if zso is not None:
                    self.log("[H] {}: \033[33m[{}]".format(k, zso), 6)

        if "&" in self.req and "?" not in self.req:
            self.hint = "did you mean '?' instead of '&'"

        # split req into vpath + uparam
        uparam = {}
        if "?" not in self.req:
            self.trailing_slash = self.req.endswith("/")
            vpath = undot(self.req)
        else:
            vpath, arglist = self.req.split("?", 1)
            self.trailing_slash = vpath.endswith("/")
            vpath = undot(vpath)
            for k in arglist.split("&"):
                if "=" in k:
                    k, zs = k.split("=", 1)
                    uparam[k.lower()] = zs.strip()
                else:
                    uparam[k.lower()] = ""

        self.ouparam = {k: zs for k, zs in uparam.items()}

        zso = self.headers.get("cookie")
        if zso:
            zsll = [x.split("=", 1) for x in zso.split(";") if "=" in x]
            cookies = {k.strip(): unescape_cookie(zs) for k, zs in zsll}
            for kc, ku in [["cppwd", "pw"], ["b", "b"]]:
                if kc in cookies and ku not in uparam:
                    uparam[ku] = cookies[kc]
        else:
            cookies = {}

        if len(uparam) > 10 or len(cookies) > 50:
            raise Pebkac(400, "u wot m8")

        self.uparam = uparam
        self.cookies = cookies
        self.vpath = unquotep(vpath)  # not query, so + means +

        ok = "\x00" not in self.vpath
        if ANYWIN:
            ok = ok and not relchk(self.vpath)

        if not ok:
            self.log("invalid relpath [{}]".format(self.vpath))
            return self.tx_404() and self.keepalive

        pwd = ""
        zso = self.headers.get("authorization")
        if zso:
            try:
                zb = zso.split(" ")[1].encode("ascii")
                zs = base64.b64decode(zb).decode("utf-8")
                # try "pwd", "x:pwd", "pwd:x"
                for zs in [zs] + zs.split(":", 1)[::-1]:
                    if self.asrv.iacct.get(zs):
                        pwd = zs
                        break
            except:
                pass

        pwd = uparam.get("pw") or pwd
        self.uname = self.asrv.iacct.get(pwd) or "*"
        self.rvol = self.asrv.vfs.aread[self.uname]
        self.wvol = self.asrv.vfs.awrite[self.uname]
        self.mvol = self.asrv.vfs.amove[self.uname]
        self.dvol = self.asrv.vfs.adel[self.uname]
        self.gvol = self.asrv.vfs.aget[self.uname]

        if pwd:
            self.out_headerlist.append(("Set-Cookie", self.get_pwd_cookie(pwd)[0]))

        if self.is_rclone:
            uparam["raw"] = ""
            uparam["dots"] = ""
            uparam["b"] = ""
            cookies["b"] = ""

        ptn: Optional[Pattern[str]] = self.conn.lf_url  # mypy404
        self.do_log = not ptn or not ptn.search(self.req)

        x = self.asrv.vfs.can_access(self.vpath, self.uname)
        self.can_read, self.can_write, self.can_move, self.can_delete, self.can_get = x

        try:
            if self.mode in ["GET", "HEAD"]:
                return self.handle_get() and self.keepalive
            elif self.mode == "POST":
                return self.handle_post() and self.keepalive
            elif self.mode == "PUT":
                return self.handle_put() and self.keepalive
            elif self.mode == "OPTIONS":
                return self.handle_options() and self.keepalive
            else:
                raise Pebkac(400, 'invalid HTTP mode "{0}"'.format(self.mode))

        except Exception as ex:
            if not hasattr(ex, "code"):
                pex = Pebkac(500)
            else:
                pex = ex  # type: ignore

            try:
                post = self.mode in ["POST", "PUT"] or "content-length" in self.headers
                if not self._check_nonfatal(pex, post):
                    self.keepalive = False

                msg = str(ex) if pex == ex else min_ex()
                self.log("{}\033[0m, {}".format(msg, self.vpath), 3)

                msg = "{}\r\nURL: {}\r\n".format(str(ex), self.vpath)
                if self.hint:
                    msg += "hint: {}\r\n".format(self.hint)

                msg = "<pre>" + html_escape(msg)
                self.reply(msg.encode("utf-8", "replace"), status=pex.code, volsan=True)
                return self.keepalive
            except Pebkac:
                return False

    def permit_caching(self) -> None:
        cache = self.uparam.get("cache")
        if cache is None:
            self.out_headers.update(NO_CACHE)
            return

        n = "604800" if cache == "i" else cache or "69"
        self.out_headers["Cache-Control"] = "max-age=" + n

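    # some clients (notably old MSIE / Trident) apparently mishandle
    # keep-alive on 304 replies, so k304() decides whether to close the
    # connection after a 304; opt-in/out via the k304 cookie, default-on
    # for Trident user-agents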
    def k304(self) -> bool:
        k304 = self.cookies.get("k304")
        return k304 == "y" or ("; Trident/" in self.ua and not k304)

    def send_headers(
        self,
        length: Optional[int],
        status: int = 200,
        mime: Optional[str] = None,
        headers: Optional[dict[str, str]] = None,
    ) -> None:
        response = ["{} {} {}".format(self.http_ver, status, HTTPCODE[status])]

        if length is not None:
            response.append("Content-Length: " + unicode(length))

        if status == 304 and self.k304():
            self.keepalive = False

        # close if unknown length, otherwise take client's preference
        response.append("Connection: " + ("Keep-Alive" if self.keepalive else "Close"))

        # headers{} overrides anything set previously
        if headers:
            self.out_headers.update(headers)

        # default to utf8 html if no content-type is set
        if not mime:
            mime = self.out_headers.get("Content-Type", "text/html; charset=utf-8")

        assert mime
        self.out_headers["Content-Type"] = mime

        for k, zs in list(self.out_headers.items()) + self.out_headerlist:
            response.append("{}: {}".format(k, zs))

        try:
            # best practice to separate headers and body into different packets
            self.s.sendall("\r\n".join(response).encode("utf-8") + b"\r\n\r\n")
        except:
            raise Pebkac(400, "client d/c while replying headers")

    def reply(
        self,
        body: bytes,
        status: int = 200,
        mime: Optional[str] = None,
        headers: Optional[dict[str, str]] = None,
        volsan: bool = False,
    ) -> bytes:
        # TODO something to reply with user-supplied values safely

        if volsan:
            vols = list(self.asrv.vfs.all_vols.values())
            body = vol_san(vols, body)

        self.send_headers(len(body), status, mime, headers)

        try:
            if self.mode != "HEAD":
                self.s.sendall(body)
        except:
            raise Pebkac(400, "client d/c while replying body")

        return body

    def loud_reply(self, body: str, *args: Any, **kwargs: Any) -> None:
        if not kwargs.get("mime"):
            kwargs["mime"] = "text/plain; charset=utf-8"

        self.log(body.rstrip())
        self.reply(body.encode("utf-8") + b"\r\n", *list(args), **kwargs)

    def urlq(self, add: dict[str, str], rm: list[str]) -> str:
        """
        generates url query based on uparam (b, pw, all others)
        removing anything in rm, adding pairs in add

        also, a list is faster than a set until ~20 items
        """

        if self.is_rclone:
            return ""

        cmap = {"pw": "cppwd"}
        kv = {
            k: zs
            for k, zs in self.uparam.items()
            if k not in rm and self.cookies.get(cmap.get(k, k)) != zs
        }
        kv.update(add)
        if not kv:
            return ""

        r = ["{}={}".format(k, quotep(zs)) if zs else k for k, zs in kv.items()]
        return "?" + "&".join(r)

    def redirect(
        self,
        vpath: str,
        suf: str = "",
        msg: str = "aight",
        flavor: str = "go to",
        click: bool = True,
        status: int = 200,
        use302: bool = False,
    ) -> bool:
        html = self.j2s(
            "msg",
            h2='<a href="/{}">{} /{}</a>'.format(
                quotep(vpath) + suf, flavor, html_escape(vpath, crlf=True) + suf
            ),
            pre=msg,
            click=click,
        ).encode("utf-8", "replace")

        if use302:
            self.reply(html, status=302, headers={"Location": "/" + vpath})
        else:
            self.reply(html, status=status)

        return True

    def handle_get(self) -> bool:
        if self.do_log:
            logmsg = "{:4} {}".format(self.mode, self.req)

            if "range" in self.headers:
                try:
                    rval = self.headers["range"].split("=", 1)[1]
                except:
                    rval = self.headers["range"]

                logmsg += " [\033[36m" + rval + "\033[0m]"

            self.log(logmsg)

        # "embedded" resources
        if self.vpath.startswith(".cpr"):
            if self.vpath.startswith(".cpr/ico/"):
                return self.tx_ico(self.vpath.split("/")[-1], exact=True)

            static_path = os.path.join(E.mod, "web/", self.vpath[5:])
            return self.tx_file(static_path)

        if not self.can_read and not self.can_write and not self.can_get:
            if self.vpath:
                self.log("inaccessible: [{}]".format(self.vpath))
                return self.tx_404(True)

            self.uparam["h"] = ""

        if "tree" in self.uparam:
            return self.tx_tree()

        if "delete" in self.uparam:
            return self.handle_rm([])

        if "move" in self.uparam:
            return self.handle_mv()

        if "scan" in self.uparam:
            return self.scanvol()

        if not self.vpath:
            if "reload" in self.uparam:
                return self.handle_reload()

            if "stack" in self.uparam:
                return self.tx_stack()

            if "ups" in self.uparam:
                return self.tx_ups()

            if "k304" in self.uparam:
                return self.set_k304()

            if "am_js" in self.uparam:
                return self.set_am_js()

            if "reset" in self.uparam:
                return self.set_cfg_reset()

        if "h" in self.uparam:
            return self.tx_mounts()

        # conditional redirect to single volumes
        if self.vpath == "" and not self.ouparam:
            nread = len(self.rvol)
            nwrite = len(self.wvol)
            if nread + nwrite == 1 or (self.rvol == self.wvol and nread == 1):
                if nread == 1:
                    vpath = self.rvol[0]
                else:
                    vpath = self.wvol[0]

                if self.vpath != vpath:
                    self.redirect(vpath, flavor="redirecting to", use302=True)
                    return True

        return self.tx_browser()

    def handle_options(self) -> bool:
        if self.do_log:
            self.log("OPTIONS " + self.req)

        self.send_headers(
            None,
            204,
            headers={
                "Access-Control-Allow-Origin": "*",
                "Access-Control-Allow-Methods": "*",
                "Access-Control-Allow-Headers": "*",
            },
        )
        return True

    def handle_put(self) -> bool:
        self.log("PUT " + self.req)

        if self.headers.get("expect", "").lower() == "100-continue":
            try:
                self.s.sendall(b"HTTP/1.1 100 Continue\r\n\r\n")
            except:
                raise Pebkac(400, "client d/c before 100 continue")

        return self.handle_stash()

    def handle_post(self) -> bool:
        self.log("POST " + self.req)

        if self.headers.get("expect", "").lower() == "100-continue":
            try:
                self.s.sendall(b"HTTP/1.1 100 Continue\r\n\r\n")
            except:
                raise Pebkac(400, "client d/c before 100 continue")

        if "raw" in self.uparam:
            return self.handle_stash()

        ctype = self.headers.get("content-type", "").lower()
        if not ctype:
            raise Pebkac(400, "you can't post without a content-type header")

        if "multipart/form-data" in ctype:
            return self.handle_post_multipart()

        if "text/plain" in ctype or "application/xml" in ctype:
            return self.handle_post_json()

        if "application/octet-stream" in ctype:
            return self.handle_post_binary()

        if "application/x-www-form-urlencoded" in ctype:
            opt = self.args.urlform
            if "stash" in opt:
                return self.handle_stash()

            if "save" in opt:
                post_sz, _, _, _, path, _ = self.dump_to_file()
                self.log("urlform: {} bytes, {}".format(post_sz, path))
            elif "print" in opt:
                reader, _ = self.get_body_reader()
                for buf in reader:
                    orig = buf.decode("utf-8", "replace")
                    t = "urlform_raw {} @ {}\n {}\n"
                    self.log(t.format(len(orig), self.vpath, orig))
                    try:
                        zb = unquote(buf.replace(b"+", b" "))
                        plain = zb.decode("utf-8", "replace")
                        if buf.startswith(b"msg="):
                            plain = plain[4:]

                        t = "urlform_dec {} @ {}\n {}\n"
                        self.log(t.format(len(plain), self.vpath, plain))
                    except Exception as ex:
                        self.log(repr(ex))

            if "get" in opt:
                return self.handle_get()

            raise Pebkac(405, "POST({}) is disabled in server config".format(ctype))

        raise Pebkac(405, "don't know how to handle POST({})".format(ctype))

    def get_body_reader(self) -> tuple[Generator[bytes, None, None], int]:
        if "chunked" in self.headers.get("transfer-encoding", "").lower():
            return read_socket_chunked(self.sr), -1

        remains = int(self.headers.get("content-length", -1))
        if remains == -1:
            self.keepalive = False
            return read_socket_unbounded(self.sr), remains
        else:
            return read_socket(self.sr, remains), remains

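    # dump_to_file receives a raw request body (PUT, or POST with ?raw /
    # urlform-save) and writes it straight to disk, optionally gzip- or
    # xz-compressing it on the fly when the volume flags or url-params
    # ask for it; returns (post_sz, sha_hex, sha_b64, remains, path, url)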
    def dump_to_file(self) -> tuple[int, str, str, int, str, str]:
        # post_sz, sha_hex, sha_b64, remains, path, url
        reader, remains = self.get_body_reader()
        vfs, rem = self.asrv.vfs.get(self.vpath, self.uname, False, True)
        lim = vfs.get_dbv(rem)[0].lim
        fdir = os.path.join(vfs.realpath, rem)
        if lim:
            fdir, rem = lim.all(self.ip, rem, remains, fdir)

        fn = None
        if rem and not self.trailing_slash and not bos.path.isdir(fdir):
            fdir, fn = os.path.split(fdir)
            rem, _ = vsplit(rem)

        bos.makedirs(fdir)

        open_ka: dict[str, Any] = {"fun": open}
        open_a = ["wb", 512 * 1024]

        # user-request || config-force
        if ("gz" in vfs.flags or "xz" in vfs.flags) and (
            "pk" in vfs.flags
            or "pk" in self.uparam
            or "gz" in self.uparam
            or "xz" in self.uparam
        ):
            fb = {"gz": 9, "xz": 0}  # default/fallback level
            lv = {}  # selected level
            alg = ""  # selected algo (gz=preferred)

            # user-prefs first
            if "gz" in self.uparam or "pk" in self.uparam:  # def.pk
                alg = "gz"
            if "xz" in self.uparam:
                alg = "xz"
            if alg:
                zso = self.uparam.get(alg)
                lv[alg] = fb[alg] if zso is None else int(zso)

            if alg not in vfs.flags:
                alg = "gz" if "gz" in vfs.flags else "xz"

            # then server overrides
            pk = vfs.flags.get("pk")
            if pk is not None:
                # config-forced on
                alg = alg or "gz"  # def.pk
                try:
                    # config-forced opts
                    alg, nlv = pk.split(",")
                    lv[alg] = int(nlv)
                except:
                    pass

            lv[alg] = lv.get(alg) or fb.get(alg) or 0

            self.log("compressing with {} level {}".format(alg, lv.get(alg)))
            if alg == "gz":
                open_ka["fun"] = gzip.GzipFile
                open_a = ["wb", lv[alg], None, 0x5FEE6600]  # 2021-01-01
            elif alg == "xz":
                open_ka = {"fun": lzma.open, "preset": lv[alg]}
                open_a = ["wb"]
            else:
                self.log("fallthrough? thats a bug", 1)

        suffix = "-{:.6f}-{}".format(time.time(), self.dip)
        params = {"suffix": suffix, "fdir": fdir}
        if self.args.nw:
            params = {}
            fn = os.devnull

        params.update(open_ka)

        if not fn:
            fn = "put" + suffix

        with ren_open(fn, *open_a, **params) as zfw:
            f, fn = zfw["orz"]
            path = os.path.join(fdir, fn)
            post_sz, sha_hex, sha_b64 = hashcopy(reader, f, self.args.s_wr_slp)

        if lim:
            lim.nup(self.ip)
            lim.bup(self.ip, post_sz)
            try:
                lim.chk_sz(post_sz)
            except:
                bos.unlink(path)
                raise

        if self.args.nw:
            return post_sz, sha_hex, sha_b64, remains, path, ""

        vfs, rem = vfs.get_dbv(rem)
        self.conn.hsrv.broker.say(
            "up2k.hash_file",
            vfs.realpath,
            vfs.flags,
            rem,
            fn,
            self.ip,
            time.time(),
        )

        vsuf = ""
        if self.can_read and "fk" in vfs.flags:
            vsuf = "?k=" + gen_filekey(
                self.args.fk_salt,
                path,
                post_sz,
                0 if ANYWIN else bos.stat(path).st_ino,
            )[: vfs.flags["fk"]]

        vpath = "/".join([x for x in [vfs.vpath, rem, fn] if x])
        vpath = quotep(vpath)

        url = "{}://{}/{}".format(
            "https" if self.is_https else "http",
            self.headers.get("host") or "{}:{}".format(*list(self.s.getsockname())),
            vpath + vsuf,
        )

        return post_sz, sha_hex, sha_b64, remains, path, url

    def handle_stash(self) -> bool:
        post_sz, sha_hex, sha_b64, remains, path, url = self.dump_to_file()
        spd = self._spd(post_sz)
        t = "{} wrote {}/{} bytes to {} # {}"
        self.log(t.format(spd, post_sz, remains, path, sha_b64[:28]))  # 21
        t = "{}\n{}\n{}\n{}\n".format(post_sz, sha_b64, sha_hex[:56], url)
        self.reply(t.encode("utf-8"))
        return True

    def _spd(self, nbytes: int, add: bool = True) -> str:
        if add:
            self.conn.nbyte += nbytes

        spd1 = get_spd(nbytes, self.t0)
        spd2 = get_spd(self.conn.nbyte, self.conn.t0)
        return "{} {} n{}".format(spd1, spd2, self.conn.nreq)

    def handle_post_multipart(self) -> bool:
        self.parser = MultipartParser(self.log, self.sr, self.headers)
        self.parser.parse()

        act = self.parser.require("act", 64)

        if act == "login":
            return self.handle_login()

        if act == "mkdir":
            return self.handle_mkdir()

        if act == "new_md":
            # kinda silly but has the least side effects
            return self.handle_new_md()

        if act == "bput":
            return self.handle_plain_upload()

        if act == "tput":
            return self.handle_text_upload()

        if act == "zip":
            return self.handle_zip_post()

        raise Pebkac(422, 'invalid action "{}"'.format(act))

    def handle_zip_post(self) -> bool:
        assert self.parser
        for k in ["zip", "tar"]:
            v = self.uparam.get(k)
            if v is not None:
                break

        if v is None:
            raise Pebkac(422, "need zip or tar keyword")

        vn, rem = self.asrv.vfs.get(self.vpath, self.uname, True, False)
        zs = self.parser.require("files", 1024 * 1024)
        if not zs:
            raise Pebkac(422, "need files list")

        items = zs.replace("\r", "").split("\n")
        items = [unquotep(x) for x in items if x]

        self.parser.drop()
        return self.tx_zip(k, v, vn, rem, items, self.args.ed)

    def handle_post_json(self) -> bool:
        try:
            remains = int(self.headers["content-length"])
        except:
            raise Pebkac(411)

        if remains > 1024 * 1024:
            raise Pebkac(413, "json 2big")

        enc = "utf-8"
        ctype = self.headers.get("content-type", "").lower()
        if "charset" in ctype:
            enc = ctype.split("charset")[1].strip(" =").split(";")[0].strip()

        try:
            json_buf = self.sr.recv_ex(remains)
        except UnrecvEOF:
            raise Pebkac(422, "client disconnected while posting JSON")

        self.log("decoding {} bytes of {} json".format(len(json_buf), enc))
        try:
            body = json.loads(json_buf.decode(enc, "replace"))
        except:
            raise Pebkac(422, "you POSTed invalid json")

        if "srch" in self.uparam or "srch" in body:
            return self.handle_search(body)

        if "delete" in self.uparam:
            return self.handle_rm(body)

        # up2k-php compat
        for k in "chunkpit.php", "handshake.php":
            if self.vpath.endswith(k):
                self.vpath = self.vpath[: -len(k)]

        name = undot(body["name"])
        if "/" in name:
            raise Pebkac(400, "your client is old; press CTRL-SHIFT-R and try again")

        vfs, rem = self.asrv.vfs.get(self.vpath, self.uname, False, True)
        dbv, vrem = vfs.get_dbv(rem)

        body["vtop"] = dbv.vpath
        body["ptop"] = dbv.realpath
        body["prel"] = vrem
        body["addr"] = self.ip
        body["vcfg"] = dbv.flags

        if rem:
            try:
                dst = os.path.join(vfs.realpath, rem)
                if not bos.path.isdir(dst):
                    bos.makedirs(dst)
            except OSError as ex:
                self.log("makedirs failed [{}]".format(dst))
                if not bos.path.isdir(dst):
                    if ex.errno == 13:
                        raise Pebkac(500, "the server OS denied write-access")

                    if ex.errno == 17:
                        raise Pebkac(400, "some file got your folder name")

                    raise Pebkac(500, min_ex())
            except:
                raise Pebkac(500, min_ex())

        x = self.conn.hsrv.broker.ask("up2k.handle_json", body)
        ret = x.get()
        ret = json.dumps(ret)
        self.log(ret)
        self.reply(ret.encode("utf-8"), mime="application/json")
        return True

    def handle_search(self, body: dict[str, Any]) -> bool:
        idx = self.conn.get_u2idx()
        if not hasattr(idx, "p_end"):
            raise Pebkac(500, "sqlite3 is not available on the server; cannot search")

        vols = []
        seen = {}
        for vtop in self.rvol:
            vfs, _ = self.asrv.vfs.get(vtop, self.uname, True, False)
            vfs = vfs.dbv or vfs
            if vfs in seen:
                continue

            seen[vfs] = True
            vols.append((vfs.vpath, vfs.realpath, vfs.flags))

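        # basic rate-limiting: if the previous query was expensive
        # (took more than 0.7 sec), require the index to have been
        # idle for at least 0.7 sec before accepting another one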
        t0 = time.time()
        if idx.p_end:
            penalty = 0.7
            t_idle = t0 - idx.p_end
            if idx.p_dur > 0.7 and t_idle < penalty:
                t = "rate-limit {:.1f} sec, cost {:.2f}, idle {:.2f}"
                raise Pebkac(429, t.format(penalty, idx.p_dur, t_idle))

        if "srch" in body:
            # search by up2k hashlist
            vbody = copy.deepcopy(body)
            vbody["hash"] = len(vbody["hash"])
            self.log("qj: " + repr(vbody))
            hits = idx.fsearch(vols, body)
            msg: Any = repr(hits)
            taglist: list[str] = []
        else:
            # search by query params
            q = body["q"]
            n = body.get("n", self.args.srch_hits)
            self.log("qj: {} |{}|".format(q, n))
            hits, taglist = idx.search(vols, q, n)
            msg = len(hits)

        idx.p_end = time.time()
        idx.p_dur = idx.p_end - t0
        self.log("q#: {} ({:.2f}s)".format(msg, idx.p_dur))

        order = []
        cfg = self.args.mte.split(",")
        for t in cfg:
            if t in taglist:
                order.append(t)
        for t in taglist:
            if t not in order:
                order.append(t)

        r = json.dumps({"hits": hits, "tag_order": order}).encode("utf-8")
        self.reply(r, mime="application/json")
        return True

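    # up2k chunk upload: the client POSTs one chunk of a previously
    # handshaked upload, identified by the file wark and the chunk
    # hash; the broker replies with the chunk size, the offset(s) to
    # write at, and the destination path -- identical chunks appear
    # as multiple offsets and are cloned server-side after the write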
    def handle_post_binary(self) -> bool:
        try:
            remains = int(self.headers["content-length"])
        except:
            raise Pebkac(400, "you must supply a content-length for binary POST")

        try:
            chash = self.headers["x-up2k-hash"]
            wark = self.headers["x-up2k-wark"]
        except KeyError:
            raise Pebkac(400, "need hash and wark headers for binary POST")

        vfs, _ = self.asrv.vfs.get(self.vpath, self.uname, False, True)
        ptop = (vfs.dbv or vfs).realpath

        x = self.conn.hsrv.broker.ask("up2k.handle_chunk", ptop, wark, chash)
        response = x.get()
        chunksize, cstart, path, lastmod, sprs = response

        try:
            if self.args.nw:
                path = os.devnull

            if remains > chunksize:
                raise Pebkac(400, "your chunk is too big to fit")

            self.log("writing {} #{} @{} len {}".format(path, chash, cstart, remains))

            reader = read_socket(self.sr, remains)

            f = None
            fpool = not self.args.no_fpool and sprs
            if fpool:
                with self.mutex:
                    try:
                        f = self.u2fh.pop(path)
                    except:
                        pass

            f = f or open(fsenc(path), "rb+", 512 * 1024)

            try:
                f.seek(cstart[0])
                post_sz, _, sha_b64 = hashcopy(reader, f, self.args.s_wr_slp)

                if sha_b64 != chash:
                    t = "your chunk got corrupted somehow (received {} bytes); expected vs received hash:\n{}\n{}"
                    raise Pebkac(400, t.format(post_sz, chash, sha_b64))

                if len(cstart) > 1 and path != os.devnull:
                    self.log(
                        "clone {} to {}".format(
                            cstart[0], " & ".join(unicode(x) for x in cstart[1:])
                        )
                    )
                    ofs = 0
                    while ofs < chunksize:
                        bufsz = min(chunksize - ofs, 4 * 1024 * 1024)
                        f.seek(cstart[0] + ofs)
                        buf = f.read(bufsz)
                        for wofs in cstart[1:]:
                            f.seek(wofs + ofs)
                            f.write(buf)

                        ofs += len(buf)

                    self.log("clone {} done".format(cstart[0]))
            finally:
                if not fpool:
                    f.close()
                else:
                    with self.mutex:
                        self.u2fh.put(path, f)
        finally:
            x = self.conn.hsrv.broker.ask("up2k.release_chunk", ptop, wark, chash)
            x.get()  # block client until released

        x = self.conn.hsrv.broker.ask("up2k.confirm_chunk", ptop, wark, chash)
        ztis = x.get()
        try:
            num_left, fin_path = ztis
        except:
            self.loud_reply(ztis, status=500)
            return False

        if not num_left and fpool:
            with self.mutex:
                self.u2fh.close(path)

        # windows cant rename open files
        if ANYWIN and path != fin_path and not self.args.nw:
            self.conn.hsrv.broker.ask("up2k.finish_upload", ptop, wark).get()

        if not ANYWIN and not num_left:
            times = (int(time.time()), int(lastmod))
            self.log("no more chunks, setting times {}".format(times))
            try:
                bos.utime(fin_path, times)
            except:
                self.log("failed to utime ({}, {})".format(fin_path, times))

        spd = self._spd(post_sz)
        self.log("{} thank".format(spd))
        self.reply(b"thank")
        return True

    def handle_login(self) -> bool:
        assert self.parser
        pwd = self.parser.require("cppwd", 64)
        self.parser.drop()

        self.out_headerlist = [
            x
            for x in self.out_headerlist
            if x[0] != "Set-Cookie" or "cppwd=" not in x[1]
        ]

        dst = "/"
        if self.vpath:
            dst += quotep(self.vpath)

        ck, msg = self.get_pwd_cookie(pwd)
        html = self.j2s("msg", h1=msg, h2='<a href="' + dst + '">ack</a>', redir=dst)
        self.reply(html.encode("utf-8"), headers={"Set-Cookie": ck})
        return True

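    # on a correct password the cppwd cookie is set to expire after
    # --logout hours; on a wrong one it is replaced with the junk
    # value "x" (and, presumably, an immediate expiry from gencookie)
    # so the client ends up logged out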
    def get_pwd_cookie(self, pwd: str) -> tuple[str, str]:
        if pwd in self.asrv.iacct:
            msg = "login ok"
            dur = int(60 * 60 * self.args.logout)
        else:
            msg = "naw dude"
            pwd = "x"  # nosec
            dur = None

        r = gencookie("cppwd", pwd, dur)
        if self.is_ancient:
            r = r.rsplit(" ", 1)[0]

        return r, msg

    def handle_mkdir(self) -> bool:
        assert self.parser
        new_dir = self.parser.require("name", 512)
        self.parser.drop()

        nullwrite = self.args.nw
        vfs, rem = self.asrv.vfs.get(self.vpath, self.uname, False, True)
        self._assert_safe_rem(rem)

        sanitized = sanitize_fn(new_dir, "", [])

        if not nullwrite:
            fdir = os.path.join(vfs.realpath, rem)
            fn = os.path.join(fdir, sanitized)

            if not bos.path.isdir(fdir):
                raise Pebkac(500, "parent folder does not exist")

            if bos.path.isdir(fn):
                raise Pebkac(500, "that folder exists already")

            try:
                bos.mkdir(fn)
            except OSError as ex:
                if ex.errno == 13:
                    raise Pebkac(500, "the server OS denied write-access")

                raise Pebkac(500, "mkdir failed:\n" + min_ex())
            except:
                raise Pebkac(500, min_ex())

        vpath = "{}/{}".format(self.vpath, sanitized).lstrip("/")
        self.out_headers["X-New-Dir"] = quotep(sanitized)
        self.redirect(vpath)
        return True

    def handle_new_md(self) -> bool:
        assert self.parser
        new_file = self.parser.require("name", 512)
        self.parser.drop()

        nullwrite = self.args.nw
        vfs, rem = self.asrv.vfs.get(self.vpath, self.uname, False, True)
        self._assert_safe_rem(rem)

        if not new_file.endswith(".md"):
            new_file += ".md"

        sanitized = sanitize_fn(new_file, "", [])

        if not nullwrite:
            fdir = os.path.join(vfs.realpath, rem)
            fn = os.path.join(fdir, sanitized)

            if bos.path.exists(fn):
                raise Pebkac(500, "that file exists already")

            with open(fsenc(fn), "wb") as f:
                f.write(b"`GRUNNUR`\n")

        vpath = "{}/{}".format(self.vpath, sanitized).lstrip("/")
        self.redirect(vpath, "?edit")
        return True

    def handle_plain_upload(self) -> bool:
        assert self.parser
        nullwrite = self.args.nw
        vfs, rem = self.asrv.vfs.get(self.vpath, self.uname, False, True)
        self._assert_safe_rem(rem)

        upload_vpath = self.vpath
        lim = vfs.get_dbv(rem)[0].lim
        fdir_base = os.path.join(vfs.realpath, rem)
        if lim:
            fdir_base, rem = lim.all(self.ip, rem, -1, fdir_base)
            upload_vpath = "{}/{}".format(vfs.vpath, rem).strip("/")
            if not nullwrite:
                bos.makedirs(fdir_base)

        files: list[tuple[int, str, str, str, str, str]] = []
        # sz, sha_hex, sha_b64, p_file, fname, abspath
        errmsg = ""
        t0 = time.time()
        try:
            assert self.parser.gen
            for nfile, (p_field, p_file, p_data) in enumerate(self.parser.gen):
                if not p_file:
                    self.log("discarding incoming file without filename")
                    # fallthrough

                fdir = fdir_base
                fname = sanitize_fn(
                    p_file or "", "", [".prologue.html", ".epilogue.html"]
                )
                if p_file and not nullwrite:
                    if not bos.path.isdir(fdir):
                        raise Pebkac(404, "that folder does not exist")

                    suffix = "-{:.6f}-{}".format(time.time(), self.dip)
                    open_args = {"fdir": fdir, "suffix": suffix}

                    # reserve destination filename
                    with ren_open(fname, "wb", fdir=fdir, suffix=suffix) as zfw:
                        fname = zfw["orz"][1]

                    tnam = fname + ".PARTIAL"
                    if self.args.dotpart:
                        tnam = "." + tnam

                    abspath = os.path.join(fdir, fname)
                else:
                    open_args = {}
                    tnam = fname = os.devnull
                    fdir = ""

                if lim:
                    lim.chk_bup(self.ip)
                    lim.chk_nup(self.ip)

                try:
                    max_sz = lim.smax if lim else 0
                    with ren_open(tnam, "wb", 512 * 1024, **open_args) as zfw:
                        f, tnam = zfw["orz"]
                        tabspath = os.path.join(fdir, tnam)
                        self.log("writing to {}".format(tabspath))
                        sz, sha_hex, sha_b64 = hashcopy(
                            p_data, f, self.args.s_wr_slp, max_sz
                        )
                        if sz == 0:
                            raise Pebkac(400, "empty files in post")

                    if lim:
                        lim.nup(self.ip)
                        lim.bup(self.ip, sz)
                        try:
                            lim.chk_sz(sz)
                            lim.chk_bup(self.ip)
                            lim.chk_nup(self.ip)
                        except:
                            bos.unlink(tabspath)
                            bos.unlink(abspath)
                            fname = os.devnull
                            raise

                    atomic_move(tabspath, abspath)
                    files.append(
                        (sz, sha_hex, sha_b64, p_file or "(discarded)", fname, abspath)
                    )
                    dbv, vrem = vfs.get_dbv(rem)
                    self.conn.hsrv.broker.say(
                        "up2k.hash_file",
                        dbv.realpath,
                        dbv.flags,
                        vrem,
                        fname,
                        self.ip,
                        time.time(),
                    )
                    self.conn.nbyte += sz

                except Pebkac:
                    self.parser.drop()
                    raise

        except Pebkac as ex:
            errmsg = vol_san(
                list(self.asrv.vfs.all_vols.values()), unicode(ex).encode("utf-8")
            ).decode("utf-8")

        td = max(0.1, time.time() - t0)
        sz_total = sum(x[0] for x in files)
        spd = (sz_total / td) / (1024 * 1024)

        status = "OK"
        if errmsg:
            self.log(errmsg, 3)
            status = "ERROR"

        msg = "{} // {} bytes // {:.3f} MiB/s\n".format(status, sz_total, spd)
        jmsg: dict[str, Any] = {
            "status": status,
            "sz": sz_total,
            "mbps": round(spd, 3),
            "files": [],
        }

        if errmsg:
            msg += errmsg + "\n"
            jmsg["error"] = errmsg
            errmsg = "ERROR: " + errmsg

        for sz, sha_hex, sha_b64, ofn, lfn, ap in files:
            vsuf = ""
            if self.can_read and "fk" in vfs.flags:
                vsuf = "?k=" + gen_filekey(
                    self.args.fk_salt,
                    ap,
                    sz,
                    0 if ANYWIN or not ap else bos.stat(ap).st_ino,
                )[: vfs.flags["fk"]]

            vpath = "{}/{}".format(upload_vpath, lfn).strip("/")
            rel_url = quotep(vpath) + vsuf
            msg += 'sha512: {} // {} // {} bytes // <a href="/{}">{}</a> {}\n'.format(
                sha_hex[:56],
                sha_b64,
                sz,
                rel_url,
                html_escape(ofn, crlf=True),
                vsuf,
            )
            # truncated SHA-512 prevents length extension attacks;
            # using SHA-512/224, optionally SHA-512/256 = :64
            jpart = {
                "url": "{}://{}/{}".format(
                    "https" if self.is_https else "http",
                    self.headers.get("host")
                    or "{}:{}".format(*list(self.s.getsockname())),
                    rel_url,
                ),
                "sha512": sha_hex[:56],
                "sha_b64": sha_b64,
                "sz": sz,
                "fn": lfn,
                "fn_orig": ofn,
                "path": rel_url,
            }
            jmsg["files"].append(jpart)

        vspd = self._spd(sz_total, False)
        self.log("{} {}".format(vspd, msg))

        if not nullwrite:
            log_fn = "up.{:.6f}.txt".format(t0)
            with open(log_fn, "wb") as f:
                ft = "{}:{}".format(self.ip, self.addr[1])
                ft = "{}\n{}\n{}\n".format(ft, msg.rstrip(), errmsg)
                f.write(ft.encode("utf-8"))

        sc = 400 if errmsg else 200
        if "j" in self.uparam:
            jtxt = json.dumps(jmsg, indent=2, sort_keys=True).encode("utf-8", "replace")
            self.reply(jtxt, mime="application/json", status=sc)
        else:
            self.redirect(
                self.vpath,
                msg=msg,
                flavor="return to",
                click=False,
                status=sc,
            )

        if errmsg:
            return False

        self.parser.drop()
        return True

    def handle_text_upload(self) -> bool:
        assert self.parser
        try:
            cli_lastmod3 = int(self.parser.require("lastmod", 16))
        except:
            raise Pebkac(400, "could not read lastmod from request")

        nullwrite = self.args.nw
        vfs, rem = self.asrv.vfs.get(self.vpath, self.uname, False, True)
        self._assert_safe_rem(rem)

        clen = int(self.headers.get("content-length", -1))
        if clen == -1:
            raise Pebkac(411)

        rp, fn = vsplit(rem)
        fp = os.path.join(vfs.realpath, rp)
        lim = vfs.get_dbv(rem)[0].lim
        if lim:
            fp, rp = lim.all(self.ip, rp, clen, fp)
            bos.makedirs(fp)

        fp = os.path.join(fp, fn)
        rem = "{}/{}".format(rp, fn).strip("/")

        if not rem.endswith(".md"):
            raise Pebkac(400, "only markdown pls")

        if nullwrite:
            response = json.dumps({"ok": True, "lastmod": 0})
            self.log(response)
            # TODO reply should parser.drop()
            self.parser.drop()
            self.reply(response.encode("utf-8"))
            return True

        srv_lastmod = -1.0
        srv_lastmod3 = -1
        try:
            st = bos.stat(fp)
            srv_lastmod = st.st_mtime
            srv_lastmod3 = int(srv_lastmod * 1000)
        except OSError as ex:
            if ex.errno != 2:
                raise

        # if file exists, check that timestamp matches the client's
        if srv_lastmod >= 0:
            same_lastmod = cli_lastmod3 in [-1, srv_lastmod3]
            if not same_lastmod:
                # some filesystems/transports limit precision to 1sec, hopefully floored
                same_lastmod = (
                    srv_lastmod == int(cli_lastmod3 / 1000)
                    and cli_lastmod3 > srv_lastmod3
                    and cli_lastmod3 - srv_lastmod3 < 1000
                )

            if not same_lastmod:
                response = json.dumps(
                    {
                        "ok": False,
                        "lastmod": srv_lastmod3,
                        "now": int(time.time() * 1000),
                    }
                )
                self.log(
                    "{} - {} = {}".format(
                        srv_lastmod3, cli_lastmod3, srv_lastmod3 - cli_lastmod3
                    )
                )
                self.log(response)
                self.parser.drop()
                self.reply(response.encode("utf-8"))
                return True

            mdir, mfile = os.path.split(fp)
            mfile2 = "{}.{:.3f}.md".format(mfile[:-3], srv_lastmod)
            try:
                bos.mkdir(os.path.join(mdir, ".hist"))
            except:
                pass
            bos.rename(fp, os.path.join(mdir, ".hist", mfile2))

        assert self.parser.gen
        p_field, _, p_data = next(self.parser.gen)
        if p_field != "body":
            raise Pebkac(400, "expected body, got {}".format(p_field))

        with open(fsenc(fp), "wb", 512 * 1024) as f:
            sz, sha512, _ = hashcopy(p_data, f, self.args.s_wr_slp)

        if lim:
            lim.nup(self.ip)
            lim.bup(self.ip, sz)
            try:
                lim.chk_sz(sz)
            except:
                bos.unlink(fp)
                raise

        new_lastmod = bos.stat(fp).st_mtime
        new_lastmod3 = int(new_lastmod * 1000)
        sha512 = sha512[:56]

        response = json.dumps(
            {"ok": True, "lastmod": new_lastmod3, "size": sz, "sha512": sha512}
        )
        self.log(response)
        self.parser.drop()
        self.reply(response.encode("utf-8"))
        return True

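    # compares the file mtime against the client's if-modified-since
    # header; returns the http-formatted lastmod and whether the file
    # body should be sent (True) or a 304 is enough (False)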
    def _chk_lastmod(self, file_ts: int) -> tuple[str, bool]:
        file_lastmod = http_ts(file_ts)
        cli_lastmod = self.headers.get("if-modified-since")
        if cli_lastmod:
            try:
                # some browsers append "; length=573"
                cli_lastmod = cli_lastmod.split(";")[0].strip()
                cli_dt = time.strptime(cli_lastmod, HTTP_TS_FMT)
                cli_ts = calendar.timegm(cli_dt)
                return file_lastmod, int(file_ts) > int(cli_ts)
            except Exception as ex:
                self.log(
                    "lastmod {}\nremote: [{}]\n local: [{}]".format(
                        repr(ex), cli_lastmod, file_lastmod
                    )
                )
                return file_lastmod, file_lastmod != cli_lastmod

        return file_lastmod, True

    def tx_file(self, req_path: str) -> bool:
        status = 200
        logmsg = "{:4} {} ".format("", self.req)
        logtail = ""

        #
        # if request is for foo.js, check if we have foo.js.{gz,br}

        file_ts = 0
        editions: dict[str, tuple[str, int]] = {}
        for ext in ["", ".gz", ".br"]:
            try:
                fs_path = req_path + ext
                st = bos.stat(fs_path)
                if stat.S_ISDIR(st.st_mode):
                    continue

                file_ts = max(file_ts, int(st.st_mtime))
                editions[ext or "plain"] = (fs_path, st.st_size)
            except:
                pass
            if not self.vpath.startswith(".cpr/"):
                break

        if not editions:
            return self.tx_404()

        #
        # if-modified

        file_lastmod, do_send = self._chk_lastmod(file_ts)
        self.out_headers["Last-Modified"] = file_lastmod
        if not do_send:
            status = 304

        #
        # Accept-Encoding and UA decides which edition to send

        decompress = False
        supported_editions = [
            x.strip()
            for x in self.headers.get("accept-encoding", "").lower().split(",")
        ]
        if ".br" in editions and "br" in supported_editions:
            is_compressed = True
            selected_edition = ".br"
            fs_path, file_sz = editions[".br"]
            self.out_headers["Content-Encoding"] = "br"
        elif ".gz" in editions:
            is_compressed = True
            selected_edition = ".gz"
            fs_path, file_sz = editions[".gz"]
            if "gzip" not in supported_editions:
                decompress = True
            else:
                if re.match(r"MSIE [4-6]\.", self.ua) and " SV1" not in self.ua:
                    decompress = True

            if not decompress:
                self.out_headers["Content-Encoding"] = "gzip"
        else:
            is_compressed = False
            selected_edition = "plain"

        try:
            fs_path, file_sz = editions[selected_edition]
            logmsg += "{} ".format(selected_edition.lstrip("."))
        except:
            # client is old and we only have .br
            # (could make brotli a dep to fix this but it's not worth)
            raise Pebkac(404)

        #
        # partial

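        # parse the range header, if any; "bytes=a-b", "bytes=a-" and
        # "bytes=-b" are all accepted, but note that the last form is
        # treated as bytes 0..b rather than the final b bytes which
        # rfc7233 would have it mean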
        lower = 0
        upper = file_sz
        hrange = self.headers.get("range")

        # let's not support 206 with compression
        if do_send and not is_compressed and hrange:
            try:
                a, b = hrange.split("=", 1)[1].split("-")

                if a.strip():
                    lower = int(a.strip())
                else:
                    lower = 0

                if b.strip():
                    upper = int(b.strip()) + 1
                else:
                    upper = file_sz

                if upper > file_sz:
                    upper = file_sz

                if lower < 0 or lower >= upper:
                    raise Exception()

            except:
                err = "invalid range ({}), size={}".format(hrange, file_sz)
                self.loud_reply(
                    err,
                    status=416,
                    headers={"Content-Range": "bytes */{}".format(file_sz)},
                )
                return True

            status = 206
            self.out_headers["Content-Range"] = "bytes {}-{}/{}".format(
                lower, upper - 1, file_sz
            )

            logtail += " [\033[36m{}-{}\033[0m]".format(lower, upper)

        use_sendfile = False
        if decompress:
            open_func: Any = gzip.open
            open_args: list[Any] = [fsenc(fs_path), "rb"]
            # Content-Length := original file size
            upper = gzip_orig_sz(fs_path)
        else:
            open_func = open
            # 512 kB is optimal for huge files, use 64k
            open_args = [fsenc(fs_path), "rb", 64 * 1024]
            use_sendfile = (
                not self.tls  #
                and not self.args.no_sendfile
                and hasattr(os, "sendfile")
            )

        #
        # send reply

        if is_compressed:
            self.out_headers["Cache-Control"] = "max-age=573"
        else:
            self.permit_caching()

        if "txt" in self.uparam:
            mime = "text/plain; charset={}".format(self.uparam["txt"] or "utf-8")
        elif "mime" in self.uparam:
            mime = str(self.uparam.get("mime"))
        else:
            mime = guess_mime(req_path)

        self.out_headers["Accept-Ranges"] = "bytes"
        self.send_headers(length=upper - lower, status=status, mime=mime)

        logmsg += unicode(status) + logtail

        if self.mode == "HEAD" or not do_send:
            if self.do_log:
                self.log(logmsg)

            return True

        ret = True
        with open_func(*open_args) as f:
            sendfun = sendfile_kern if use_sendfile else sendfile_py
            remains = sendfun(
                self.log, lower, upper, f, self.s, self.args.s_wr_sz, self.args.s_wr_slp
            )

        if remains > 0:
            logmsg += " \033[31m" + unicode(upper - remains) + "\033[0m"
            self.keepalive = False

        spd = self._spd((upper - lower) - remains)
        if self.do_log:
            self.log("{}, {}".format(logmsg, spd))

        return ret

    def tx_zip(
        self, fmt: str, uarg: str, vn: VFS, rem: str, items: list[str], dots: bool
    ) -> bool:
        if self.args.no_zip:
            raise Pebkac(400, "not enabled")

        logmsg = "{:4} {} ".format("", self.req)
        self.keepalive = False

        if fmt == "tar":
            mime = "application/x-tar"
            packer: Type[StreamArc] = StreamTar
        else:
            mime = "application/zip"
            packer = StreamZip

        fn = items[0] if items and items[0] else self.vpath
        if fn:
            fn = fn.rstrip("/").split("/")[-1]
        else:
            fn = self.headers.get("host", "hey")

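        # build a content-disposition with both an ascii-only fallback
        # filename and an rfc5987 utf-8 filename* for modern clients;
        # non-ascii bytes in the latter are percent-encoded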
        safe = (string.ascii_letters + string.digits).replace("%", "")
        afn = "".join([x if x in safe.replace('"', "") else "_" for x in fn])
        bascii = unicode(safe).encode("utf-8")
        zb = fn.encode("utf-8", "xmlcharrefreplace")
        if not PY2:
            zbl = [
                chr(x).encode("utf-8")
                if x in bascii
                else "%{:02x}".format(x).encode("ascii")
                for x in zb
            ]
        else:
            zbl = [unicode(x) if x in bascii else "%{:02x}".format(ord(x)) for x in zb]

        ufn = b"".join(zbl).decode("ascii")

        cdis = "attachment; filename=\"{}.{}\"; filename*=UTF-8''{}.{}"
        cdis = cdis.format(afn, fmt, ufn, fmt)
        self.log(cdis)
        self.send_headers(None, mime=mime, headers={"Content-Disposition": cdis})

        fgen = vn.zipgen(rem, set(items), self.uname, dots, not self.args.no_scandir)
        # for f in fgen: print(repr({k: f[k] for k in ["vp", "ap"]}))
        bgen = packer(self.log, fgen, utf8="utf" in uarg, pre_crc="crc" in uarg)
        bsent = 0
        for buf in bgen.gen():
            if not buf:
                break

            try:
                self.s.sendall(buf)
                bsent += len(buf)
            except:
                logmsg += " \033[31m" + unicode(bsent) + "\033[0m"
                break

        spd = self._spd(bsent)
        self.log("{}, {}".format(logmsg, spd))
        return True

    def tx_ico(self, ext: str, exact: bool = False) -> bool:
        self.permit_caching()
        if ext.endswith("/"):
            ext = "folder"
            exact = True

        bad = re.compile(r"[](){}/ []|^[0-9_-]*$")
        n = ext.split(".")[::-1]
        if not exact:
            n = n[:-1]

        ext = ""
        for v in n:
            if len(v) > 7 or bad.search(v):
                break

            ext = "{}.{}".format(v, ext)

        ext = ext.rstrip(".") or "unk"
        if len(ext) > 11:
            ext = "⋯" + ext[-9:]

        mime, ico = self.ico.get(ext, not exact)

        dt = datetime.utcfromtimestamp(E.t0)
        lm = dt.strftime("%a, %d %b %Y %H:%M:%S GMT")
        self.reply(ico, mime=mime, headers={"Last-Modified": lm})
        return True

    def tx_md(self, fs_path: str) -> bool:
        logmsg = "{:4} {} ".format("", self.req)

        if not self.can_write:
            if "edit" in self.uparam or "edit2" in self.uparam:
                return self.tx_404(True)

        tpl = "mde" if "edit2" in self.uparam else "md"
        html_path = os.path.join(E.mod, "web", "{}.html".format(tpl))
        template = self.j2j(tpl)

        st = bos.stat(fs_path)
        ts_md = st.st_mtime

        st = bos.stat(html_path)
        ts_html = st.st_mtime

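        # compute the response size upfront so the markdown can be
        # streamed afterwards: each &, <, > in the document will grow
        # by the size of its html-escape when html_bescape runs later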
sz_md = 0
|
||
for buf in yieldfile(fs_path):
|
||
sz_md += len(buf)
|
||
for c, v in [(b"&", 4), (b"<", 3), (b">", 3)]:
|
||
sz_md += (len(buf) - len(buf.replace(c, b""))) * v
|
||
|
||
file_ts = max(ts_md, ts_html, E.t0)
|
||
file_lastmod, do_send = self._chk_lastmod(file_ts)
|
||
self.out_headers["Last-Modified"] = file_lastmod
|
||
self.out_headers.update(NO_CACHE)
|
||
status = 200 if do_send else 304
|
||
|
||
arg_base = "?"
|
||
if "k" in self.uparam:
|
||
arg_base = "?k={}&".format(self.uparam["k"])
|
||
|
||
boundary = "\roll\tide"
|
||
targs = {
|
||
"ts": self.conn.hsrv.cachebuster(),
|
||
"svcname": self.args.doctitle,
|
||
"html_head": self.html_head,
|
||
"edit": "edit" in self.uparam,
|
||
"title": html_escape(self.vpath, crlf=True),
|
||
"lastmod": int(ts_md * 1000),
|
||
"have_emp": self.args.emp,
|
||
"md_chk_rate": self.args.mcr,
|
||
"md": boundary,
|
||
"arg_base": arg_base,
|
||
}
|
||
zs = template.render(**targs).encode("utf-8", "replace")
|
||
html = zs.split(boundary.encode("utf-8"))
|
||
if len(html) != 2:
|
||
raise Exception("boundary appears in " + html_path)
|
||
|
||
self.send_headers(sz_md + len(html[0]) + len(html[1]), status)
|
||
|
||
logmsg += unicode(status)
|
||
if self.mode == "HEAD" or not do_send:
|
||
if self.do_log:
|
||
self.log(logmsg)
|
||
|
||
return True
|
||
|
||
try:
|
||
self.s.sendall(html[0])
|
||
for buf in yieldfile(fs_path):
|
||
self.s.sendall(html_bescape(buf))
|
||
|
||
self.s.sendall(html[1])
|
||
|
||
except:
|
||
self.log(logmsg + " \033[31md/c\033[0m")
|
||
return False
|
||
|
||
if self.do_log:
|
||
self.log(logmsg + " " + unicode(len(html)))
|
||
|
||
return True
|
||
|
||
def tx_mounts(self) -> bool:
|
||
suf = self.urlq({}, ["h"])
|
||
avol = [x for x in self.wvol if x in self.rvol]
|
||
rvol, wvol, avol = [
|
||
[("/" + x).rstrip("/") + "/" for x in y]
|
||
for y in [self.rvol, self.wvol, avol]
|
||
]
|
||
|
||
if avol and not self.args.no_rescan:
|
||
x = self.conn.hsrv.broker.ask("up2k.get_state")
|
||
vs = json.loads(x.get())
|
||
vstate = {("/" + k).rstrip("/") + "/": v for k, v in vs["volstate"].items()}
|
||
else:
|
||
vstate = {}
|
||
vs = {"scanning": None, "hashq": None, "tagq": None, "mtpq": None}
|
||
|
||
if self.uparam.get("ls") in ["v", "t", "txt"]:
|
||
if self.uname == "*":
|
||
txt = "howdy stranger (you're not logged in)"
|
||
else:
|
||
txt = "welcome back {}".format(self.uname)
|
||
|
||
if vstate:
|
||
txt += "\nstatus:"
|
||
for k in ["scanning", "hashq", "tagq", "mtpq"]:
|
||
txt += " {}({})".format(k, vs[k])
|
||
|
||
if rvol:
|
||
txt += "\nyou can browse:"
|
||
for v in rvol:
|
||
txt += "\n " + v
|
||
|
||
if wvol:
|
||
txt += "\nyou can upload to:"
|
||
for v in wvol:
|
||
txt += "\n " + v
|
||
|
||
zb = txt.encode("utf-8", "replace") + b"\n"
|
||
self.reply(zb, mime="text/plain; charset=utf-8")
|
||
return True
|
||
|
||
html = self.j2s(
|
||
"splash",
|
||
this=self,
|
||
qvpath=quotep(self.vpath),
|
||
rvol=rvol,
|
||
wvol=wvol,
|
||
avol=avol,
|
||
vstate=vstate,
|
||
scanning=vs["scanning"],
|
||
hashq=vs["hashq"],
|
||
tagq=vs["tagq"],
|
||
mtpq=vs["mtpq"],
|
||
url_suf=suf,
|
||
k304=self.k304(),
|
||
)
|
||
self.reply(html.encode("utf-8"))
|
||
return True
|
||
|
||
def set_k304(self) -> bool:
|
||
ck = gencookie("k304", self.uparam["k304"], 60 * 60 * 24 * 365)
|
||
self.out_headerlist.append(("Set-Cookie", ck))
|
||
self.redirect("", "?h#cc")
|
||
return True
|
||
|
||
def set_am_js(self) -> bool:
|
||
v = "n" if self.uparam["am_js"] == "n" else "y"
|
||
ck = gencookie("js", v, 60 * 60 * 24 * 365)
|
||
self.out_headerlist.append(("Set-Cookie", ck))
|
||
self.reply(b"promoted\n")
|
||
return True
|
||
|
||
def set_cfg_reset(self) -> bool:
|
||
for k in ("k304", "js", "cppwd"):
|
||
self.out_headerlist.append(("Set-Cookie", gencookie(k, "x", None)))
|
||
|
||
self.redirect("", "?h#cc")
|
||
return True
|
||
|
||
def tx_404(self, is_403: bool = False) -> bool:
|
||
rc = 404
|
||
if self.args.vague_403:
|
||
t = '<h1 id="n">404 not found ┐( ´ -`)┌</h1><p id="o">or maybe you don\'t have access -- try logging in or <a href="/?h">go home</a></p>'
|
||
elif is_403:
|
||
t = '<h1 id="p">403 forbiddena ~┻━┻</h1><p id="q">you\'ll have to log in or <a href="/?h">go home</a></p>'
|
||
rc = 403
|
||
else:
|
||
t = '<h1 id="n">404 not found ┐( ´ -`)┌</h1><p><a id="r" href="/?h">go home</a></p>'
|
||
|
||
html = self.j2s("splash", this=self, qvpath=quotep(self.vpath), msg=t)
|
||
self.reply(html.encode("utf-8"), status=rc)
|
||
return True
|
||
|
||
def scanvol(self) -> bool:
|
||
if not self.can_read or not self.can_write:
|
||
raise Pebkac(403, "not allowed for user " + self.uname)
|
||
|
||
if self.args.no_rescan:
|
||
raise Pebkac(403, "the rescan feature is disabled in server config")
|
||
|
||
vn, _ = self.asrv.vfs.get(self.vpath, self.uname, True, True)
|
||
|
||
args = [self.asrv.vfs.all_vols, [vn.vpath], False]
|
||
|
||
x = self.conn.hsrv.broker.ask("up2k.rescan", *args)
|
||
err = x.get()
|
||
if not err:
|
||
self.redirect("", "?h")
|
||
return True
|
||
|
||
raise Pebkac(500, err)
|
||
|
||
def handle_reload(self) -> bool:
|
||
act = self.uparam.get("reload")
|
||
if act != "cfg":
|
||
raise Pebkac(400, "only config files ('cfg') can be reloaded rn")
|
||
|
||
if not [x for x in self.wvol if x in self.rvol]:
|
||
raise Pebkac(403, "not allowed for user " + self.uname)
|
||
|
||
if self.args.no_reload:
|
||
raise Pebkac(403, "the reload feature is disabled in server config")
|
||
|
||
x = self.conn.hsrv.broker.ask("reload")
|
||
return self.redirect("", "?h", x.get(), "return to", False)
|
||
|
||
    def tx_stack(self) -> bool:
        if not [x for x in self.wvol if x in self.rvol]:
            raise Pebkac(403, "not allowed for user " + self.uname)

        if self.args.no_stack:
            raise Pebkac(403, "the stackdump feature is disabled in server config")

        ret = "<pre>{}\n{}".format(time.time(), html_escape(alltrace()))
        self.reply(ret.encode("utf-8"))
        return True

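    # ?tree=<ancestor> feeds the directory sidebar one level at a
    # time; a hypothetical example while browsing /ay/bee:
    #   GET /ay/bee/?tree=ay
    #   -> {"a": [<other subdirs of ay>], "kbee": {"a": [...]}}
    # where "a" lists collapsed folders and "k<name>" is expanded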
    def tx_tree(self) -> bool:
        top = self.uparam["tree"] or ""
        dst = self.vpath
        if top in [".", ".."]:
            top = undot(self.vpath + "/" + top)

        if top == dst:
            dst = ""
        elif top:
            if not dst.startswith(top + "/"):
                raise Pebkac(400, "arg funk")

            dst = dst[len(top) + 1 :]

        ret = self.gen_tree(top, dst)
        zs = json.dumps(ret)
        self.reply(zs.encode("utf-8"), mime="application/json")
        return True

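    # recursive worker for tx_tree; "a" holds the url-escaped names
    # of the subfolders below "top", and the next component of
    # "target" gets expanded into its own "k"-prefixed subtree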
    def gen_tree(self, top: str, target: str) -> dict[str, Any]:
        ret: dict[str, Any] = {}
        excl = None
        if target:
            excl, target = (target.split("/", 1) + [""])[:2]
            sub = self.gen_tree("/".join([top, excl]).strip("/"), target)
            ret["k" + quotep(excl)] = sub

        try:
            vn, rem = self.asrv.vfs.get(top, self.uname, True, False)
            fsroot, vfs_ls, vfs_virt = vn.ls(
                rem, self.uname, not self.args.no_scandir, [[True], [False, True]]
            )
        except:
            vfs_ls = []
            vfs_virt = {}
            for v in self.rvol:
                d1, d2 = v.rsplit("/", 1) if "/" in v else ["", v]
                if d1 == top:
                    vfs_virt[d2] = self.asrv.vfs  # typechk, value never read

        dirs = []

        dirnames = [x[0] for x in vfs_ls if stat.S_ISDIR(x[1].st_mode)]

        if not self.args.ed or "dots" not in self.uparam:
            dirnames = exclude_dotfiles(dirnames)

        for fn in [x for x in dirnames if x != excl]:
            dirs.append(quotep(fn))

        for x in vfs_virt:
            if x != excl:
                dirs.append(x)

        ret["a"] = dirs
        return ret

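    # ?ups returns this IP's recent uploads (anything younger than
    # args.unpost seconds) as json, newest first, capped to 2000
    # entries; ?filter=<substring> matches against the vpath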
    def tx_ups(self) -> bool:
        if not self.args.unpost:
            raise Pebkac(400, "the unpost feature is disabled in server config")

        idx = self.conn.get_u2idx()
        if not hasattr(idx, "p_end"):
            raise Pebkac(500, "sqlite3 is not available on the server; cannot unpost")

        filt = self.uparam.get("filter")
        lm = "ups [{}]".format(filt)
        self.log(lm)

        ret: list[dict[str, Any]] = []
        t0 = time.time()
        lim = time.time() - self.args.unpost
        for vol in self.asrv.vfs.all_vols.values():
            cur = idx.get_cur(vol.realpath)
            if not cur:
                continue

            q = "select sz, rd, fn, at from up where ip=? and at>?"
            for sz, rd, fn, at in cur.execute(q, (self.ip, lim)):
                vp = "/" + "/".join(x for x in [vol.vpath, rd, fn] if x)
                if filt and filt not in vp:
                    continue

                ret.append({"vp": quotep(vp), "sz": sz, "at": at})
                if len(ret) > 3000:
                    ret.sort(key=lambda x: x["at"], reverse=True)  # type: ignore
                    ret = ret[:2000]

        ret.sort(key=lambda x: x["at"], reverse=True)  # type: ignore
        ret = ret[:2000]

        jtxt = json.dumps(ret, indent=2, sort_keys=True).encode("utf-8", "replace")
        self.log("{} #{} {:.2f}sec".format(lm, len(ret), time.time() - t0))
        self.reply(jtxt, mime="application/json")
        return True

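    # deletes the given vpaths, or self.vpath if the list is empty;
    # the removal itself is delegated to up2k through the broker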
    def handle_rm(self, req: list[str]) -> bool:
        if not req and not self.can_delete:
            raise Pebkac(403, "not allowed for user " + self.uname)

        if self.args.no_del:
            raise Pebkac(403, "the delete feature is disabled in server config")

        if not req:
            req = [self.vpath]

        x = self.conn.hsrv.broker.ask("up2k.handle_rm", self.uname, self.ip, req)
        self.loud_reply(x.get())
        return True

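    # ?move=/new/vpath renames or moves self.vpath (filename
    # included); also delegated to up2k through the broker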
    def handle_mv(self) -> bool:
        if not self.can_move:
            raise Pebkac(403, "not allowed for user " + self.uname)

        if self.args.no_mv:
            raise Pebkac(403, "the rename/move feature is disabled in server config")

        # full path of new loc (incl filename)
        dst = self.uparam.get("move")
        if not dst:
            raise Pebkac(400, "need dst vpath")

        # x-www-form-urlencoded (url query part) uses
        # either + or %20 for 0x20 so handle both
        dst = unquotep(dst.replace("+", " "))
        x = self.conn.hsrv.broker.ask("up2k.handle_mv", self.uname, self.vpath, dst)
        self.loud_reply(x.get())
        return True

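    # terminal-friendly directory listings; ?ls=v is ansi-colored,
    # ?ls=t / ?ls=txt is plaintext, any other value returns json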
    def tx_ls(self, ls: dict[str, Any]) -> bool:
        dirs = ls["dirs"]
        files = ls["files"]
        arg = self.uparam["ls"]
        if arg in ["v", "t", "txt"]:
            try:
                biggest = max(ls["files"] + ls["dirs"], key=itemgetter("sz"))["sz"]
            except:
                biggest = 0

            if arg == "v":
                fmt = "\033[0;7;36m{{}} {{:>{}}}\033[0m {{}}"
                nfmt = "{}"
                biggest = 0
                f2 = "".join(
                    "{}{{}}".format(x)
                    for x in [
                        "\033[7m",
                        "\033[27m",
                        "",
                        "\033[0;1m",
                        "\033[0;36m",
                        "\033[0m",
                    ]
                )
                ctab = {"B": 6, "K": 5, "M": 1, "G": 3}
                for lst in [dirs, files]:
                    for x in lst:
                        a = x["dt"].replace("-", " ").replace(":", " ").split(" ")
                        x["dt"] = f2.format(*list(a))
                        sz = humansize(x["sz"], True)
                        x["sz"] = "\033[0;3{}m{:>5}".format(ctab.get(sz[-1:], 0), sz)
            else:
                fmt = "{{}} {{:{},}} {{}}"
                nfmt = "{:,}"

            for x in dirs:
                n = x["name"] + "/"
                if arg == "v":
                    n = "\033[94m" + n

                x["name"] = n

            fmt = fmt.format(len(nfmt.format(biggest)))
            retl = [
                "# {}: {}".format(x, ls[x])
                for x in ["acct", "perms", "srvinf"]
                if x in ls
            ]
            retl += [
                fmt.format(x["dt"], x["sz"], x["name"])
                for y in [dirs, files]
                for x in y
            ]
            ret = "\n".join(retl)
            mime = "text/plain; charset=utf-8"
        else:
            [x.pop(k) for k in ["name", "dt"] for y in [dirs, files] for x in y]

            ret = json.dumps(ls)
            mime = "application/json"

        ret += "\n\033[0m" if arg == "v" else "\n"
        self.reply(ret.encode("utf-8", "replace"), mime=mime)
        return True

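    # main GET handler for files and folders; roughly in order:
    # thumbnails (?th), single files (filekeys permitting), zip/tar
    # downloads, and finally the folder listing as html or ?ls json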
    def tx_browser(self) -> bool:
        vpath = ""
        vpnodes = [["", "/"]]
        if self.vpath:
            for node in self.vpath.split("/"):
                if not vpath:
                    vpath = node
                else:
                    vpath += "/" + node

                vpnodes.append([quotep(vpath) + "/", html_escape(node, crlf=True)])

        vn, rem = self.asrv.vfs.get(self.vpath, self.uname, False, False)
        abspath = vn.canonical(rem)
        dbv, vrem = vn.get_dbv(rem)

        try:
            st = bos.stat(abspath)
        except:
            return self.tx_404()

        if rem.startswith(".hist/up2k.") or (
            rem.endswith("/dir.txt") and rem.startswith(".hist/th/")
        ):
            raise Pebkac(403)

        self.html_head = vn.flags.get("html_head", "")
        if vn.flags.get("norobots"):
            self.out_headers["X-Robots-Tag"] = "noindex, nofollow"
        else:
            self.out_headers.pop("X-Robots-Tag", None)

        is_dir = stat.S_ISDIR(st.st_mode)
        if self.can_read:
            th_fmt = self.uparam.get("th")
            if th_fmt is not None:
                if is_dir:
                    for fn in self.args.th_covers.split(","):
                        fp = os.path.join(abspath, fn)
                        if bos.path.exists(fp):
                            vrem = "{}/{}".format(vrem.rstrip("/"), fn).strip("/")
                            is_dir = False
                            break

                    if is_dir:
                        return self.tx_ico("a.folder")

                thp = None
                if self.thumbcli:
                    thp = self.thumbcli.get(dbv, vrem, int(st.st_mtime), th_fmt)

                if thp:
                    return self.tx_file(thp)

                return self.tx_ico(rem)

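        # single files: a user with just "get" (no read) must provide
        # the correct filekey when the volume has the "fk" volflag;
        # the key is a salted hash of filesystem path, size and inode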
        if not is_dir and (self.can_read or self.can_get):
            if not self.can_read and "fk" in vn.flags:
                correct = gen_filekey(
                    self.args.fk_salt, abspath, st.st_size, 0 if ANYWIN else st.st_ino
                )[: vn.flags["fk"]]
                got = self.uparam.get("k")
                if got != correct:
                    self.log("wrong filekey, want {}, got {}".format(correct, got))
                    return self.tx_404()

            if abspath.endswith(".md") and "raw" not in self.uparam:
                return self.tx_md(abspath)

            return self.tx_file(abspath)

        elif is_dir and not self.can_read and not self.can_write:
            return self.tx_404(True)

        srv_info = []

        try:
            if not self.args.nih:
                srv_info.append(unicode(socket.gethostname()).split(".")[0])
        except:
            self.log("#wow #whoa")

        try:
            # some fuses misbehave
            if not self.args.nid:
                if WINDOWS:
                    try:
                        bfree = ctypes.c_ulonglong(0)
                        ctypes.windll.kernel32.GetDiskFreeSpaceExW(  # type: ignore
                            ctypes.c_wchar_p(abspath), None, None, ctypes.pointer(bfree)
                        )
                        srv_info.append(humansize(bfree.value) + " free")
                    except:
                        pass
                else:
                    sv = os.statvfs(fsenc(abspath))
                    free = humansize(sv.f_frsize * sv.f_bfree, True)
                    total = humansize(sv.f_frsize * sv.f_blocks, True)

                    srv_info.append("{} free of {}".format(free, total))
        except:
            pass

        srv_infot = "</span> // <span>".join(srv_info)

        perms = []
        if self.can_read:
            perms.append("read")
        if self.can_write:
            perms.append("write")
        if self.can_move:
            perms.append("move")
        if self.can_delete:
            perms.append("delete")
        if self.can_get:
            perms.append("get")

        url_suf = self.urlq({}, ["k"])
        is_ls = "ls" in self.uparam
        is_js = self.args.force_js or self.cookies.get("js") == "y"

        tpl = "browser"
        if "b" in self.uparam:
            tpl = "browser2"
            is_js = False

logues = ["", ""]
|
||
if not self.args.no_logues:
|
||
for n, fn in enumerate([".prologue.html", ".epilogue.html"]):
|
||
fn = os.path.join(abspath, fn)
|
||
if bos.path.exists(fn):
|
||
with open(fsenc(fn), "rb") as f:
|
||
logues[n] = f.read().decode("utf-8")
|
||
|
||
readme = ""
|
||
if not self.args.no_readme and not logues[1]:
|
||
for fn in ["README.md", "readme.md"]:
|
||
fn = os.path.join(abspath, fn)
|
||
if bos.path.exists(fn):
|
||
with open(fsenc(fn), "rb") as f:
|
||
readme = f.read().decode("utf-8")
|
||
break
|
||
|
||
        ls_ret = {
            "dirs": [],
            "files": [],
            "taglist": [],
            "srvinf": srv_infot,
            "acct": self.uname,
            "idx": ("e2d" in vn.flags),
            "perms": perms,
            "logues": logues,
            "readme": readme,
        }
        j2a = {
            "vdir": quotep(self.vpath),
            "vpnodes": vpnodes,
            "files": [],
            "ls0": None,
            "acct": self.uname,
            "perms": json.dumps(perms),
            "taglist": [],
            "def_hcols": [],
            "have_emp": self.args.emp,
            "have_up2k_idx": ("e2d" in vn.flags),
            "have_tags_idx": ("e2t" in vn.flags),
            "have_acode": (not self.args.no_acode),
            "have_mv": (not self.args.no_mv),
            "have_del": (not self.args.no_del),
            "have_zip": (not self.args.no_zip),
            "have_unpost": (self.args.unpost > 0),
            "have_b_u": (self.can_write and self.uparam.get("b") == "u"),
            "url_suf": url_suf,
            "logues": logues,
            "readme": readme,
            "title": html_escape(self.vpath, crlf=True),
            "srv_info": srv_infot,
            "lang": self.args.lang,
            "dtheme": self.args.theme,
            "themes": self.args.themes,
            "turbolvl": self.args.turbo,
        }

        if self.args.js_browser:
            j2a["js"] = self.args.js_browser

        if self.args.css_browser:
            j2a["css"] = self.args.css_browser

        if not self.conn.hsrv.prism:
            j2a["no_prism"] = True

        if not self.can_read:
            if is_ls:
                return self.tx_ls(ls_ret)

            if not stat.S_ISDIR(st.st_mode):
                return self.tx_404(True)

            if "zip" in self.uparam or "tar" in self.uparam:
                raise Pebkac(403)

            html = self.j2s(tpl, **j2a)
            self.reply(html.encode("utf-8", "replace"))
            return True

for k in ["zip", "tar"]:
|
||
v = self.uparam.get(k)
|
||
if v is not None:
|
||
return self.tx_zip(k, v, vn, rem, [], self.args.ed)
|
||
|
||
fsroot, vfs_ls, vfs_virt = vn.ls(
|
||
rem, self.uname, not self.args.no_scandir, [[True], [False, True]]
|
||
)
|
||
stats = {k: v for k, v in vfs_ls}
|
||
ls_names = [x[0] for x in vfs_ls]
|
||
ls_names.extend(list(vfs_virt.keys()))
|
||
|
||
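        # replaced files can have timestamped backups in ./.hist,
        # e.g. (hypothetically) "notes.1656531200.123.txt" being an
        # older revision of "notes.txt";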
        # check for old versions of files,
        # [num-backups, most-recent, hist-path]
        hist: dict[str, tuple[int, float, str]] = {}
        histdir = os.path.join(fsroot, ".hist")
        ptn = re.compile(r"(.*)\.([0-9]+\.[0-9]{3})(\.[^\.]+)$")
        try:
            for hfn in bos.listdir(histdir):
                m = ptn.match(hfn)
                if not m:
                    continue

                fn = m.group(1) + m.group(3)
                n, ts, _ = hist.get(fn, (0, 0, ""))
                hist[fn] = (n + 1, max(ts, float(m.group(2))), hfn)
        except:
            pass

        # show dotfiles if permitted and requested
        if not self.args.ed or "dots" not in self.uparam:
            ls_names = exclude_dotfiles(ls_names)

        icur = None
        if "e2t" in vn.flags:
            idx = self.conn.get_u2idx()
            icur = idx.get_cur(dbv.realpath)

        add_fk = vn.flags.get("fk")

        dirs = []
        files = []
        for fn in ls_names:
            base = ""
            href = fn
            if not is_ls and not is_js and not self.trailing_slash and vpath:
                base = "/" + vpath + "/"
                href = base + fn

            if fn in vfs_virt:
                fspath = vfs_virt[fn].realpath
            else:
                fspath = fsroot + "/" + fn

            try:
                inf = stats.get(fn) or bos.stat(fspath)
            except:
                self.log("broken symlink: {}".format(repr(fspath)))
                continue

            is_dir = stat.S_ISDIR(inf.st_mode)
            if is_dir:
                href += "/"
                if self.args.no_zip:
                    margin = "DIR"
                else:
                    margin = '<a href="{}?zip">zip</a>'.format(quotep(href))
            elif fn in hist:
                margin = '<a href="{}.hist/{}">#{}</a>'.format(
                    base, html_escape(hist[fn][2], quot=True, crlf=True), hist[fn][0]
                )
            else:
                margin = "-"

            sz = inf.st_size
            zd = datetime.utcfromtimestamp(inf.st_mtime)
            dt = zd.strftime("%Y-%m-%d %H:%M:%S")

            try:
                ext = "---" if is_dir else fn.rsplit(".", 1)[1]
                if len(ext) > 16:
                    ext = ext[:16]
            except:
                ext = "%"

            if add_fk:
                href = "{}?k={}".format(
                    quotep(href),
                    gen_filekey(
                        self.args.fk_salt, fspath, sz, 0 if ANYWIN else inf.st_ino
                    )[:add_fk],
                )
            else:
                href = quotep(href)

            item = {
                "lead": margin,
                "href": href,
                "name": fn,
                "sz": sz,
                "ext": ext,
                "dt": dt,
                "ts": int(inf.st_mtime),
            }
            if is_dir:
                dirs.append(item)
            else:
                files.append(item)
                item["rd"] = rem

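        # fetch the media-tags for each file from the db; "up" maps
        # a file to its "w" column (presumably the content hash) and
        # "mt" keeps one k/v row per tag under its first 16 chars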
        tagset: set[str] = set()
        for fe in files:
            fn = fe["name"]
            rd = fe["rd"]
            del fe["rd"]
            if not icur:
                break

            if vn != dbv:
                _, rd = vn.get_dbv(rd)

            q = "select w from up where rd = ? and fn = ?"
            r = None
            try:
                r = icur.execute(q, (rd, fn)).fetchone()
            except Exception as ex:
                if "database is locked" in str(ex):
                    break

                try:
                    args = s3enc(idx.mem_cur, rd, fn)
                    r = icur.execute(q, args).fetchone()
                except:
                    t = "tag list error, {}/{}\n{}"
                    self.log(t.format(rd, fn, min_ex()))
                    break

            tags: dict[str, Any] = {}
            fe["tags"] = tags

            if not r:
                continue

            w = r[0][:16]
            q = "select k, v from mt where w = ? and +k != 'x'"
            try:
                for k, v in icur.execute(q, (w,)):
                    tagset.add(k)
                    tags[k] = v
            except:
                t = "tag read error, {}/{} [{}]:\n{}"
                self.log(t.format(rd, fn, w, min_ex()))
                break

        if icur:
            taglist = [k for k in vn.flags.get("mte", "").split(",") if k in tagset]
            for fe in dirs:
                fe["tags"] = {}
        else:
            taglist = list(tagset)

        if is_ls:
            ls_ret["dirs"] = dirs
            ls_ret["files"] = files
            ls_ret["taglist"] = taglist
            return self.tx_ls(ls_ret)

        doc = self.uparam.get("doc") if self.can_read else None
        if doc:
            doc = unquotep(doc.replace("+", " ").split("?")[0])
            j2a["docname"] = doc
            doctxt = None
            if next((x for x in files if x["name"] == doc), None):
                docpath = os.path.join(abspath, doc)
                sz = bos.path.getsize(docpath)
                if sz < 1024 * self.args.txt_max:
                    with open(fsenc(docpath), "rb") as f:
                        doctxt = f.read().decode("utf-8", "replace")
            else:
                self.log("doc 404: [{}]".format(doc), c=6)
                doctxt = "( textfile not found )"

            if doctxt is not None:
                j2a["doc"] = doctxt

        for d in dirs:
            d["name"] += "/"

        dirs.sort(key=itemgetter("name"))

        if is_js:
            j2a["ls0"] = {"dirs": dirs, "files": files, "taglist": taglist}
            j2a["files"] = []
        else:
            j2a["files"] = dirs + files

        j2a["taglist"] = taglist
        j2a["txt_ext"] = self.args.textfiles.replace(",", " ")

        if "mth" in vn.flags:
            j2a["def_hcols"] = vn.flags["mth"].split(",")

        html = self.j2s(tpl, **j2a)
        self.reply(html.encode("utf-8", "replace"))
        return True