tftp: add directory listing

This commit is contained in:
ed 2024-02-10 23:50:17 +00:00
parent 8796c09f56
commit acbb8267e1
6 changed files with 101 additions and 15 deletions

View file

@ -43,10 +43,10 @@ from .util import (
DEF_MTH, DEF_MTH,
IMPLICATIONS, IMPLICATIONS,
JINJA_VER, JINJA_VER,
PARTFTPY_VER,
PY_DESC, PY_DESC,
PYFTPD_VER, PYFTPD_VER,
SQLITE_VER, SQLITE_VER,
PARTFTPY_VER,
UNPLICATIONS, UNPLICATIONS,
align_tab, align_tab,
ansi_re, ansi_re,
@ -1019,6 +1019,8 @@ def add_tftp(ap):
ap2.add_argument("--tftp", metavar="PORT", type=int, help="enable TFTP server on \033[33mPORT\033[0m, for example \033[32m69 \033[0mor \033[32m3969") ap2.add_argument("--tftp", metavar="PORT", type=int, help="enable TFTP server on \033[33mPORT\033[0m, for example \033[32m69 \033[0mor \033[32m3969")
ap2.add_argument("--tftpv", action="store_true", help="verbose") ap2.add_argument("--tftpv", action="store_true", help="verbose")
ap2.add_argument("--tftpvv", action="store_true", help="verboser") ap2.add_argument("--tftpvv", action="store_true", help="verboser")
ap2.add_argument("--tftp-lsf", metavar="PTN", type=u, default="\\.?(dir|ls)(\\.txt)?", help="return a directory listing if a file with this name is requested and it does not exist; defaults matches .ls, dir, .dir.txt, ls.txt, ...")
ap2.add_argument("--tftp-nols", action="store_true", help="if someone tries to download a directory, return an error instead of showing its directory listing")
ap2.add_argument("--tftp-ipa", metavar="PFX", type=u, default="", help="only accept connections from IP-addresses starting with \033[33mPFX\033[0m; specify [\033[32many\033[0m] to disable inheriting \033[33m--ipa\033[0m. Example: [\033[32m127., 10.89., 192.168.\033[0m]") ap2.add_argument("--tftp-ipa", metavar="PFX", type=u, default="", help="only accept connections from IP-addresses starting with \033[33mPFX\033[0m; specify [\033[32many\033[0m] to disable inheriting \033[33m--ipa\033[0m. Example: [\033[32m127., 10.89., 192.168.\033[0m]")
ap2.add_argument("--tftp-pr", metavar="P-P", type=u, help="the range of UDP ports to use for data transfer, for example \033[32m12000-13000") ap2.add_argument("--tftp-pr", metavar="P-P", type=u, help="the range of UDP ports to use for data transfer, for example \033[32m12000-13000")

View file

@ -451,6 +451,13 @@ class SvcHub(object):
else: else:
setattr(al, k, re.compile(vs)) setattr(al, k, re.compile(vs))
for k in "tftp_lsf".split(" "):
vs = getattr(al, k)
if not vs or vs == "no":
setattr(al, k, None)
else:
setattr(al, k, re.compile("^" + vs + "$"))
if not al.sus_urls: if not al.sus_urls:
al.ban_url = "no" al.ban_url = "no"
elif al.ban_url == "no": elif al.ban_url == "no":

View file

@ -4,14 +4,17 @@ from __future__ import print_function, unicode_literals
try: try:
from types import SimpleNamespace from types import SimpleNamespace
except: except:
class SimpleNamespace(object): class SimpleNamespace(object):
def __init__(self, **attr): def __init__(self, **attr):
self.__dict__.update(attr) self.__dict__.update(attr)
import inspect import inspect
import logging import logging
import os import os
import stat import stat
from datetime import datetime
from partftpy import TftpContexts, TftpServer, TftpStates from partftpy import TftpContexts, TftpServer, TftpStates
from partftpy.TftpShared import TftpException from partftpy.TftpShared import TftpException
@ -19,7 +22,7 @@ from partftpy.TftpShared import TftpException
from .__init__ import PY2, TYPE_CHECKING from .__init__ import PY2, TYPE_CHECKING
from .authsrv import VFS from .authsrv import VFS
from .bos import bos from .bos import bos
from .util import Daemon, min_ex, pybin, runhook, undot from .util import BytesIO, Daemon, exclude_dotfiles, runhook, undot
if True: # pylint: disable=using-constant-test if True: # pylint: disable=using-constant-test
from typing import Any, Union from typing import Any, Union
@ -40,6 +43,7 @@ def _serverInitial(self, pkt: Any, raddress: str, rport: int) -> bool:
yeet("client rejected (--tftp-ipa): %s" % (raddress,)) yeet("client rejected (--tftp-ipa): %s" % (raddress,))
return ret return ret
# patch ipa-check into partftpd # patch ipa-check into partftpd
_hub: list["SvcHub"] = [] _hub: list["SvcHub"] = []
_orig_serverInitial = TftpStates.TftpServerState.serverInitial _orig_serverInitial = TftpStates.TftpServerState.serverInitial
@ -113,9 +117,7 @@ class Tftpd(object):
def nlog(self, msg: str, c: Union[int, str] = 0) -> None: def nlog(self, msg: str, c: Union[int, str] = 0) -> None:
self.log("tftp", msg, c) self.log("tftp", msg, c)
def _v2a( def _v2a(self, caller: str, vpath: str, perms: list, *a: Any) -> tuple[VFS, str]:
self, caller: str, vpath: str, perms: list, *a: Any
) -> tuple[VFS, str]:
vpath = vpath.replace("\\", "/").lstrip("/") vpath = vpath.replace("\\", "/").lstrip("/")
if not perms: if not perms:
perms = [True, True] perms = [True, True]
@ -124,9 +126,71 @@ class Tftpd(object):
vfs, rem = self.asrv.vfs.get(vpath, "*", *perms) vfs, rem = self.asrv.vfs.get(vpath, "*", *perms)
return vfs, vfs.canonical(rem) return vfs, vfs.canonical(rem)
def _ls(self, vpath: str, raddress: str, rport: int) -> Any: def _ls(self, vpath: str, raddress: str, rport: int, force=False) -> Any:
# generate file listing if vpath is dir.txt and return as file object # generate file listing if vpath is dir.txt and return as file object
return None if not force:
vpath, fn = os.path.split(vpath.replace("\\", "/"))
ptn = self.args.tftp_lsf
if not ptn or not ptn.match(fn.lower()):
return None
vn, rem = self.asrv.vfs.get(vpath, "*", True, False)
fsroot, vfs_ls, vfs_virt = vn.ls(
rem,
"*",
not self.args.no_scandir,
[[True, False]],
)
dnames = set([x[0] for x in vfs_ls if stat.S_ISDIR(x[1].st_mode)])
dirs1 = [(v.st_mtime, v.st_size, k + "/") for k, v in vfs_ls if k in dnames]
fils1 = [(v.st_mtime, v.st_size, k) for k, v in vfs_ls if k not in dnames]
real1 = dirs1 + fils1
realt = [(datetime.fromtimestamp(mt), sz, fn) for mt, sz, fn in real1]
reals = [
(
"%04d-%02d-%02d %02d:%02d:%02d"
% (
zd.year,
zd.month,
zd.day,
zd.hour,
zd.minute,
zd.second,
),
sz,
fn,
)
for zd, sz, fn in realt
]
virs = [("????-??-?? ??:??:??", 0, k + "/") for k in vfs_virt.keys()]
ls = virs + reals
if "*" not in vn.axs.udot:
names = set(exclude_dotfiles([x[2] for x in ls]))
ls = [x for x in ls if x[2] in names]
try:
biggest = max([x[1] for x in ls])
except:
biggest = 0
perms = []
if "*" in vn.axs.uread:
perms.append("read")
if "*" in vn.axs.udot:
perms.append("hidden")
if "*" in vn.axs.uwrite:
if "*" in vn.axs.udel:
perms.append("overwrite")
else:
perms.append("write")
fmt = "{{}} {{:{},}} {{}}"
fmt = fmt.format(len("{:,}".format(biggest)))
retl = ["# permissions: %s" % (", ".join(perms),)]
retl += [fmt.format(*x) for x in ls]
ret = "\n".join(retl).encode("utf-8", "replace")
return BytesIO(ret)
def _open(self, vpath: str, mode: str, *a: Any, **ka: Any) -> Any: def _open(self, vpath: str, mode: str, *a: Any, **ka: Any) -> Any:
rd = wr = False rd = wr = False
@ -151,6 +215,9 @@ class Tftpd(object):
): ):
yeet("blocked by xbu server config: " + vpath) yeet("blocked by xbu server config: " + vpath)
if not self.args.tftp_nols and bos.path.isdir(ap):
return self._ls(vpath, "", 0, True)
return open(ap, mode, *a, **ka) return open(ap, mode, *a, **ka)
def _mkdir(self, vpath: str, *a) -> None: def _mkdir(self, vpath: str, *a) -> None:
@ -162,9 +229,7 @@ class Tftpd(object):
def _unlink(self, vpath: str) -> None: def _unlink(self, vpath: str) -> None:
# return bos.unlink(self._v2a("stat", vpath, *a)[1]) # return bos.unlink(self._v2a("stat", vpath, *a)[1])
vfs, ap = self._v2a( vfs, ap = self._v2a("delete", vpath, [True, False, False, True])
"delete", vpath, [True, False, False, True]
)
try: try:
inf = bos.stat(ap) inf = bos.stat(ap)
@ -238,6 +303,7 @@ class Tftpd(object):
fos.path.islink = self._hook fos.path.islink = self._hook
fos.path.realpath = self._hook fos.path.realpath = self._hook
def yeet(msg: str) -> None: def yeet(msg: str) -> None:
warning(msg) warning(msg)
raise TftpException(msg) raise TftpException(msg)

View file

@ -431,13 +431,24 @@ except:
PY_DESC = py_desc() PY_DESC = py_desc()
VERSIONS = "copyparty v{} ({})\n{}\n sqlite {} | jinja {} | pyftpd {} | tftp {}".format( VERSIONS = (
S_VERSION, S_BUILD_DT, PY_DESC, SQLITE_VER, JINJA_VER, PYFTPD_VER, PARTFTPY_VER "copyparty v{} ({})\n{}\n sqlite {} | jinja {} | pyftpd {} | tftp {}".format(
S_VERSION, S_BUILD_DT, PY_DESC, SQLITE_VER, JINJA_VER, PYFTPD_VER, PARTFTPY_VER
)
) )
_: Any = (mp, BytesIO, quote, unquote, SQLITE_VER, JINJA_VER, PYFTPD_VER, PARTFTPY_VER) _: Any = (mp, BytesIO, quote, unquote, SQLITE_VER, JINJA_VER, PYFTPD_VER, PARTFTPY_VER)
__all__ = ["mp", "BytesIO", "quote", "unquote", "SQLITE_VER", "JINJA_VER", "PYFTPD_VER", "PARTFTPY_VER"] __all__ = [
"mp",
"BytesIO",
"quote",
"unquote",
"SQLITE_VER",
"JINJA_VER",
"PYFTPD_VER",
"PARTFTPY_VER",
]
class Daemon(threading.Thread): class Daemon(threading.Thread):

View file

@ -11,8 +11,8 @@ import unittest
from copyparty.authsrv import AuthSrv from copyparty.authsrv import AuthSrv
from copyparty.httpcli import HttpCli from copyparty.httpcli import HttpCli
from copyparty.up2k import Up2k
from copyparty.u2idx import U2idx from copyparty.u2idx import U2idx
from copyparty.up2k import Up2k
from tests import util as tu from tests import util as tu
from tests.util import Cfg from tests.util import Cfg

View file

@ -43,8 +43,8 @@ if MACOS:
from copyparty.__init__ import E from copyparty.__init__ import E
from copyparty.__main__ import init_E from copyparty.__main__ import init_E
from copyparty.util import FHC, Garda, Unrecv
from copyparty.u2idx import U2idx from copyparty.u2idx import U2idx
from copyparty.util import FHC, Garda, Unrecv
init_E(E) init_E(E)