make pylance happier

This commit is contained in:
ed 2022-10-29 20:40:25 +00:00
parent 3411b0993f
commit edad3246e0
28 changed files with 154 additions and 180 deletions

View file

@ -7,10 +7,13 @@ import sys
import time
try:
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING
except:
TYPE_CHECKING = False
if True:
from typing import Any
PY2 = sys.version_info < (3,)
if PY2:
sys.dont_write_bytecode = True
@ -40,8 +43,8 @@ except:
class EnvParams(object):
def __init__(self) -> None:
self.t0 = time.time()
self.mod = None
self.cfg = None
self.mod = ""
self.cfg = ""
self.ox = getattr(sys, "oxidized", None)

View file

@ -38,13 +38,11 @@ from .util import (
wrap,
)
try:
if True: # pylint: disable=using-constant-test
from collections.abc import Callable
from types import FrameType
from typing import Any, Optional
except:
pass
try:
HAVE_SSL = True
@ -79,9 +77,9 @@ class RiceFormatter(argparse.HelpFormatter):
ret += fmt
if not VT100:
ret = re.sub("\033\[[0-9;]+m", "", ret)
ret = re.sub("\033\\[[0-9;]+m", "", ret)
return ret
return ret # type: ignore
def _fill_text(self, text: str, width: int, indent: str) -> str:
"""same as RawDescriptionHelpFormatter(HelpFormatter)"""
@ -104,7 +102,7 @@ class RiceFormatter(argparse.HelpFormatter):
self.__add_whitespace(i, lWSpace, x)
for i, x in enumerate(wrap(line, width, width - 1))
]
textRows[idx] = lines
textRows[idx] = lines # type: ignore
return [item for sublist in textRows for item in sublist]
@ -141,7 +139,7 @@ def init_E(E: EnvParams) -> None:
# __init__ runs 18 times when oxidized; do expensive stuff here
def get_unixdir() -> str:
paths: list[tuple[Callable[..., str], str]] = [
paths: list[tuple[Callable[..., Any], str]] = [
(os.environ.get, "XDG_CONFIG_HOME"),
(os.path.expanduser, "~/.config"),
(os.environ.get, "TMPDIR"),
@ -163,7 +161,7 @@ def init_E(E: EnvParams) -> None:
if not os.path.isdir(p):
os.mkdir(p)
return p
return p # type: ignore
except:
pass
@ -867,7 +865,7 @@ def main(argv: Optional[list[str]] = None) -> None:
retry = True
lprint("\n[ {} ]:\n{}\n".format(fmtr, min_ex()))
assert al
assert al # type: ignore
al.E = E # __init__ is not shared when oxidized
if WINDOWS and not al.keep_qem:

View file

@ -31,15 +31,13 @@ from .util import (
unhumanize,
)
try:
if True: # pylint: disable=using-constant-test
from collections.abc import Iterable
import typing
from typing import Any, Generator, Optional, Union
from .util import RootLogger
except:
pass
if TYPE_CHECKING:
pass
@ -412,7 +410,7 @@ class VFS(object):
will_move: bool = False,
will_del: bool = False,
will_get: bool = False,
err=403,
err: int = 403,
) -> tuple["VFS", str]:
"""returns [vfsnode,fs_remainder] if user has the requested permissions"""
if ANYWIN:
@ -762,6 +760,7 @@ class AuthSrv(object):
t = "WARNING (config-file): permission flag 'a' is deprecated; please use 'rw' instead"
self.log(t, 1)
assert vol_dst
self._read_vol_str(lvl, uname, daxs[vol_dst], mflags[vol_dst])
def _read_vol_str(
@ -911,6 +910,7 @@ class AuthSrv(object):
zv.flags = mflags[dst]
zv.dbv = None
assert vfs
vfs.all_vols = {}
vfs.get_all_vols(vfs.all_vols)

View file

@ -4,14 +4,13 @@ from __future__ import print_function, unicode_literals
import os
from ..util import SYMTIME, fsdec, fsenc
from . import path
from . import path as path
try:
if True: # pylint: disable=using-constant-test
from typing import Any, Optional
except:
pass
_ = (path,)
__all__ = ["path"]
# grep -hRiE '(^|[^a-zA-Z_\.-])os\.' . | gsed -r 's/ /\n/g;s/\(/(\n/g' | grep -hRiE '(^|[^a-zA-Z_\.-])os\.' | sort | uniq -c
# printf 'os\.(%s)' "$(grep ^def bos/__init__.py | gsed -r 's/^def //;s/\(.*//' | tr '\n' '|' | gsed -r 's/.$//')"

View file

@ -14,10 +14,8 @@ from .util import Daemon, mp
if TYPE_CHECKING:
from .svchub import SvcHub
try:
if True: # pylint: disable=using-constant-test
from typing import Any
except:
pass
class MProcess(mp.Process):

View file

@ -15,12 +15,10 @@ from .broker_util import BrokerCli, ExceptionalQueue
from .httpsrv import HttpSrv
from .util import FAKE_MP, Daemon, HMaccas
try:
if True: # pylint: disable=using-constant-test
from types import FrameType
from typing import Any, Optional, Union
except:
pass
class MpWorker(BrokerCli):

View file

@ -12,10 +12,8 @@ from .util import HMaccas
if TYPE_CHECKING:
from .svchub import SvcHub
try:
if True: # pylint: disable=using-constant-test
from typing import Any
except:
pass
class BrokerThr(BrokerCli):

View file

@ -10,12 +10,10 @@ from .__init__ import TYPE_CHECKING
from .authsrv import AuthSrv
from .util import HMaccas, Pebkac
try:
if True: # pylint: disable=using-constant-test
from typing import Any, Optional, Union
from .util import RootLogger
except:
pass
if TYPE_CHECKING:
from .httpsrv import HttpSrv
@ -41,12 +39,14 @@ class BrokerCli(object):
for example resolving httpconn.* in httpcli -- see lines tagged #mypy404
"""
log: "RootLogger"
args: argparse.Namespace
asrv: AuthSrv
httpsrv: "HttpSrv"
iphash: HMaccas
def __init__(self) -> None:
self.log: "RootLogger" = None
self.args: argparse.Namespace = None
self.asrv: AuthSrv = None
self.httpsrv: "HttpSrv" = None
self.iphash: HMaccas = None
pass
def ask(self, dest: str, *args: Any) -> ExceptionalQueue:
return ExceptionalQueue(1)

View file

@ -4,10 +4,8 @@ import xml.etree.ElementTree as ET
from .__init__ import PY2
try:
if True: # pylint: disable=using-constant-test
from typing import Any, Optional
except:
pass
def get_ET() -> ET.XMLParser:

View file

@ -10,12 +10,10 @@ from .authsrv import AXS, VFS
from .bos import bos
from .util import chkcmd, min_ex
try:
if True: # pylint: disable=using-constant-test
from typing import Optional, Union
from .util import RootLogger
except:
pass
class Fstab(object):

View file

@ -6,7 +6,6 @@ import logging
import os
import stat
import sys
import threading
import time
from pyftpdlib.authorizers import AuthenticationFailed, DummyAuthorizer
@ -31,11 +30,9 @@ except ImportError:
if TYPE_CHECKING:
from .svchub import SvcHub
try:
if True: # pylint: disable=using-constant-test
import typing
from typing import Any, Optional
except:
pass
class FtpAuth(DummyAuthorizer):
@ -288,8 +285,8 @@ class FtpFs(AbstractedFS):
class FtpHandler(FTPHandler):
abstracted_fs = FtpFs
hub: "SvcHub" = None
args: argparse.Namespace = None
hub: "SvcHub"
args: argparse.Namespace
def __init__(self, conn: Any, server: Any, ioloop: Any = None) -> None:
self.hub: "SvcHub" = FtpHandler.hub

View file

@ -83,11 +83,9 @@ from .util import (
yieldfile,
)
try:
if True: # pylint: disable=using-constant-test
import typing
from typing import Any, Generator, Match, Optional, Pattern, Type, Union
except:
pass
if TYPE_CHECKING:
from .httpconn import HttpConn
@ -139,11 +137,13 @@ class HttpCli(object):
self.cookies: dict[str, str] = {}
self.vpath = " "
self.uname = " "
self.pw = " "
self.rvol = [" "]
self.wvol = [" "]
self.mvol = [" "]
self.dvol = [" "]
self.gvol = [" "]
self.upvol = [" "]
self.do_log = True
self.can_read = False
self.can_write = False
@ -423,7 +423,7 @@ class HttpCli(object):
if not hasattr(ex, "code"):
pex = Pebkac(500)
else:
pex = ex # type: ignore
pex: Pebkac = ex # type: ignore
try:
post = self.mode in ["POST", "PUT"] or "content-length" in self.headers
@ -848,7 +848,7 @@ class HttpCli(object):
from xml.etree import ElementTree as ET
from .dxml import mkenod, mktnod, parse_xml
vn, rem = self.asrv.vfs.get(self.vpath, self.uname, False, False)
self.asrv.vfs.get(self.vpath, self.uname, False, False)
# abspath = vn.dcanonical(rem)
buf = b""
@ -870,8 +870,8 @@ class HttpCli(object):
xroot.insert(0, parse_xml(txt))
xprop = xroot.find(r"./{DAV:}propertyupdate/{DAV:}set/{DAV:}prop")
assert xprop
for el in xprop:
el.clear()
for ze in xprop:
ze.clear()
txt = """<multistatus xmlns="DAV:"><response><propstat><status>HTTP/1.1 403 Forbidden</status></propstat></response></multistatus>"""
xroot = parse_xml(txt)
@ -942,7 +942,7 @@ class HttpCli(object):
ret += ET.tostring(xroot).decode("utf-8")
if not bos.path.isfile(abspath):
with open(fsenc(abspath), "w") as _:
with open(fsenc(abspath), "wb") as _:
pass
self.reply(ret.encode(enc, "replace"), 200, "text/xml; charset=" + enc)
@ -1127,7 +1127,7 @@ class HttpCli(object):
raise Pebkac(405, "don't know how to handle POST({})".format(ctype))
def get_xml_enc(self, txt) -> str:
def get_xml_enc(self, txt: str) -> str:
ofs = txt[:512].find(' encoding="')
enc = ""
if ofs + 1:
@ -1153,11 +1153,11 @@ class HttpCli(object):
else:
return read_socket(self.sr, remains), remains
def dump_to_file(self, is_put) -> tuple[int, str, str, int, str, str]:
def dump_to_file(self, is_put: bool) -> tuple[int, str, str, int, str, str]:
# post_sz, sha_hex, sha_b64, remains, path, url
reader, remains = self.get_body_reader()
vfs, rem = self.asrv.vfs.get(self.vpath, self.uname, False, True)
rnd, want_url, lifetime = self.upload_flags(vfs)
rnd, _, lifetime = self.upload_flags(vfs)
lim = vfs.get_dbv(rem)[0].lim
fdir = vfs.canonical(rem)
if lim:
@ -1324,7 +1324,7 @@ class HttpCli(object):
return post_sz, sha_hex, sha_b64, remains, path, url
def handle_stash(self, is_put) -> bool:
def handle_stash(self, is_put: bool) -> bool:
post_sz, sha_hex, sha_b64, remains, path, url = self.dump_to_file(is_put)
spd = self._spd(post_sz)
t = "{} wrote {}/{} bytes to {} # {}"
@ -1432,14 +1432,13 @@ class HttpCli(object):
def handle_zip_post(self) -> bool:
assert self.parser
for k in ["zip", "tar"]:
v = self.uparam.get(k)
if v is not None:
break
if v is None:
try:
k = next(x for x in self.uparam if x in ("zip", "tar"))
except:
raise Pebkac(422, "need zip or tar keyword")
v = self.uparam[k]
vn, rem = self.asrv.vfs.get(self.vpath, self.uname, True, False)
zs = self.parser.require("files", 1024 * 1024)
if not zs:
@ -1504,8 +1503,8 @@ class HttpCli(object):
body["vcfg"] = dbv.flags
if rem:
dst = vfs.canonical(rem)
try:
dst = vfs.canonical(rem)
if not bos.path.isdir(dst):
bos.makedirs(dst)
except OSError as ex:
@ -1749,7 +1748,7 @@ class HttpCli(object):
sanitized = sanitize_fn(new_dir, "", [])
return self._mkdir(vjoin(self.vpath, sanitized))
def _mkdir(self, vpath) -> bool:
def _mkdir(self, vpath: str) -> bool:
nullwrite = self.args.nw
vfs, rem = self.asrv.vfs.get(vpath, self.uname, False, True)
self._assert_safe_rem(rem)
@ -2466,7 +2465,7 @@ class HttpCli(object):
for c, v in [(b"&", 4), (b"<", 3), (b">", 3)]:
sz_md += (len(buf) - len(buf.replace(c, b""))) * v
file_ts = max(ts_md, ts_html, self.E.t0)
file_ts = int(max(ts_md, ts_html, self.E.t0))
file_lastmod, do_send = self._chk_lastmod(file_ts)
self.out_headers["Last-Modified"] = file_lastmod
self.out_headers.update(NO_CACHE)

View file

@ -25,10 +25,8 @@ from .th_srv import HAVE_PIL, HAVE_VIPS
from .u2idx import U2idx
from .util import HMaccas, shut_socket
try:
if True: # pylint: disable=using-constant-test
from typing import Optional, Pattern, Union
except:
pass
if TYPE_CHECKING:
from .httpsrv import HttpSrv

View file

@ -47,10 +47,8 @@ from .util import (
if TYPE_CHECKING:
from .broker_util import BrokerCli
try:
if True: # pylint: disable=using-constant-test
from typing import Any, Optional
except:
pass
class HttpSrv(object):

View file

@ -12,25 +12,23 @@ from .__init__ import PY2, WINDOWS, E, unicode
from .bos import bos
from .util import REKOBO_LKEY, fsenc, min_ex, retchk, runcmd, uncyg
try:
if True: # pylint: disable=using-constant-test
from typing import Any, Union
from .util import RootLogger
except:
pass
def have_ff(cmd: str) -> bool:
def have_ff(scmd: str) -> bool:
if PY2:
print("# checking {}".format(cmd))
cmd = (cmd + " -version").encode("ascii").split(b" ")
print("# checking {}".format(scmd))
acmd = (scmd + " -version").encode("ascii").split(b" ")
try:
sp.Popen(cmd, stdout=sp.PIPE, stderr=sp.PIPE).communicate()
sp.Popen(acmd, stdout=sp.PIPE, stderr=sp.PIPE).communicate()
return True
except:
return False
else:
return bool(shutil.which(cmd))
return bool(shutil.which(scmd))
HAVE_FFMPEG = have_ff("ffmpeg")
@ -269,7 +267,7 @@ class MTag(object):
if self.backend == "mutagen":
self.get = self.get_mutagen
try:
import mutagen # noqa: F401 # pylint: disable=unused-import,import-outside-toplevel
from mutagen import version # noqa: F401
except:
self.log("could not load Mutagen, trying FFprobe instead", c=3)
self.backend = "ffprobe"
@ -381,18 +379,20 @@ class MTag(object):
parser_output[alias] = (priority, tv[0])
# take first value (lowest priority / most preferred)
ret = {sk: unicode(tv[1]).strip() for sk, tv in parser_output.items()}
ret: dict[str, Union[str, float]] = {
sk: unicode(tv[1]).strip() for sk, tv in parser_output.items()
}
# track 3/7 => track 3
for sk, tv in ret.items():
for sk, zv in ret.items():
if sk[0] == ".":
sv = str(tv).split("/")[0].strip().lstrip("0")
sv = str(zv).split("/")[0].strip().lstrip("0")
ret[sk] = sv or 0
# normalize key notation to rkeobo
okey = ret.get("key")
if okey:
key = okey.replace(" ", "").replace("maj", "").replace("min", "m")
key = str(okey).replace(" ", "").replace("maj", "").replace("min", "m")
ret["key"] = REKOBO_LKEY.get(key.lower(), okey)
return ret
@ -441,10 +441,11 @@ class MTag(object):
if not bos.path.isfile(abspath):
return {}
import mutagen
from mutagen import File
try:
md = mutagen.File(fsenc(abspath), easy=True)
md = File(fsenc(abspath), easy=True)
assert md
if not md.info.length and not md.info.codec:
raise Exception()
except:

View file

@ -14,10 +14,8 @@ from .authsrv import LEELOO_DALLAS, VFS
from .util import Daemon, min_ex
from .bos import bos
try:
if True: # pylint: disable=using-constant-test
from typing import Any
except:
pass
if TYPE_CHECKING:
from .svchub import SvcHub
@ -144,7 +142,7 @@ class SMB(object):
return ls
def _open(
self, vpath: str, flags: int, chmod: int = 0o777, *a: Any, **ka: Any
self, vpath: str, flags: int, *a: Any, chmod: int = 0o777, **ka: Any
) -> Any:
f_ro = os.O_RDONLY
if ANYWIN:
@ -158,7 +156,7 @@ class SMB(object):
if wr and not vfs.axs.uwrite:
yeet("blocked write (no-write-acc): " + vpath)
ret = bos.open(ap, flags, chmod, *a, **ka)
ret = bos.open(ap, flags, *a, mode=chmod, **ka)
if wr:
now = time.time()
nf = len(self.files)

View file

@ -2,7 +2,6 @@
from __future__ import print_function, unicode_literals
import tarfile
import threading
from queue import Queue
@ -10,12 +9,10 @@ from .bos import bos
from .sutil import StreamArc, errdesc
from .util import Daemon, fsenc, min_ex
try:
if True: # pylint: disable=using-constant-test
from typing import Any, Generator, Optional
from .util import NamedLogger
except:
pass
class QFile(object): # inherit io.StringIO for painful typing

View file

@ -11,12 +11,10 @@ from __future__ import print_function, unicode_literals
import collections
import itertools
try:
if True: # pylint: disable=using-constant-test
from collections.abc import Sequence
from typing import Callable, List, Optional, Tuple, Union
except:
pass
def num_char_count_bits(ver: int) -> int:

View file

@ -20,10 +20,8 @@ PY3 = sys.version_info > (3,)
WINDOWS = platform.system() == "Windows"
FS_ERRORS = "surrogateescape"
try:
if True: # pylint: disable=using-constant-test
from typing import Any
except:
pass
if PY3:

View file

@ -6,12 +6,10 @@ from datetime import datetime
from .bos import bos
try:
if True: # pylint: disable=using-constant-test
from typing import Any, Generator, Optional
from .util import NamedLogger
except:
pass
class StreamArc(object):
@ -25,7 +23,7 @@ class StreamArc(object):
self.fgen = fgen
def gen(self) -> Generator[Optional[bytes], None, None]:
pass
raise Exception("override me")
def errdesc(errors: list[tuple[str, str]]) -> tuple[dict[str, Any], list[str]]:

View file

@ -19,13 +19,11 @@ import threading
import time
from datetime import datetime, timedelta
try:
if True: # pylint: disable=using-constant-test
from types import FrameType
import typing
from typing import Any, Optional, Union
except:
pass
from .__init__ import ANYWIN, MACOS, VT100, EnvParams, unicode
from .authsrv import AuthSrv
@ -355,7 +353,7 @@ class SvcHub(object):
self.shutdown()
def kill9(self, delay: float = 0.0):
def kill9(self, delay: float = 0.0) -> None:
if delay > 0.01:
time.sleep(delay)
print("component stuck; performing sigkill")

View file

@ -9,12 +9,10 @@ from .bos import bos
from .sutil import StreamArc, errdesc
from .util import min_ex, sanitize_fn, spack, sunpack, yieldfile
try:
if True: # pylint: disable=using-constant-test
from typing import Any, Generator, Optional
from .util import NamedLogger
except:
pass
def dostime2unix(buf: bytes) -> int:

View file

@ -9,10 +9,8 @@ from .bos import bos
from .th_srv import HAVE_WEBP, thumb_path
from .util import Cooldown
try:
if True: # pylint: disable=using-constant-test
from typing import Optional, Union
except:
pass
if TYPE_CHECKING:
from .httpsrv import HttpSrv

View file

@ -26,10 +26,8 @@ from .util import (
vsplit,
)
try:
if True: # pylint: disable=using-constant-test
from typing import Optional, Union
except:
pass
if TYPE_CHECKING:
from .svchub import SvcHub

View file

@ -30,10 +30,8 @@ try:
except:
pass
try:
if True: # pylint: disable=using-constant-test
from typing import Any, Optional, Union
except:
pass
if TYPE_CHECKING:
from .httpconn import HttpConn
@ -199,7 +197,7 @@ class U2idx(object):
v = "exists(select 1 from mt where mt.w = mtw and " + vq
else:
raise Pebkac(400, "invalid key [" + v + "]")
raise Pebkac(400, "invalid key [{}]".format(v))
q += v + " "
continue

View file

@ -60,10 +60,8 @@ if HAVE_SQLITE3:
DB_VER = 5
try:
if True: # pylint: disable=using-constant-test
from typing import Any, Optional, Pattern, Union
except:
pass
if TYPE_CHECKING:
from .svchub import SvcHub
@ -940,6 +938,7 @@ class Up2k(object):
if n:
t = "forgetting {} shadowed autoindexed files in [{}] > [{}]"
self.log(t.format(n, top, sh_rd))
assert sh_erd
q = "delete from dh where (d = ? or d like ?||'%')"
db.c.execute(q, (sh_erd, sh_erd + "/"))

View file

@ -76,7 +76,7 @@ try:
except:
HAVE_PSUTIL = False
try:
if True: # pylint: disable=using-constant-test
import types
from collections.abc import Callable, Iterable
@ -91,21 +91,16 @@ try:
def __call__(self, msg: str, c: Union[int, str] = 0) -> None:
return None
except:
pass
if TYPE_CHECKING:
import magic
from .authsrv import VFS
FAKE_MP = False
try:
if not FAKE_MP:
import multiprocessing as mp
else:
import multiprocessing.dummy as mp # type: ignore
import multiprocessing as mp
# import multiprocessing.dummy as mp
except ImportError:
# support jython
mp = None # type: ignore
@ -351,8 +346,8 @@ class Daemon(threading.Thread):
self,
target: Any,
name: Optional[str] = None,
a: Iterable[Any] = None,
r=True,
a: Optional[Iterable[Any]] = None,
r: bool = True,
) -> None:
threading.Thread.__init__(self, target=target, name=name, args=a or ())
self.daemon = True
@ -1049,6 +1044,7 @@ def ren_open(
with fun(fsenc(fpath), *args, **kwargs) as f:
if b64:
assert fdir
fp2 = "fn-trunc.{}.txt".format(b64)
fp2 = os.path.join(fdir, fp2)
with open(fsenc(fp2), "wb") as f2:
@ -1075,7 +1071,7 @@ def ren_open(
raise
if not b64:
zs = (orig_name + "\n" + suffix).encode("utf-8", "replace")
zs = "{}\n{}".format(orig_name, suffix).encode("utf-8", "replace")
zs = hashlib.sha512(zs).digest()[:12]
b64 = base64.urlsafe_b64encode(zs).decode("utf-8")
@ -1225,6 +1221,7 @@ class MultipartParser(object):
buf = buf[d:]
# look for boundary near the end of the buffer
n = 0
for n in range(1, len(buf) + 1):
if not buf[-n:] in self.boundary:
n -= 1
@ -1383,8 +1380,8 @@ def gen_filekey_dbg(
except:
ctx = ""
p2 = "a"
try:
p2 = "a"
p2 = absreal(fspath)
if p2 != fspath:
raise Exception()
@ -1581,17 +1578,21 @@ def html_bescape(s: bytes, quot: bool = False, crlf: bool = False) -> bytes:
return s
def quotep(txt: str) -> str:
def _quotep2(txt: str) -> str:
"""url quoter which deals with bytes correctly"""
btxt = w8enc(txt)
quot1 = quote(btxt, safe=b"/")
if not PY2:
quot2 = quot1.encode("ascii")
else:
quot2 = quot1
quot = quote(btxt, safe=b"/")
return w8dec(quot.replace(b" ", b"+"))
quot3 = quot2.replace(b" ", b"+")
return w8dec(quot3)
def _quotep3(txt: str) -> str:
"""url quoter which deals with bytes correctly"""
btxt = w8enc(txt)
quot = quote(btxt, safe=b"/").encode("utf-8")
return w8dec(quot.replace(b" ", b"+"))
quotep = _quotep3 if not PY2 else _quotep2
def unquotep(txt: str) -> str:
@ -1616,22 +1617,30 @@ def vjoin(rd: str, fn: str) -> str:
return rd or fn
def w8dec(txt: bytes) -> str:
def _w8dec2(txt: bytes) -> str:
"""decodes filesystem-bytes to wtf8"""
if PY2:
return surrogateescape.decodefilename(txt)
return surrogateescape.decodefilename(txt)
def _w8enc2(txt: str) -> bytes:
"""encodes wtf8 to filesystem-bytes"""
return surrogateescape.encodefilename(txt)
def _w8dec3(txt: bytes) -> str:
"""decodes filesystem-bytes to wtf8"""
return txt.decode(FS_ENCODING, "surrogateescape")
def w8enc(txt: str) -> bytes:
def _w8enc3(txt: str) -> bytes:
"""encodes wtf8 to filesystem-bytes"""
if PY2:
return surrogateescape.encodefilename(txt)
return txt.encode(FS_ENCODING, "surrogateescape")
w8dec = _w8dec3 if not PY2 else _w8dec2
w8enc = _w8enc3 if not PY2 else _w8enc2
def w8b64dec(txt: str) -> str:
"""decodes base64(filesystem-bytes) to wtf8"""
return w8dec(base64.urlsafe_b64decode(txt.encode("ascii")))
@ -1642,17 +1651,17 @@ def w8b64enc(txt: str) -> str:
return base64.urlsafe_b64encode(w8enc(txt)).decode("ascii")
if PY2 and WINDOWS:
if not PY2 or not WINDOWS:
fsenc = w8enc
fsdec = w8dec
else:
# moonrunes become \x3f with bytestrings,
# losing mojibake support is worth
def _not_actually_mbcs(txt):
def _not_actually_mbcs(txt: str) -> str:
return txt
fsenc = _not_actually_mbcs
fsdec = _not_actually_mbcs
else:
fsenc = w8enc
fsdec = w8dec
def s3enc(mem_cur: "sqlite3.Cursor", rd: str, fn: str) -> tuple[str, str]:
@ -1952,6 +1961,7 @@ def statdir(
if lstat and (PY2 or os.stat not in os.supports_follow_symlinks):
scandir = False
src = "statdir"
try:
btop = fsenc(top)
if scandir and hasattr(os, "scandir"):
@ -2129,7 +2139,7 @@ def killtree(root: int) -> None:
os.kill(pid, signal.SIGTERM)
else:
# windows gets minimal effort sorry
os.kill(pid, signal.SIGTERM)
os.kill(root, signal.SIGTERM)
return
for n in range(10):
@ -2151,19 +2161,21 @@ def runcmd(
kill = ka.pop("kill", "t") # [t]ree [m]ain [n]one
capture = ka.pop("capture", 3) # 0=none 1=stdout 2=stderr 3=both
sin = ka.pop("sin", None)
sin: Optional[bytes] = ka.pop("sin", None)
if sin:
ka["stdin"] = sp.PIPE
cout = sp.PIPE if capture in [1, 3] else None
cerr = sp.PIPE if capture in [2, 3] else None
bout: bytes
berr: bytes
p = sp.Popen(argv, stdout=cout, stderr=cerr, **ka)
if not timeout or PY2:
stdout, stderr = p.communicate(sin)
bout, berr = p.communicate(sin)
else:
try:
stdout, stderr = p.communicate(sin, timeout=timeout)
bout, berr = p.communicate(sin, timeout=timeout)
except sp.TimeoutExpired:
if kill == "n":
return -18, "", "" # SIGCONT; leave it be
@ -2173,15 +2185,15 @@ def runcmd(
killtree(p.pid)
try:
stdout, stderr = p.communicate(timeout=1)
bout, berr = p.communicate(timeout=1)
except:
stdout = b""
stderr = b""
bout = b""
berr = b""
stdout = stdout.decode("utf-8", "replace") if cout else b""
stderr = stderr.decode("utf-8", "replace") if cerr else b""
stdout = bout.decode("utf-8", "replace") if cout else ""
stderr = berr.decode("utf-8", "replace") if cerr else ""
rc = p.returncode
rc: int = p.returncode
if rc is None:
rc = -14 # SIGALRM; failed to kill

View file

@ -1,7 +1,6 @@
# coding: utf-8
from __future__ import print_function, unicode_literals
import re
import os
import sys
from strip_hints import strip_file_to_string
@ -51,16 +50,18 @@ def uh1(fp):
pr(".")
cs = strip_file_to_string(fp, no_ast=True, to_empty=True)
libs = "typing|types|collections\.abc"
ptn = re.compile(r"^(\s*)(from (?:{0}) import |import (?:{0})\b).*".format(libs))
# remove expensive imports too
lns = []
on = True
for ln in cs.split("\n"):
m = ptn.match(ln)
if m and "SimpleNamespace" not in ln:
ln = m.group(1) + "raise Exception()"
if ln.startswith("if True:"):
on = False
continue
if not on and (not ln.strip() or ln.startswith(" ")):
continue
on = True
lns.append(ln)
cs = "\n".join(lns)