tftp: add directory listing

This commit is contained in:
ed 2024-02-10 23:50:17 +00:00
parent 8796c09f56
commit acbb8267e1
6 changed files with 101 additions and 15 deletions

View file

@ -43,10 +43,10 @@ from .util import (
DEF_MTH,
IMPLICATIONS,
JINJA_VER,
PARTFTPY_VER,
PY_DESC,
PYFTPD_VER,
SQLITE_VER,
PARTFTPY_VER,
UNPLICATIONS,
align_tab,
ansi_re,
@ -1019,6 +1019,8 @@ def add_tftp(ap):
ap2.add_argument("--tftp", metavar="PORT", type=int, help="enable TFTP server on \033[33mPORT\033[0m, for example \033[32m69 \033[0mor \033[32m3969")
ap2.add_argument("--tftpv", action="store_true", help="verbose")
ap2.add_argument("--tftpvv", action="store_true", help="verboser")
ap2.add_argument("--tftp-lsf", metavar="PTN", type=u, default="\\.?(dir|ls)(\\.txt)?", help="return a directory listing if a file with this name is requested and it does not exist; defaults matches .ls, dir, .dir.txt, ls.txt, ...")
ap2.add_argument("--tftp-nols", action="store_true", help="if someone tries to download a directory, return an error instead of showing its directory listing")
ap2.add_argument("--tftp-ipa", metavar="PFX", type=u, default="", help="only accept connections from IP-addresses starting with \033[33mPFX\033[0m; specify [\033[32many\033[0m] to disable inheriting \033[33m--ipa\033[0m. Example: [\033[32m127., 10.89., 192.168.\033[0m]")
ap2.add_argument("--tftp-pr", metavar="P-P", type=u, help="the range of UDP ports to use for data transfer, for example \033[32m12000-13000")

View file

@ -451,6 +451,13 @@ class SvcHub(object):
else:
setattr(al, k, re.compile(vs))
for k in "tftp_lsf".split(" "):
vs = getattr(al, k)
if not vs or vs == "no":
setattr(al, k, None)
else:
setattr(al, k, re.compile("^" + vs + "$"))
if not al.sus_urls:
al.ban_url = "no"
elif al.ban_url == "no":

View file

@ -4,14 +4,17 @@ from __future__ import print_function, unicode_literals
try:
from types import SimpleNamespace
except:
class SimpleNamespace(object):
def __init__(self, **attr):
self.__dict__.update(attr)
import inspect
import logging
import os
import stat
from datetime import datetime
from partftpy import TftpContexts, TftpServer, TftpStates
from partftpy.TftpShared import TftpException
@ -19,7 +22,7 @@ from partftpy.TftpShared import TftpException
from .__init__ import PY2, TYPE_CHECKING
from .authsrv import VFS
from .bos import bos
from .util import Daemon, min_ex, pybin, runhook, undot
from .util import BytesIO, Daemon, exclude_dotfiles, runhook, undot
if True: # pylint: disable=using-constant-test
from typing import Any, Union
@ -40,6 +43,7 @@ def _serverInitial(self, pkt: Any, raddress: str, rport: int) -> bool:
yeet("client rejected (--tftp-ipa): %s" % (raddress,))
return ret
# patch ipa-check into partftpd
_hub: list["SvcHub"] = []
_orig_serverInitial = TftpStates.TftpServerState.serverInitial
@ -113,9 +117,7 @@ class Tftpd(object):
def nlog(self, msg: str, c: Union[int, str] = 0) -> None:
self.log("tftp", msg, c)
def _v2a(
self, caller: str, vpath: str, perms: list, *a: Any
) -> tuple[VFS, str]:
def _v2a(self, caller: str, vpath: str, perms: list, *a: Any) -> tuple[VFS, str]:
vpath = vpath.replace("\\", "/").lstrip("/")
if not perms:
perms = [True, True]
@ -124,10 +126,72 @@ class Tftpd(object):
vfs, rem = self.asrv.vfs.get(vpath, "*", *perms)
return vfs, vfs.canonical(rem)
def _ls(self, vpath: str, raddress: str, rport: int) -> Any:
def _ls(self, vpath: str, raddress: str, rport: int, force=False) -> Any:
# generate file listing if vpath is dir.txt and return as file object
if not force:
vpath, fn = os.path.split(vpath.replace("\\", "/"))
ptn = self.args.tftp_lsf
if not ptn or not ptn.match(fn.lower()):
return None
vn, rem = self.asrv.vfs.get(vpath, "*", True, False)
fsroot, vfs_ls, vfs_virt = vn.ls(
rem,
"*",
not self.args.no_scandir,
[[True, False]],
)
dnames = set([x[0] for x in vfs_ls if stat.S_ISDIR(x[1].st_mode)])
dirs1 = [(v.st_mtime, v.st_size, k + "/") for k, v in vfs_ls if k in dnames]
fils1 = [(v.st_mtime, v.st_size, k) for k, v in vfs_ls if k not in dnames]
real1 = dirs1 + fils1
realt = [(datetime.fromtimestamp(mt), sz, fn) for mt, sz, fn in real1]
reals = [
(
"%04d-%02d-%02d %02d:%02d:%02d"
% (
zd.year,
zd.month,
zd.day,
zd.hour,
zd.minute,
zd.second,
),
sz,
fn,
)
for zd, sz, fn in realt
]
virs = [("????-??-?? ??:??:??", 0, k + "/") for k in vfs_virt.keys()]
ls = virs + reals
if "*" not in vn.axs.udot:
names = set(exclude_dotfiles([x[2] for x in ls]))
ls = [x for x in ls if x[2] in names]
try:
biggest = max([x[1] for x in ls])
except:
biggest = 0
perms = []
if "*" in vn.axs.uread:
perms.append("read")
if "*" in vn.axs.udot:
perms.append("hidden")
if "*" in vn.axs.uwrite:
if "*" in vn.axs.udel:
perms.append("overwrite")
else:
perms.append("write")
fmt = "{{}} {{:{},}} {{}}"
fmt = fmt.format(len("{:,}".format(biggest)))
retl = ["# permissions: %s" % (", ".join(perms),)]
retl += [fmt.format(*x) for x in ls]
ret = "\n".join(retl).encode("utf-8", "replace")
return BytesIO(ret)
def _open(self, vpath: str, mode: str, *a: Any, **ka: Any) -> Any:
rd = wr = False
if mode == "rb":
@ -151,6 +215,9 @@ class Tftpd(object):
):
yeet("blocked by xbu server config: " + vpath)
if not self.args.tftp_nols and bos.path.isdir(ap):
return self._ls(vpath, "", 0, True)
return open(ap, mode, *a, **ka)
def _mkdir(self, vpath: str, *a) -> None:
@ -162,9 +229,7 @@ class Tftpd(object):
def _unlink(self, vpath: str) -> None:
# return bos.unlink(self._v2a("stat", vpath, *a)[1])
vfs, ap = self._v2a(
"delete", vpath, [True, False, False, True]
)
vfs, ap = self._v2a("delete", vpath, [True, False, False, True])
try:
inf = bos.stat(ap)
@ -238,6 +303,7 @@ class Tftpd(object):
fos.path.islink = self._hook
fos.path.realpath = self._hook
def yeet(msg: str) -> None:
warning(msg)
raise TftpException(msg)

View file

@ -431,13 +431,24 @@ except:
PY_DESC = py_desc()
VERSIONS = "copyparty v{} ({})\n{}\n sqlite {} | jinja {} | pyftpd {} | tftp {}".format(
VERSIONS = (
"copyparty v{} ({})\n{}\n sqlite {} | jinja {} | pyftpd {} | tftp {}".format(
S_VERSION, S_BUILD_DT, PY_DESC, SQLITE_VER, JINJA_VER, PYFTPD_VER, PARTFTPY_VER
)
)
_: Any = (mp, BytesIO, quote, unquote, SQLITE_VER, JINJA_VER, PYFTPD_VER, PARTFTPY_VER)
__all__ = ["mp", "BytesIO", "quote", "unquote", "SQLITE_VER", "JINJA_VER", "PYFTPD_VER", "PARTFTPY_VER"]
__all__ = [
"mp",
"BytesIO",
"quote",
"unquote",
"SQLITE_VER",
"JINJA_VER",
"PYFTPD_VER",
"PARTFTPY_VER",
]
class Daemon(threading.Thread):

View file

@ -11,8 +11,8 @@ import unittest
from copyparty.authsrv import AuthSrv
from copyparty.httpcli import HttpCli
from copyparty.up2k import Up2k
from copyparty.u2idx import U2idx
from copyparty.up2k import Up2k
from tests import util as tu
from tests.util import Cfg

View file

@ -43,8 +43,8 @@ if MACOS:
from copyparty.__init__ import E
from copyparty.__main__ import init_E
from copyparty.util import FHC, Garda, Unrecv
from copyparty.u2idx import U2idx
from copyparty.util import FHC, Garda, Unrecv
init_E(E)