From acbb8267e1ab7eea7f62c467d6bfd227294127b0 Mon Sep 17 00:00:00 2001 From: ed Date: Sat, 10 Feb 2024 23:50:17 +0000 Subject: [PATCH] tftp: add directory listing --- copyparty/__main__.py | 4 ++- copyparty/svchub.py | 7 ++++ copyparty/tftpd.py | 84 ++++++++++++++++++++++++++++++++++++++----- copyparty/util.py | 17 +++++++-- tests/test_dots.py | 2 +- tests/util.py | 2 +- 6 files changed, 101 insertions(+), 15 deletions(-) diff --git a/copyparty/__main__.py b/copyparty/__main__.py index 3e190caf..561f33c9 100755 --- a/copyparty/__main__.py +++ b/copyparty/__main__.py @@ -43,10 +43,10 @@ from .util import ( DEF_MTH, IMPLICATIONS, JINJA_VER, + PARTFTPY_VER, PY_DESC, PYFTPD_VER, SQLITE_VER, - PARTFTPY_VER, UNPLICATIONS, align_tab, ansi_re, @@ -1019,6 +1019,8 @@ def add_tftp(ap): ap2.add_argument("--tftp", metavar="PORT", type=int, help="enable TFTP server on \033[33mPORT\033[0m, for example \033[32m69 \033[0mor \033[32m3969") ap2.add_argument("--tftpv", action="store_true", help="verbose") ap2.add_argument("--tftpvv", action="store_true", help="verboser") + ap2.add_argument("--tftp-lsf", metavar="PTN", type=u, default="\\.?(dir|ls)(\\.txt)?", help="return a directory listing if a file with this name is requested and it does not exist; defaults matches .ls, dir, .dir.txt, ls.txt, ...") + ap2.add_argument("--tftp-nols", action="store_true", help="if someone tries to download a directory, return an error instead of showing its directory listing") ap2.add_argument("--tftp-ipa", metavar="PFX", type=u, default="", help="only accept connections from IP-addresses starting with \033[33mPFX\033[0m; specify [\033[32many\033[0m] to disable inheriting \033[33m--ipa\033[0m. Example: [\033[32m127., 10.89., 192.168.\033[0m]") ap2.add_argument("--tftp-pr", metavar="P-P", type=u, help="the range of UDP ports to use for data transfer, for example \033[32m12000-13000") diff --git a/copyparty/svchub.py b/copyparty/svchub.py index 49f49068..8bf3c645 100644 --- a/copyparty/svchub.py +++ b/copyparty/svchub.py @@ -451,6 +451,13 @@ class SvcHub(object): else: setattr(al, k, re.compile(vs)) + for k in "tftp_lsf".split(" "): + vs = getattr(al, k) + if not vs or vs == "no": + setattr(al, k, None) + else: + setattr(al, k, re.compile("^" + vs + "$")) + if not al.sus_urls: al.ban_url = "no" elif al.ban_url == "no": diff --git a/copyparty/tftpd.py b/copyparty/tftpd.py index ce23b626..8022b88a 100644 --- a/copyparty/tftpd.py +++ b/copyparty/tftpd.py @@ -4,14 +4,17 @@ from __future__ import print_function, unicode_literals try: from types import SimpleNamespace except: + class SimpleNamespace(object): def __init__(self, **attr): self.__dict__.update(attr) + import inspect import logging import os import stat +from datetime import datetime from partftpy import TftpContexts, TftpServer, TftpStates from partftpy.TftpShared import TftpException @@ -19,7 +22,7 @@ from partftpy.TftpShared import TftpException from .__init__ import PY2, TYPE_CHECKING from .authsrv import VFS from .bos import bos -from .util import Daemon, min_ex, pybin, runhook, undot +from .util import BytesIO, Daemon, exclude_dotfiles, runhook, undot if True: # pylint: disable=using-constant-test from typing import Any, Union @@ -40,6 +43,7 @@ def _serverInitial(self, pkt: Any, raddress: str, rport: int) -> bool: yeet("client rejected (--tftp-ipa): %s" % (raddress,)) return ret + # patch ipa-check into partftpd _hub: list["SvcHub"] = [] _orig_serverInitial = TftpStates.TftpServerState.serverInitial @@ -113,9 +117,7 @@ class Tftpd(object): def nlog(self, msg: str, c: Union[int, str] = 0) -> None: self.log("tftp", msg, c) - def _v2a( - self, caller: str, vpath: str, perms: list, *a: Any - ) -> tuple[VFS, str]: + def _v2a(self, caller: str, vpath: str, perms: list, *a: Any) -> tuple[VFS, str]: vpath = vpath.replace("\\", "/").lstrip("/") if not perms: perms = [True, True] @@ -124,9 +126,71 @@ class Tftpd(object): vfs, rem = self.asrv.vfs.get(vpath, "*", *perms) return vfs, vfs.canonical(rem) - def _ls(self, vpath: str, raddress: str, rport: int) -> Any: + def _ls(self, vpath: str, raddress: str, rport: int, force=False) -> Any: # generate file listing if vpath is dir.txt and return as file object - return None + if not force: + vpath, fn = os.path.split(vpath.replace("\\", "/")) + ptn = self.args.tftp_lsf + if not ptn or not ptn.match(fn.lower()): + return None + + vn, rem = self.asrv.vfs.get(vpath, "*", True, False) + fsroot, vfs_ls, vfs_virt = vn.ls( + rem, + "*", + not self.args.no_scandir, + [[True, False]], + ) + dnames = set([x[0] for x in vfs_ls if stat.S_ISDIR(x[1].st_mode)]) + dirs1 = [(v.st_mtime, v.st_size, k + "/") for k, v in vfs_ls if k in dnames] + fils1 = [(v.st_mtime, v.st_size, k) for k, v in vfs_ls if k not in dnames] + real1 = dirs1 + fils1 + realt = [(datetime.fromtimestamp(mt), sz, fn) for mt, sz, fn in real1] + reals = [ + ( + "%04d-%02d-%02d %02d:%02d:%02d" + % ( + zd.year, + zd.month, + zd.day, + zd.hour, + zd.minute, + zd.second, + ), + sz, + fn, + ) + for zd, sz, fn in realt + ] + virs = [("????-??-?? ??:??:??", 0, k + "/") for k in vfs_virt.keys()] + ls = virs + reals + + if "*" not in vn.axs.udot: + names = set(exclude_dotfiles([x[2] for x in ls])) + ls = [x for x in ls if x[2] in names] + + try: + biggest = max([x[1] for x in ls]) + except: + biggest = 0 + + perms = [] + if "*" in vn.axs.uread: + perms.append("read") + if "*" in vn.axs.udot: + perms.append("hidden") + if "*" in vn.axs.uwrite: + if "*" in vn.axs.udel: + perms.append("overwrite") + else: + perms.append("write") + + fmt = "{{}} {{:{},}} {{}}" + fmt = fmt.format(len("{:,}".format(biggest))) + retl = ["# permissions: %s" % (", ".join(perms),)] + retl += [fmt.format(*x) for x in ls] + ret = "\n".join(retl).encode("utf-8", "replace") + return BytesIO(ret) def _open(self, vpath: str, mode: str, *a: Any, **ka: Any) -> Any: rd = wr = False @@ -151,6 +215,9 @@ class Tftpd(object): ): yeet("blocked by xbu server config: " + vpath) + if not self.args.tftp_nols and bos.path.isdir(ap): + return self._ls(vpath, "", 0, True) + return open(ap, mode, *a, **ka) def _mkdir(self, vpath: str, *a) -> None: @@ -162,9 +229,7 @@ class Tftpd(object): def _unlink(self, vpath: str) -> None: # return bos.unlink(self._v2a("stat", vpath, *a)[1]) - vfs, ap = self._v2a( - "delete", vpath, [True, False, False, True] - ) + vfs, ap = self._v2a("delete", vpath, [True, False, False, True]) try: inf = bos.stat(ap) @@ -238,6 +303,7 @@ class Tftpd(object): fos.path.islink = self._hook fos.path.realpath = self._hook + def yeet(msg: str) -> None: warning(msg) raise TftpException(msg) diff --git a/copyparty/util.py b/copyparty/util.py index e4295f34..e3b7c6b1 100644 --- a/copyparty/util.py +++ b/copyparty/util.py @@ -431,13 +431,24 @@ except: PY_DESC = py_desc() -VERSIONS = "copyparty v{} ({})\n{}\n sqlite {} | jinja {} | pyftpd {} | tftp {}".format( - S_VERSION, S_BUILD_DT, PY_DESC, SQLITE_VER, JINJA_VER, PYFTPD_VER, PARTFTPY_VER +VERSIONS = ( + "copyparty v{} ({})\n{}\n sqlite {} | jinja {} | pyftpd {} | tftp {}".format( + S_VERSION, S_BUILD_DT, PY_DESC, SQLITE_VER, JINJA_VER, PYFTPD_VER, PARTFTPY_VER + ) ) _: Any = (mp, BytesIO, quote, unquote, SQLITE_VER, JINJA_VER, PYFTPD_VER, PARTFTPY_VER) -__all__ = ["mp", "BytesIO", "quote", "unquote", "SQLITE_VER", "JINJA_VER", "PYFTPD_VER", "PARTFTPY_VER"] +__all__ = [ + "mp", + "BytesIO", + "quote", + "unquote", + "SQLITE_VER", + "JINJA_VER", + "PYFTPD_VER", + "PARTFTPY_VER", +] class Daemon(threading.Thread): diff --git a/tests/test_dots.py b/tests/test_dots.py index 5822dfdd..3e8e60a0 100644 --- a/tests/test_dots.py +++ b/tests/test_dots.py @@ -11,8 +11,8 @@ import unittest from copyparty.authsrv import AuthSrv from copyparty.httpcli import HttpCli -from copyparty.up2k import Up2k from copyparty.u2idx import U2idx +from copyparty.up2k import Up2k from tests import util as tu from tests.util import Cfg diff --git a/tests/util.py b/tests/util.py index 60b955da..a91c1cce 100644 --- a/tests/util.py +++ b/tests/util.py @@ -43,8 +43,8 @@ if MACOS: from copyparty.__init__ import E from copyparty.__main__ import init_E -from copyparty.util import FHC, Garda, Unrecv from copyparty.u2idx import U2idx +from copyparty.util import FHC, Garda, Unrecv init_E(E)