diff --git a/copyparty/__init__.py b/copyparty/__init__.py index 8b78a3d2..04cffaf5 100644 --- a/copyparty/__init__.py +++ b/copyparty/__init__.py @@ -7,10 +7,13 @@ import sys import time try: - from typing import TYPE_CHECKING, Any + from typing import TYPE_CHECKING except: TYPE_CHECKING = False +if True: + from typing import Any + PY2 = sys.version_info < (3,) if PY2: sys.dont_write_bytecode = True @@ -40,8 +43,8 @@ except: class EnvParams(object): def __init__(self) -> None: self.t0 = time.time() - self.mod = None - self.cfg = None + self.mod = "" + self.cfg = "" self.ox = getattr(sys, "oxidized", None) diff --git a/copyparty/__main__.py b/copyparty/__main__.py index d09fe91a..3aec892e 100755 --- a/copyparty/__main__.py +++ b/copyparty/__main__.py @@ -38,13 +38,11 @@ from .util import ( wrap, ) -try: +if True: # pylint: disable=using-constant-test from collections.abc import Callable from types import FrameType from typing import Any, Optional -except: - pass try: HAVE_SSL = True @@ -79,9 +77,9 @@ class RiceFormatter(argparse.HelpFormatter): ret += fmt if not VT100: - ret = re.sub("\033\[[0-9;]+m", "", ret) + ret = re.sub("\033\\[[0-9;]+m", "", ret) - return ret + return ret # type: ignore def _fill_text(self, text: str, width: int, indent: str) -> str: """same as RawDescriptionHelpFormatter(HelpFormatter)""" @@ -104,7 +102,7 @@ class RiceFormatter(argparse.HelpFormatter): self.__add_whitespace(i, lWSpace, x) for i, x in enumerate(wrap(line, width, width - 1)) ] - textRows[idx] = lines + textRows[idx] = lines # type: ignore return [item for sublist in textRows for item in sublist] @@ -141,7 +139,7 @@ def init_E(E: EnvParams) -> None: # __init__ runs 18 times when oxidized; do expensive stuff here def get_unixdir() -> str: - paths: list[tuple[Callable[..., str], str]] = [ + paths: list[tuple[Callable[..., Any], str]] = [ (os.environ.get, "XDG_CONFIG_HOME"), (os.path.expanduser, "~/.config"), (os.environ.get, "TMPDIR"), @@ -163,7 +161,7 @@ def init_E(E: EnvParams) -> None: if not os.path.isdir(p): os.mkdir(p) - return p + return p # type: ignore except: pass @@ -867,7 +865,7 @@ def main(argv: Optional[list[str]] = None) -> None: retry = True lprint("\n[ {} ]:\n{}\n".format(fmtr, min_ex())) - assert al + assert al # type: ignore al.E = E # __init__ is not shared when oxidized if WINDOWS and not al.keep_qem: diff --git a/copyparty/authsrv.py b/copyparty/authsrv.py index 3c432966..8596ba1d 100644 --- a/copyparty/authsrv.py +++ b/copyparty/authsrv.py @@ -31,15 +31,13 @@ from .util import ( unhumanize, ) -try: +if True: # pylint: disable=using-constant-test from collections.abc import Iterable import typing from typing import Any, Generator, Optional, Union from .util import RootLogger -except: - pass if TYPE_CHECKING: pass @@ -412,7 +410,7 @@ class VFS(object): will_move: bool = False, will_del: bool = False, will_get: bool = False, - err=403, + err: int = 403, ) -> tuple["VFS", str]: """returns [vfsnode,fs_remainder] if user has the requested permissions""" if ANYWIN: @@ -762,6 +760,7 @@ class AuthSrv(object): t = "WARNING (config-file): permission flag 'a' is deprecated; please use 'rw' instead" self.log(t, 1) + assert vol_dst self._read_vol_str(lvl, uname, daxs[vol_dst], mflags[vol_dst]) def _read_vol_str( @@ -911,6 +910,7 @@ class AuthSrv(object): zv.flags = mflags[dst] zv.dbv = None + assert vfs vfs.all_vols = {} vfs.get_all_vols(vfs.all_vols) diff --git a/copyparty/bos/bos.py b/copyparty/bos/bos.py index 25087b5d..125dc111 100644 --- a/copyparty/bos/bos.py +++ b/copyparty/bos/bos.py @@ -4,14 +4,13 @@ from __future__ import print_function, unicode_literals import os from ..util import SYMTIME, fsdec, fsenc -from . import path +from . import path as path -try: +if True: # pylint: disable=using-constant-test from typing import Any, Optional -except: - pass _ = (path,) +__all__ = ["path"] # grep -hRiE '(^|[^a-zA-Z_\.-])os\.' . | gsed -r 's/ /\n/g;s/\(/(\n/g' | grep -hRiE '(^|[^a-zA-Z_\.-])os\.' | sort | uniq -c # printf 'os\.(%s)' "$(grep ^def bos/__init__.py | gsed -r 's/^def //;s/\(.*//' | tr '\n' '|' | gsed -r 's/.$//')" diff --git a/copyparty/broker_mp.py b/copyparty/broker_mp.py index 6afe634d..b0bf9883 100644 --- a/copyparty/broker_mp.py +++ b/copyparty/broker_mp.py @@ -14,10 +14,8 @@ from .util import Daemon, mp if TYPE_CHECKING: from .svchub import SvcHub -try: +if True: # pylint: disable=using-constant-test from typing import Any -except: - pass class MProcess(mp.Process): diff --git a/copyparty/broker_mpw.py b/copyparty/broker_mpw.py index d119b914..41c6ca7c 100644 --- a/copyparty/broker_mpw.py +++ b/copyparty/broker_mpw.py @@ -15,12 +15,10 @@ from .broker_util import BrokerCli, ExceptionalQueue from .httpsrv import HttpSrv from .util import FAKE_MP, Daemon, HMaccas -try: +if True: # pylint: disable=using-constant-test from types import FrameType from typing import Any, Optional, Union -except: - pass class MpWorker(BrokerCli): diff --git a/copyparty/broker_thr.py b/copyparty/broker_thr.py index b5ce5b06..515671c6 100644 --- a/copyparty/broker_thr.py +++ b/copyparty/broker_thr.py @@ -12,10 +12,8 @@ from .util import HMaccas if TYPE_CHECKING: from .svchub import SvcHub -try: +if True: # pylint: disable=using-constant-test from typing import Any -except: - pass class BrokerThr(BrokerCli): diff --git a/copyparty/broker_util.py b/copyparty/broker_util.py index 2705e7d2..105ac535 100644 --- a/copyparty/broker_util.py +++ b/copyparty/broker_util.py @@ -10,12 +10,10 @@ from .__init__ import TYPE_CHECKING from .authsrv import AuthSrv from .util import HMaccas, Pebkac -try: +if True: # pylint: disable=using-constant-test from typing import Any, Optional, Union from .util import RootLogger -except: - pass if TYPE_CHECKING: from .httpsrv import HttpSrv @@ -41,12 +39,14 @@ class BrokerCli(object): for example resolving httpconn.* in httpcli -- see lines tagged #mypy404 """ + log: "RootLogger" + args: argparse.Namespace + asrv: AuthSrv + httpsrv: "HttpSrv" + iphash: HMaccas + def __init__(self) -> None: - self.log: "RootLogger" = None - self.args: argparse.Namespace = None - self.asrv: AuthSrv = None - self.httpsrv: "HttpSrv" = None - self.iphash: HMaccas = None + pass def ask(self, dest: str, *args: Any) -> ExceptionalQueue: return ExceptionalQueue(1) diff --git a/copyparty/dxml.py b/copyparty/dxml.py index 31f0dd4d..c44b3afc 100644 --- a/copyparty/dxml.py +++ b/copyparty/dxml.py @@ -4,10 +4,8 @@ import xml.etree.ElementTree as ET from .__init__ import PY2 -try: +if True: # pylint: disable=using-constant-test from typing import Any, Optional -except: - pass def get_ET() -> ET.XMLParser: diff --git a/copyparty/fsutil.py b/copyparty/fsutil.py index cfba6161..5bf5ab6b 100644 --- a/copyparty/fsutil.py +++ b/copyparty/fsutil.py @@ -10,12 +10,10 @@ from .authsrv import AXS, VFS from .bos import bos from .util import chkcmd, min_ex -try: +if True: # pylint: disable=using-constant-test from typing import Optional, Union from .util import RootLogger -except: - pass class Fstab(object): diff --git a/copyparty/ftpd.py b/copyparty/ftpd.py index cbbba965..a8cc22c1 100644 --- a/copyparty/ftpd.py +++ b/copyparty/ftpd.py @@ -6,7 +6,6 @@ import logging import os import stat import sys -import threading import time from pyftpdlib.authorizers import AuthenticationFailed, DummyAuthorizer @@ -31,11 +30,9 @@ except ImportError: if TYPE_CHECKING: from .svchub import SvcHub -try: +if True: # pylint: disable=using-constant-test import typing from typing import Any, Optional -except: - pass class FtpAuth(DummyAuthorizer): @@ -288,8 +285,8 @@ class FtpFs(AbstractedFS): class FtpHandler(FTPHandler): abstracted_fs = FtpFs - hub: "SvcHub" = None - args: argparse.Namespace = None + hub: "SvcHub" + args: argparse.Namespace def __init__(self, conn: Any, server: Any, ioloop: Any = None) -> None: self.hub: "SvcHub" = FtpHandler.hub diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index 142eee3d..2b430f50 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -83,11 +83,9 @@ from .util import ( yieldfile, ) -try: +if True: # pylint: disable=using-constant-test import typing from typing import Any, Generator, Match, Optional, Pattern, Type, Union -except: - pass if TYPE_CHECKING: from .httpconn import HttpConn @@ -139,11 +137,13 @@ class HttpCli(object): self.cookies: dict[str, str] = {} self.vpath = " " self.uname = " " + self.pw = " " self.rvol = [" "] self.wvol = [" "] self.mvol = [" "] self.dvol = [" "] self.gvol = [" "] + self.upvol = [" "] self.do_log = True self.can_read = False self.can_write = False @@ -423,7 +423,7 @@ class HttpCli(object): if not hasattr(ex, "code"): pex = Pebkac(500) else: - pex = ex # type: ignore + pex: Pebkac = ex # type: ignore try: post = self.mode in ["POST", "PUT"] or "content-length" in self.headers @@ -848,7 +848,7 @@ class HttpCli(object): from xml.etree import ElementTree as ET from .dxml import mkenod, mktnod, parse_xml - vn, rem = self.asrv.vfs.get(self.vpath, self.uname, False, False) + self.asrv.vfs.get(self.vpath, self.uname, False, False) # abspath = vn.dcanonical(rem) buf = b"" @@ -870,8 +870,8 @@ class HttpCli(object): xroot.insert(0, parse_xml(txt)) xprop = xroot.find(r"./{DAV:}propertyupdate/{DAV:}set/{DAV:}prop") assert xprop - for el in xprop: - el.clear() + for ze in xprop: + ze.clear() txt = """HTTP/1.1 403 Forbidden""" xroot = parse_xml(txt) @@ -942,7 +942,7 @@ class HttpCli(object): ret += ET.tostring(xroot).decode("utf-8") if not bos.path.isfile(abspath): - with open(fsenc(abspath), "w") as _: + with open(fsenc(abspath), "wb") as _: pass self.reply(ret.encode(enc, "replace"), 200, "text/xml; charset=" + enc) @@ -1127,7 +1127,7 @@ class HttpCli(object): raise Pebkac(405, "don't know how to handle POST({})".format(ctype)) - def get_xml_enc(self, txt) -> str: + def get_xml_enc(self, txt: str) -> str: ofs = txt[:512].find(' encoding="') enc = "" if ofs + 1: @@ -1153,11 +1153,11 @@ class HttpCli(object): else: return read_socket(self.sr, remains), remains - def dump_to_file(self, is_put) -> tuple[int, str, str, int, str, str]: + def dump_to_file(self, is_put: bool) -> tuple[int, str, str, int, str, str]: # post_sz, sha_hex, sha_b64, remains, path, url reader, remains = self.get_body_reader() vfs, rem = self.asrv.vfs.get(self.vpath, self.uname, False, True) - rnd, want_url, lifetime = self.upload_flags(vfs) + rnd, _, lifetime = self.upload_flags(vfs) lim = vfs.get_dbv(rem)[0].lim fdir = vfs.canonical(rem) if lim: @@ -1324,7 +1324,7 @@ class HttpCli(object): return post_sz, sha_hex, sha_b64, remains, path, url - def handle_stash(self, is_put) -> bool: + def handle_stash(self, is_put: bool) -> bool: post_sz, sha_hex, sha_b64, remains, path, url = self.dump_to_file(is_put) spd = self._spd(post_sz) t = "{} wrote {}/{} bytes to {} # {}" @@ -1432,14 +1432,13 @@ class HttpCli(object): def handle_zip_post(self) -> bool: assert self.parser - for k in ["zip", "tar"]: - v = self.uparam.get(k) - if v is not None: - break - - if v is None: + try: + k = next(x for x in self.uparam if x in ("zip", "tar")) + except: raise Pebkac(422, "need zip or tar keyword") + v = self.uparam[k] + vn, rem = self.asrv.vfs.get(self.vpath, self.uname, True, False) zs = self.parser.require("files", 1024 * 1024) if not zs: @@ -1504,8 +1503,8 @@ class HttpCli(object): body["vcfg"] = dbv.flags if rem: + dst = vfs.canonical(rem) try: - dst = vfs.canonical(rem) if not bos.path.isdir(dst): bos.makedirs(dst) except OSError as ex: @@ -1749,7 +1748,7 @@ class HttpCli(object): sanitized = sanitize_fn(new_dir, "", []) return self._mkdir(vjoin(self.vpath, sanitized)) - def _mkdir(self, vpath) -> bool: + def _mkdir(self, vpath: str) -> bool: nullwrite = self.args.nw vfs, rem = self.asrv.vfs.get(vpath, self.uname, False, True) self._assert_safe_rem(rem) @@ -2466,7 +2465,7 @@ class HttpCli(object): for c, v in [(b"&", 4), (b"<", 3), (b">", 3)]: sz_md += (len(buf) - len(buf.replace(c, b""))) * v - file_ts = max(ts_md, ts_html, self.E.t0) + file_ts = int(max(ts_md, ts_html, self.E.t0)) file_lastmod, do_send = self._chk_lastmod(file_ts) self.out_headers["Last-Modified"] = file_lastmod self.out_headers.update(NO_CACHE) diff --git a/copyparty/httpconn.py b/copyparty/httpconn.py index 9c9b0c22..b1d37b0f 100644 --- a/copyparty/httpconn.py +++ b/copyparty/httpconn.py @@ -25,10 +25,8 @@ from .th_srv import HAVE_PIL, HAVE_VIPS from .u2idx import U2idx from .util import HMaccas, shut_socket -try: +if True: # pylint: disable=using-constant-test from typing import Optional, Pattern, Union -except: - pass if TYPE_CHECKING: from .httpsrv import HttpSrv diff --git a/copyparty/httpsrv.py b/copyparty/httpsrv.py index 26b26f2b..17647fd4 100644 --- a/copyparty/httpsrv.py +++ b/copyparty/httpsrv.py @@ -47,10 +47,8 @@ from .util import ( if TYPE_CHECKING: from .broker_util import BrokerCli -try: +if True: # pylint: disable=using-constant-test from typing import Any, Optional -except: - pass class HttpSrv(object): diff --git a/copyparty/mtag.py b/copyparty/mtag.py index 5812ddec..3e72d1d3 100644 --- a/copyparty/mtag.py +++ b/copyparty/mtag.py @@ -12,25 +12,23 @@ from .__init__ import PY2, WINDOWS, E, unicode from .bos import bos from .util import REKOBO_LKEY, fsenc, min_ex, retchk, runcmd, uncyg -try: +if True: # pylint: disable=using-constant-test from typing import Any, Union from .util import RootLogger -except: - pass -def have_ff(cmd: str) -> bool: +def have_ff(scmd: str) -> bool: if PY2: - print("# checking {}".format(cmd)) - cmd = (cmd + " -version").encode("ascii").split(b" ") + print("# checking {}".format(scmd)) + acmd = (scmd + " -version").encode("ascii").split(b" ") try: - sp.Popen(cmd, stdout=sp.PIPE, stderr=sp.PIPE).communicate() + sp.Popen(acmd, stdout=sp.PIPE, stderr=sp.PIPE).communicate() return True except: return False else: - return bool(shutil.which(cmd)) + return bool(shutil.which(scmd)) HAVE_FFMPEG = have_ff("ffmpeg") @@ -269,7 +267,7 @@ class MTag(object): if self.backend == "mutagen": self.get = self.get_mutagen try: - import mutagen # noqa: F401 # pylint: disable=unused-import,import-outside-toplevel + from mutagen import version # noqa: F401 except: self.log("could not load Mutagen, trying FFprobe instead", c=3) self.backend = "ffprobe" @@ -381,18 +379,20 @@ class MTag(object): parser_output[alias] = (priority, tv[0]) # take first value (lowest priority / most preferred) - ret = {sk: unicode(tv[1]).strip() for sk, tv in parser_output.items()} + ret: dict[str, Union[str, float]] = { + sk: unicode(tv[1]).strip() for sk, tv in parser_output.items() + } # track 3/7 => track 3 - for sk, tv in ret.items(): + for sk, zv in ret.items(): if sk[0] == ".": - sv = str(tv).split("/")[0].strip().lstrip("0") + sv = str(zv).split("/")[0].strip().lstrip("0") ret[sk] = sv or 0 # normalize key notation to rkeobo okey = ret.get("key") if okey: - key = okey.replace(" ", "").replace("maj", "").replace("min", "m") + key = str(okey).replace(" ", "").replace("maj", "").replace("min", "m") ret["key"] = REKOBO_LKEY.get(key.lower(), okey) return ret @@ -441,10 +441,11 @@ class MTag(object): if not bos.path.isfile(abspath): return {} - import mutagen + from mutagen import File try: - md = mutagen.File(fsenc(abspath), easy=True) + md = File(fsenc(abspath), easy=True) + assert md if not md.info.length and not md.info.codec: raise Exception() except: diff --git a/copyparty/smbd.py b/copyparty/smbd.py index 3a9dd970..4efe24d9 100644 --- a/copyparty/smbd.py +++ b/copyparty/smbd.py @@ -14,10 +14,8 @@ from .authsrv import LEELOO_DALLAS, VFS from .util import Daemon, min_ex from .bos import bos -try: +if True: # pylint: disable=using-constant-test from typing import Any -except: - pass if TYPE_CHECKING: from .svchub import SvcHub @@ -144,7 +142,7 @@ class SMB(object): return ls def _open( - self, vpath: str, flags: int, chmod: int = 0o777, *a: Any, **ka: Any + self, vpath: str, flags: int, *a: Any, chmod: int = 0o777, **ka: Any ) -> Any: f_ro = os.O_RDONLY if ANYWIN: @@ -158,7 +156,7 @@ class SMB(object): if wr and not vfs.axs.uwrite: yeet("blocked write (no-write-acc): " + vpath) - ret = bos.open(ap, flags, chmod, *a, **ka) + ret = bos.open(ap, flags, *a, mode=chmod, **ka) if wr: now = time.time() nf = len(self.files) diff --git a/copyparty/star.py b/copyparty/star.py index c7731b2b..1bb4618b 100644 --- a/copyparty/star.py +++ b/copyparty/star.py @@ -2,7 +2,6 @@ from __future__ import print_function, unicode_literals import tarfile -import threading from queue import Queue @@ -10,12 +9,10 @@ from .bos import bos from .sutil import StreamArc, errdesc from .util import Daemon, fsenc, min_ex -try: +if True: # pylint: disable=using-constant-test from typing import Any, Generator, Optional from .util import NamedLogger -except: - pass class QFile(object): # inherit io.StringIO for painful typing diff --git a/copyparty/stolen/qrcodegen.py b/copyparty/stolen/qrcodegen.py index 3edc3171..98dc804b 100644 --- a/copyparty/stolen/qrcodegen.py +++ b/copyparty/stolen/qrcodegen.py @@ -11,12 +11,10 @@ from __future__ import print_function, unicode_literals import collections import itertools -try: +if True: # pylint: disable=using-constant-test from collections.abc import Sequence from typing import Callable, List, Optional, Tuple, Union -except: - pass def num_char_count_bits(ver: int) -> int: diff --git a/copyparty/stolen/surrogateescape.py b/copyparty/stolen/surrogateescape.py index c11b455f..22eb00b4 100644 --- a/copyparty/stolen/surrogateescape.py +++ b/copyparty/stolen/surrogateescape.py @@ -20,10 +20,8 @@ PY3 = sys.version_info > (3,) WINDOWS = platform.system() == "Windows" FS_ERRORS = "surrogateescape" -try: +if True: # pylint: disable=using-constant-test from typing import Any -except: - pass if PY3: diff --git a/copyparty/sutil.py b/copyparty/sutil.py index 98f2a88d..b6b21dc0 100644 --- a/copyparty/sutil.py +++ b/copyparty/sutil.py @@ -6,12 +6,10 @@ from datetime import datetime from .bos import bos -try: +if True: # pylint: disable=using-constant-test from typing import Any, Generator, Optional from .util import NamedLogger -except: - pass class StreamArc(object): @@ -25,7 +23,7 @@ class StreamArc(object): self.fgen = fgen def gen(self) -> Generator[Optional[bytes], None, None]: - pass + raise Exception("override me") def errdesc(errors: list[tuple[str, str]]) -> tuple[dict[str, Any], list[str]]: diff --git a/copyparty/svchub.py b/copyparty/svchub.py index 7d3e14bb..8af9a29a 100644 --- a/copyparty/svchub.py +++ b/copyparty/svchub.py @@ -19,13 +19,11 @@ import threading import time from datetime import datetime, timedelta -try: +if True: # pylint: disable=using-constant-test from types import FrameType import typing from typing import Any, Optional, Union -except: - pass from .__init__ import ANYWIN, MACOS, VT100, EnvParams, unicode from .authsrv import AuthSrv @@ -355,7 +353,7 @@ class SvcHub(object): self.shutdown() - def kill9(self, delay: float = 0.0): + def kill9(self, delay: float = 0.0) -> None: if delay > 0.01: time.sleep(delay) print("component stuck; performing sigkill") diff --git a/copyparty/szip.py b/copyparty/szip.py index 6a5b4cfb..06bae7e0 100644 --- a/copyparty/szip.py +++ b/copyparty/szip.py @@ -9,12 +9,10 @@ from .bos import bos from .sutil import StreamArc, errdesc from .util import min_ex, sanitize_fn, spack, sunpack, yieldfile -try: +if True: # pylint: disable=using-constant-test from typing import Any, Generator, Optional from .util import NamedLogger -except: - pass def dostime2unix(buf: bytes) -> int: diff --git a/copyparty/th_cli.py b/copyparty/th_cli.py index 473d1855..afc15cb8 100644 --- a/copyparty/th_cli.py +++ b/copyparty/th_cli.py @@ -9,10 +9,8 @@ from .bos import bos from .th_srv import HAVE_WEBP, thumb_path from .util import Cooldown -try: +if True: # pylint: disable=using-constant-test from typing import Optional, Union -except: - pass if TYPE_CHECKING: from .httpsrv import HttpSrv diff --git a/copyparty/th_srv.py b/copyparty/th_srv.py index 29d07745..89d73caf 100644 --- a/copyparty/th_srv.py +++ b/copyparty/th_srv.py @@ -26,10 +26,8 @@ from .util import ( vsplit, ) -try: +if True: # pylint: disable=using-constant-test from typing import Optional, Union -except: - pass if TYPE_CHECKING: from .svchub import SvcHub diff --git a/copyparty/u2idx.py b/copyparty/u2idx.py index 6d8ae7ef..e5470e44 100644 --- a/copyparty/u2idx.py +++ b/copyparty/u2idx.py @@ -30,10 +30,8 @@ try: except: pass -try: +if True: # pylint: disable=using-constant-test from typing import Any, Optional, Union -except: - pass if TYPE_CHECKING: from .httpconn import HttpConn @@ -199,7 +197,7 @@ class U2idx(object): v = "exists(select 1 from mt where mt.w = mtw and " + vq else: - raise Pebkac(400, "invalid key [" + v + "]") + raise Pebkac(400, "invalid key [{}]".format(v)) q += v + " " continue diff --git a/copyparty/up2k.py b/copyparty/up2k.py index e91947a0..a100ced5 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -60,10 +60,8 @@ if HAVE_SQLITE3: DB_VER = 5 -try: +if True: # pylint: disable=using-constant-test from typing import Any, Optional, Pattern, Union -except: - pass if TYPE_CHECKING: from .svchub import SvcHub @@ -940,6 +938,7 @@ class Up2k(object): if n: t = "forgetting {} shadowed autoindexed files in [{}] > [{}]" self.log(t.format(n, top, sh_rd)) + assert sh_erd q = "delete from dh where (d = ? or d like ?||'%')" db.c.execute(q, (sh_erd, sh_erd + "/")) diff --git a/copyparty/util.py b/copyparty/util.py index fbcbc82f..0b840648 100644 --- a/copyparty/util.py +++ b/copyparty/util.py @@ -76,7 +76,7 @@ try: except: HAVE_PSUTIL = False -try: +if True: # pylint: disable=using-constant-test import types from collections.abc import Callable, Iterable @@ -91,21 +91,16 @@ try: def __call__(self, msg: str, c: Union[int, str] = 0) -> None: return None -except: - pass if TYPE_CHECKING: import magic from .authsrv import VFS -FAKE_MP = False - try: - if not FAKE_MP: - import multiprocessing as mp - else: - import multiprocessing.dummy as mp # type: ignore + import multiprocessing as mp + + # import multiprocessing.dummy as mp except ImportError: # support jython mp = None # type: ignore @@ -351,8 +346,8 @@ class Daemon(threading.Thread): self, target: Any, name: Optional[str] = None, - a: Iterable[Any] = None, - r=True, + a: Optional[Iterable[Any]] = None, + r: bool = True, ) -> None: threading.Thread.__init__(self, target=target, name=name, args=a or ()) self.daemon = True @@ -1049,6 +1044,7 @@ def ren_open( with fun(fsenc(fpath), *args, **kwargs) as f: if b64: + assert fdir fp2 = "fn-trunc.{}.txt".format(b64) fp2 = os.path.join(fdir, fp2) with open(fsenc(fp2), "wb") as f2: @@ -1075,7 +1071,7 @@ def ren_open( raise if not b64: - zs = (orig_name + "\n" + suffix).encode("utf-8", "replace") + zs = "{}\n{}".format(orig_name, suffix).encode("utf-8", "replace") zs = hashlib.sha512(zs).digest()[:12] b64 = base64.urlsafe_b64encode(zs).decode("utf-8") @@ -1225,6 +1221,7 @@ class MultipartParser(object): buf = buf[d:] # look for boundary near the end of the buffer + n = 0 for n in range(1, len(buf) + 1): if not buf[-n:] in self.boundary: n -= 1 @@ -1383,8 +1380,8 @@ def gen_filekey_dbg( except: ctx = "" + p2 = "a" try: - p2 = "a" p2 = absreal(fspath) if p2 != fspath: raise Exception() @@ -1581,17 +1578,21 @@ def html_bescape(s: bytes, quot: bool = False, crlf: bool = False) -> bytes: return s -def quotep(txt: str) -> str: +def _quotep2(txt: str) -> str: """url quoter which deals with bytes correctly""" btxt = w8enc(txt) - quot1 = quote(btxt, safe=b"/") - if not PY2: - quot2 = quot1.encode("ascii") - else: - quot2 = quot1 + quot = quote(btxt, safe=b"/") + return w8dec(quot.replace(b" ", b"+")) - quot3 = quot2.replace(b" ", b"+") - return w8dec(quot3) + +def _quotep3(txt: str) -> str: + """url quoter which deals with bytes correctly""" + btxt = w8enc(txt) + quot = quote(btxt, safe=b"/").encode("utf-8") + return w8dec(quot.replace(b" ", b"+")) + + +quotep = _quotep3 if not PY2 else _quotep2 def unquotep(txt: str) -> str: @@ -1616,22 +1617,30 @@ def vjoin(rd: str, fn: str) -> str: return rd or fn -def w8dec(txt: bytes) -> str: +def _w8dec2(txt: bytes) -> str: """decodes filesystem-bytes to wtf8""" - if PY2: - return surrogateescape.decodefilename(txt) + return surrogateescape.decodefilename(txt) + +def _w8enc2(txt: str) -> bytes: + """encodes wtf8 to filesystem-bytes""" + return surrogateescape.encodefilename(txt) + + +def _w8dec3(txt: bytes) -> str: + """decodes filesystem-bytes to wtf8""" return txt.decode(FS_ENCODING, "surrogateescape") -def w8enc(txt: str) -> bytes: +def _w8enc3(txt: str) -> bytes: """encodes wtf8 to filesystem-bytes""" - if PY2: - return surrogateescape.encodefilename(txt) - return txt.encode(FS_ENCODING, "surrogateescape") +w8dec = _w8dec3 if not PY2 else _w8dec2 +w8enc = _w8enc3 if not PY2 else _w8enc2 + + def w8b64dec(txt: str) -> str: """decodes base64(filesystem-bytes) to wtf8""" return w8dec(base64.urlsafe_b64decode(txt.encode("ascii"))) @@ -1642,17 +1651,17 @@ def w8b64enc(txt: str) -> str: return base64.urlsafe_b64encode(w8enc(txt)).decode("ascii") -if PY2 and WINDOWS: +if not PY2 or not WINDOWS: + fsenc = w8enc + fsdec = w8dec +else: # moonrunes become \x3f with bytestrings, # losing mojibake support is worth - def _not_actually_mbcs(txt): + def _not_actually_mbcs(txt: str) -> str: return txt fsenc = _not_actually_mbcs fsdec = _not_actually_mbcs -else: - fsenc = w8enc - fsdec = w8dec def s3enc(mem_cur: "sqlite3.Cursor", rd: str, fn: str) -> tuple[str, str]: @@ -1952,6 +1961,7 @@ def statdir( if lstat and (PY2 or os.stat not in os.supports_follow_symlinks): scandir = False + src = "statdir" try: btop = fsenc(top) if scandir and hasattr(os, "scandir"): @@ -2129,7 +2139,7 @@ def killtree(root: int) -> None: os.kill(pid, signal.SIGTERM) else: # windows gets minimal effort sorry - os.kill(pid, signal.SIGTERM) + os.kill(root, signal.SIGTERM) return for n in range(10): @@ -2151,19 +2161,21 @@ def runcmd( kill = ka.pop("kill", "t") # [t]ree [m]ain [n]one capture = ka.pop("capture", 3) # 0=none 1=stdout 2=stderr 3=both - sin = ka.pop("sin", None) + sin: Optional[bytes] = ka.pop("sin", None) if sin: ka["stdin"] = sp.PIPE cout = sp.PIPE if capture in [1, 3] else None cerr = sp.PIPE if capture in [2, 3] else None + bout: bytes + berr: bytes p = sp.Popen(argv, stdout=cout, stderr=cerr, **ka) if not timeout or PY2: - stdout, stderr = p.communicate(sin) + bout, berr = p.communicate(sin) else: try: - stdout, stderr = p.communicate(sin, timeout=timeout) + bout, berr = p.communicate(sin, timeout=timeout) except sp.TimeoutExpired: if kill == "n": return -18, "", "" # SIGCONT; leave it be @@ -2173,15 +2185,15 @@ def runcmd( killtree(p.pid) try: - stdout, stderr = p.communicate(timeout=1) + bout, berr = p.communicate(timeout=1) except: - stdout = b"" - stderr = b"" + bout = b"" + berr = b"" - stdout = stdout.decode("utf-8", "replace") if cout else b"" - stderr = stderr.decode("utf-8", "replace") if cerr else b"" + stdout = bout.decode("utf-8", "replace") if cout else "" + stderr = berr.decode("utf-8", "replace") if cerr else "" - rc = p.returncode + rc: int = p.returncode if rc is None: rc = -14 # SIGALRM; failed to kill diff --git a/scripts/strip_hints/a.py b/scripts/strip_hints/a.py index 4901bd0f..2d6caff1 100644 --- a/scripts/strip_hints/a.py +++ b/scripts/strip_hints/a.py @@ -1,7 +1,6 @@ # coding: utf-8 from __future__ import print_function, unicode_literals -import re import os import sys from strip_hints import strip_file_to_string @@ -51,16 +50,18 @@ def uh1(fp): pr(".") cs = strip_file_to_string(fp, no_ast=True, to_empty=True) - libs = "typing|types|collections\.abc" - ptn = re.compile(r"^(\s*)(from (?:{0}) import |import (?:{0})\b).*".format(libs)) - # remove expensive imports too lns = [] + on = True for ln in cs.split("\n"): - m = ptn.match(ln) - if m and "SimpleNamespace" not in ln: - ln = m.group(1) + "raise Exception()" + if ln.startswith("if True:"): + on = False + continue + if not on and (not ln.strip() or ln.startswith(" ")): + continue + + on = True lns.append(ln) cs = "\n".join(lns)