add safety profiles + improve helptext + speed

This commit is contained in:
ed 2022-06-16 10:21:44 +02:00
parent 438384425a
commit eb3fa5aa6b
5 changed files with 225 additions and 26 deletions

View file

@ -24,7 +24,7 @@ from .__init__ import ANYWIN, PY2, VT100, WINDOWS, E, unicode
from .__version__ import CODENAME, S_BUILD_DT, S_VERSION
from .authsrv import re_vol
from .svchub import SvcHub
from .util import IMPLICATIONS, align_tab, ansi_re, min_ex, py_desc
from .util import IMPLICATIONS, align_tab, ansi_re, min_ex, py_desc, termsize, wrap
try:
from types import FrameType
@ -43,6 +43,12 @@ printed: list[str] = []
class RiceFormatter(argparse.HelpFormatter):
def __init__(self, *args: Any, **kwargs: Any) -> None:
if PY2:
kwargs["width"] = termsize()[0]
super(RiceFormatter, self).__init__(*args, **kwargs)
def _get_help_string(self, action: argparse.Action) -> str:
"""
same as ArgumentDefaultsHelpFormatter(HelpFormatter)
@ -52,7 +58,7 @@ class RiceFormatter(argparse.HelpFormatter):
if not VT100:
fmt = " (default: %(default)s)"
ret = str(action.help)
ret = unicode(action.help)
if "%(default)" not in ret:
if action.default is not argparse.SUPPRESS:
defaulting_nargs = [argparse.OPTIONAL, argparse.ZERO_OR_MORE]
@ -64,6 +70,27 @@ class RiceFormatter(argparse.HelpFormatter):
"""same as RawDescriptionHelpFormatter(HelpFormatter)"""
return "".join(indent + line + "\n" for line in text.splitlines())
def __add_whitespace(self, idx: int, iWSpace: int, text: str) -> str:
return (" " * iWSpace) + text if idx else text
def _split_lines(self, text: str, width: int) -> list[str]:
# https://stackoverflow.com/a/35925919
textRows = text.splitlines()
ptn = re.compile(r"\s*[0-9\-]{0,}\.?\s*")
for idx, line in enumerate(textRows):
search = ptn.search(line)
if not line.strip():
textRows[idx] = " "
elif search:
lWSpace = search.end()
lines = [
self.__add_whitespace(i, lWSpace, x)
for i, x in enumerate(wrap(line, width, width - 1))
]
textRows[idx] = lines
return [item for sublist in textRows for item in sublist]
class Dodge11874(RiceFormatter):
def __init__(self, *args: Any, **kwargs: Any) -> None:
@ -71,6 +98,14 @@ class Dodge11874(RiceFormatter):
super(Dodge11874, self).__init__(*args, **kwargs)
class BasicDodge11874(
argparse.ArgumentDefaultsHelpFormatter, argparse.RawDescriptionHelpFormatter
):
def __init__(self, *args: Any, **kwargs: Any) -> None:
kwargs["width"] = 9003
super(BasicDodge11874, self).__init__(*args, **kwargs)
def lprint(*a: Any, **ka: Any) -> None:
txt: str = " ".join(unicode(x) for x in a) + ka.get("end", "\n")
printed.append(txt)
@ -483,6 +518,9 @@ def run_argparse(argv: list[str], formatter: Any) -> argparse.Namespace:
ap2.add_argument("--no-lifetime", action="store_true", help="disable automatic deletion of uploads after a certain time (lifetime volflag)")
ap2 = ap.add_argument_group('safety options')
ap2.add_argument("-s", action="count", default=0, help="increase safety: Disable thumbnails / potentially dangerous software (ffmpeg/pillow/vips), hide partial uploads, avoid crawlers.\n └─Alias of\033[32m --dotpart --no-thumb --no-mtag-ff --no-robots --force-js")
ap2.add_argument("-ss", action="store_true", help="further increase safety: Prevent js-injection, accidental move/delete, broken symlinks, 404 on 403.\n └─Alias of\033[32m -s --no-dot-mv --no-dot-ren --unpost=0 --no-del --no-mv --hardlink --vague-403 -nih")
ap2.add_argument("-sss", action="store_true", help="further increase safety: Enable logging to disk, scan for dangerous symlinks.\n └─Alias of\033[32m -ss -lo=cpp-%%Y-%%m%%d-%%H%%M%%S.txt.xz --ls=**,*,ln,p,r")
ap2.add_argument("--ls", metavar="U[,V[,F]]", type=u, help="do a sanity/safety check of all volumes on startup; arguments USER,VOL,FLAGS; example [**,*,ln,p,r]")
ap2.add_argument("--salt", type=u, default="hunter2", help="up2k file-hash salt; used to generate unpredictable internal identifiers for uploads -- doesn't really matter")
ap2.add_argument("--fk-salt", metavar="SALT", type=u, default=fk_salt, help="per-file accesskey salt; used to generate unpredictable URLs for hidden files -- this one DOES matter")
@ -644,16 +682,21 @@ def main(argv: Optional[list[str]] = None) -> None:
except:
pass
for fmtr in [RiceFormatter, Dodge11874, BasicDodge11874]:
try:
al = run_argparse(argv, RiceFormatter)
except AssertionError:
al = run_argparse(argv, Dodge11874)
al = run_argparse(argv, fmtr)
except SystemExit:
raise
except:
lprint("\n[ {} ]:\n{}\n".format(fmtr, min_ex()))
assert al
if WINDOWS and not al.keep_qem:
try:
disable_quickedit()
except:
print("\nfailed to disable quick-edit-mode:\n" + min_ex() + "\n")
lprint("\nfailed to disable quick-edit-mode:\n" + min_ex() + "\n")
if not VT100:
al.wintitle = ""

View file

@ -57,6 +57,29 @@ class SvcHub(object):
self.log_mutex = threading.Lock()
self.next_day = 0
if args.sss or args.s >= 3:
args.ss = True
args.lo = args.lo or "cpp-%Y-%m%d-%H%M%S.txt.xz"
args.ls = args.ls or "**,*,ln,p,r"
if args.ss or args.s >= 2:
args.s = True
args.no_dot_mv = True
args.no_dot_ren = True
args.unpost = 0
args.no_del = True
args.no_mv = True
args.hardlink = True
args.vague_403 = True
args.nih = True
if args.s:
args.dotpart = True
args.no_thumb = True
args.no_mtag_ff = True
args.no_robots = True
args.force_js = True
self.log = self._log_disabled if args.q else self._log_enabled
if args.lo:
self._setup_logfile(printed)

View file

@ -1608,6 +1608,116 @@ def align_tab(lines: list[str]) -> list[str]:
return ["".join(x.ljust(y + 2) for x, y in zip(row, lens)) for row in rows]
def visual_length(txt: str) -> int:
# from r0c
eoc = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
clen = 0
pend = None
counting = True
for ch in txt:
# escape sequences can never contain ESC;
# treat pend as regular text if so
if ch == "\033" and pend:
clen += len(pend)
counting = True
pend = None
if not counting:
if ch in eoc:
counting = True
else:
if pend:
pend += ch
if pend.startswith("\033["):
counting = False
else:
clen += len(pend)
counting = True
pend = None
else:
if ch == "\033":
pend = "{0}".format(ch)
else:
co = ord(ch)
# the safe parts of latin1 and cp437 (no greek stuff)
if (
co < 0x100 # ascii + lower half of latin1
or (co >= 0x2500 and co <= 0x25A0) # box drawings
or (co >= 0x2800 and co <= 0x28FF) # braille
):
clen += 1
else:
# assume moonrunes or other double-width
clen += 2
return clen
def wrap(txt: str, maxlen: int, maxlen2: int) -> list[str]:
# from r0c
words = re.sub(r"([, ])", r"\1\n", txt.rstrip()).split("\n")
pad = maxlen - maxlen2
ret = []
for word in words:
if len(word) * 2 < maxlen or visual_length(word) < maxlen:
ret.append(word)
else:
while visual_length(word) >= maxlen:
ret.append(word[: maxlen - 1] + "-")
word = word[maxlen - 1 :]
if word:
ret.append(word)
words = ret
ret = []
ln = ""
spent = 0
for word in words:
wl = visual_length(word)
if spent + wl > maxlen:
ret.append(ln)
maxlen = maxlen2
spent = 0
ln = " " * pad
ln += word
spent += wl
if ln:
ret.append(ln)
return ret
def termsize() -> tuple[int, int]:
# from hashwalk
env = os.environ
def ioctl_GWINSZ(fd):
try:
import fcntl, termios, struct
cr = struct.unpack("hh", fcntl.ioctl(fd, termios.TIOCGWINSZ, "1234"))
except:
return
return cr
cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
if not cr:
try:
fd = os.open(os.ctermid(), os.O_RDONLY)
cr = ioctl_GWINSZ(fd)
os.close(fd)
except:
pass
if not cr:
try:
cr = (env["LINES"], env["COLUMNS"])
except:
cr = (25, 80)
return int(cr[1]), int(cr[0])
class Pebkac(Exception):
def __init__(self, code: int, msg: Optional[str] = None) -> None:
super(Pebkac, self).__init__(msg or HTTPCODE[code])

View file

@ -21,20 +21,39 @@ def uh(top):
if os.path.exists(top + "/uh"):
return
libs = "typing|types|collections\.abc"
ptn = re.compile(r"^(\s*)(from (?:{0}) import |import (?:{0})\b).*".format(libs))
# pr("building support for your python ver")
pr("unhinting")
files = []
for (dp, _, fns) in os.walk(top):
for fn in fns:
if not fn.endswith(".py"):
continue
pr(".")
fp = os.path.join(dp, fn)
files.append(fp)
try:
import multiprocessing as mp
with mp.Pool(os.cpu_count()) as pool:
pool.map(uh1, files)
except Exception as ex:
print("\nnon-mp fallback due to {}\n".format(ex))
for fp in files:
uh1(fp)
pr("k\n\n")
with open(top + "/uh", "wb") as f:
f.write(b"a")
def uh1(fp):
pr(".")
cs = strip_file_to_string(fp, no_ast=True, to_empty=True)
libs = "typing|types|collections\.abc"
ptn = re.compile(r"^(\s*)(from (?:{0}) import |import (?:{0})\b).*".format(libs))
# remove expensive imports too
lns = []
for ln in cs.split("\n"):
@ -48,10 +67,6 @@ def uh(top):
with open(fp, "wb") as f:
f.write(cs.encode("utf-8"))
pr("k\n\n")
with open(top + "/uh", "wb") as f:
f.write(b"a")
if __name__ == "__main__":
uh(".")

View file

@ -3,6 +3,7 @@
from __future__ import print_function, unicode_literals
import io
import os
import sys
import tokenize
@ -10,6 +11,7 @@ import tokenize
def uncomment(fpath):
"""modified https://stackoverflow.com/a/62074206"""
print(".", end="", flush=True)
with open(fpath, "rb") as f:
orig = f.read().decode("utf-8")
@ -66,8 +68,14 @@ def uncomment(fpath):
def main():
print("uncommenting", end="", flush=True)
try:
import multiprocessing as mp
with mp.Pool(os.cpu_count()) as pool:
pool.map(uncomment, sys.argv[1:])
except Exception as ex:
print("\nnon-mp fallback due to {}\n".format(ex))
for f in sys.argv[1:]:
print(".", end="", flush=True)
uncomment(f)
print("k")