mirror of
https://github.com/9001/copyparty.git
synced 2025-08-18 09:22:31 -06:00
411 lines
12 KiB
Python
411 lines
12 KiB
Python
# coding: utf-8
|
|
from __future__ import print_function, unicode_literals
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import stat
|
|
import sys
|
|
import time
|
|
|
|
from pyftpdlib.authorizers import AuthenticationFailed, DummyAuthorizer
|
|
from pyftpdlib.filesystems import AbstractedFS, FilesystemError
|
|
from pyftpdlib.handlers import FTPHandler
|
|
from pyftpdlib.log import config_logging
|
|
from pyftpdlib.servers import FTPServer
|
|
|
|
from .__init__ import PY2, TYPE_CHECKING, E
|
|
from .bos import bos
|
|
from .util import Daemon, Pebkac, exclude_dotfiles, fsenc
|
|
|
|
try:
|
|
from pyftpdlib.ioloop import IOLoop
|
|
except ImportError:
|
|
p = os.path.join(E.mod, "vend")
|
|
print("loading asynchat from " + p)
|
|
sys.path.append(p)
|
|
from pyftpdlib.ioloop import IOLoop
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
from .svchub import SvcHub
|
|
|
|
if True: # pylint: disable=using-constant-test
|
|
import typing
|
|
from typing import Any, Optional
|
|
|
|
|
|
class FtpAuth(DummyAuthorizer):
|
|
def __init__(self, hub: "SvcHub") -> None:
|
|
super(FtpAuth, self).__init__()
|
|
self.hub = hub
|
|
|
|
def validate_authentication(
|
|
self, username: str, password: str, handler: Any
|
|
) -> None:
|
|
asrv = self.hub.asrv
|
|
if username == "anonymous":
|
|
password = ""
|
|
|
|
uname = "*"
|
|
if password:
|
|
uname = asrv.iacct.get(password, "")
|
|
|
|
handler.username = uname
|
|
|
|
if (password and not uname) or not (
|
|
asrv.vfs.aread.get(uname) or asrv.vfs.awrite.get(uname)
|
|
):
|
|
raise AuthenticationFailed("Authentication failed.")
|
|
|
|
def get_home_dir(self, username: str) -> str:
|
|
return "/"
|
|
|
|
def has_user(self, username: str) -> bool:
|
|
asrv = self.hub.asrv
|
|
return username in asrv.acct
|
|
|
|
def has_perm(self, username: str, perm: int, path: Optional[str] = None) -> bool:
|
|
return True # handled at filesystem layer
|
|
|
|
def get_perms(self, username: str) -> str:
|
|
return "elradfmwMT"
|
|
|
|
def get_msg_login(self, username: str) -> str:
|
|
return "sup {}".format(username)
|
|
|
|
def get_msg_quit(self, username: str) -> str:
|
|
return "cya"
|
|
|
|
|
|
class FtpFs(AbstractedFS):
|
|
def __init__(
|
|
self, root: str, cmd_channel: Any
|
|
) -> None: # pylint: disable=super-init-not-called
|
|
self.h = self.cmd_channel = cmd_channel # type: FTPHandler
|
|
self.hub: "SvcHub" = cmd_channel.hub
|
|
self.args = cmd_channel.args
|
|
|
|
self.uname = self.hub.asrv.iacct.get(cmd_channel.password, "*")
|
|
|
|
self.cwd = "/" # pyftpdlib convention of leading slash
|
|
self.root = "/var/lib/empty"
|
|
|
|
self.can_read = self.can_write = self.can_move = False
|
|
self.can_delete = self.can_get = self.can_upget = False
|
|
|
|
self.listdirinfo = self.listdir
|
|
self.chdir(".")
|
|
|
|
def v2a(
|
|
self,
|
|
vpath: str,
|
|
r: bool = False,
|
|
w: bool = False,
|
|
m: bool = False,
|
|
d: bool = False,
|
|
) -> str:
|
|
try:
|
|
vpath = vpath.replace("\\", "/").lstrip("/")
|
|
vfs, rem = self.hub.asrv.vfs.get(vpath, self.uname, r, w, m, d)
|
|
if not vfs.realpath:
|
|
raise FilesystemError("no filesystem mounted at this path")
|
|
|
|
return os.path.join(vfs.realpath, rem)
|
|
except Pebkac as ex:
|
|
raise FilesystemError(str(ex))
|
|
|
|
def rv2a(
|
|
self,
|
|
vpath: str,
|
|
r: bool = False,
|
|
w: bool = False,
|
|
m: bool = False,
|
|
d: bool = False,
|
|
) -> str:
|
|
return self.v2a(os.path.join(self.cwd, vpath), r, w, m, d)
|
|
|
|
def ftp2fs(self, ftppath: str) -> str:
|
|
# return self.v2a(ftppath)
|
|
return ftppath # self.cwd must be vpath
|
|
|
|
def fs2ftp(self, fspath: str) -> str:
|
|
# raise NotImplementedError()
|
|
return fspath
|
|
|
|
def validpath(self, path: str) -> bool:
|
|
if "/.hist/" in path:
|
|
if "/up2k." in path or path.endswith("/dir.txt"):
|
|
raise FilesystemError("access to this file is forbidden")
|
|
|
|
return True
|
|
|
|
def open(self, filename: str, mode: str) -> typing.IO[Any]:
|
|
r = "r" in mode
|
|
w = "w" in mode or "a" in mode or "+" in mode
|
|
|
|
ap = self.rv2a(filename, r, w)
|
|
if w and bos.path.exists(ap):
|
|
raise FilesystemError("cannot open existing file for writing")
|
|
|
|
self.validpath(ap)
|
|
return open(fsenc(ap), mode)
|
|
|
|
def chdir(self, path: str) -> None:
|
|
self.cwd = join(self.cwd, path)
|
|
(
|
|
self.can_read,
|
|
self.can_write,
|
|
self.can_move,
|
|
self.can_delete,
|
|
self.can_get,
|
|
self.can_upget,
|
|
) = self.hub.asrv.vfs.can_access(self.cwd.lstrip("/"), self.h.username)
|
|
|
|
def mkdir(self, path: str) -> None:
|
|
ap = self.rv2a(path, w=True)
|
|
bos.mkdir(ap)
|
|
|
|
def listdir(self, path: str) -> list[str]:
|
|
vpath = join(self.cwd, path).lstrip("/")
|
|
try:
|
|
vfs, rem = self.hub.asrv.vfs.get(vpath, self.uname, True, False)
|
|
|
|
fsroot, vfs_ls1, vfs_virt = vfs.ls(
|
|
rem,
|
|
self.uname,
|
|
not self.args.no_scandir,
|
|
[[True, False], [False, True]],
|
|
)
|
|
vfs_ls = [x[0] for x in vfs_ls1]
|
|
vfs_ls.extend(vfs_virt.keys())
|
|
|
|
if not self.args.ed:
|
|
vfs_ls = exclude_dotfiles(vfs_ls)
|
|
|
|
vfs_ls.sort()
|
|
return vfs_ls
|
|
except:
|
|
if vpath:
|
|
# display write-only folders as empty
|
|
return []
|
|
|
|
# return list of volumes
|
|
r = {x.split("/")[0]: 1 for x in self.hub.asrv.vfs.all_vols.keys()}
|
|
return list(sorted(list(r.keys())))
|
|
|
|
def rmdir(self, path: str) -> None:
|
|
ap = self.rv2a(path, d=True)
|
|
bos.rmdir(ap)
|
|
|
|
def remove(self, path: str) -> None:
|
|
if self.args.no_del:
|
|
raise FilesystemError("the delete feature is disabled in server config")
|
|
|
|
vp = join(self.cwd, path).lstrip("/")
|
|
try:
|
|
self.hub.up2k.handle_rm(self.uname, self.h.remote_ip, [vp], [])
|
|
except Exception as ex:
|
|
raise FilesystemError(str(ex))
|
|
|
|
def rename(self, src: str, dst: str) -> None:
|
|
if not self.can_move:
|
|
raise FilesystemError("not allowed for user " + self.h.username)
|
|
|
|
if self.args.no_mv:
|
|
t = "the rename/move feature is disabled in server config"
|
|
raise FilesystemError(t)
|
|
|
|
svp = join(self.cwd, src).lstrip("/")
|
|
dvp = join(self.cwd, dst).lstrip("/")
|
|
try:
|
|
self.hub.up2k.handle_mv(self.uname, svp, dvp)
|
|
except Exception as ex:
|
|
raise FilesystemError(str(ex))
|
|
|
|
def chmod(self, path: str, mode: str) -> None:
|
|
pass
|
|
|
|
def stat(self, path: str) -> os.stat_result:
|
|
try:
|
|
ap = self.rv2a(path, r=True)
|
|
return bos.stat(ap)
|
|
except:
|
|
ap = self.rv2a(path)
|
|
st = bos.stat(ap)
|
|
if not stat.S_ISDIR(st.st_mode):
|
|
raise
|
|
|
|
return st
|
|
|
|
def utime(self, path: str, timeval: float) -> None:
|
|
ap = self.rv2a(path, w=True)
|
|
return bos.utime(ap, (timeval, timeval))
|
|
|
|
def lstat(self, path: str) -> os.stat_result:
|
|
ap = self.rv2a(path)
|
|
return bos.lstat(ap)
|
|
|
|
def isfile(self, path: str) -> bool:
|
|
st = self.stat(path)
|
|
return stat.S_ISREG(st.st_mode)
|
|
|
|
def islink(self, path: str) -> bool:
|
|
ap = self.rv2a(path)
|
|
return bos.path.islink(ap)
|
|
|
|
def isdir(self, path: str) -> bool:
|
|
try:
|
|
st = self.stat(path)
|
|
return stat.S_ISDIR(st.st_mode)
|
|
except:
|
|
return True
|
|
|
|
def getsize(self, path: str) -> int:
|
|
ap = self.rv2a(path)
|
|
return bos.path.getsize(ap)
|
|
|
|
def getmtime(self, path: str) -> float:
|
|
ap = self.rv2a(path)
|
|
return bos.path.getmtime(ap)
|
|
|
|
def realpath(self, path: str) -> str:
|
|
return path
|
|
|
|
def lexists(self, path: str) -> bool:
|
|
ap = self.rv2a(path)
|
|
return bos.path.lexists(ap)
|
|
|
|
def get_user_by_uid(self, uid: int) -> str:
|
|
return "root"
|
|
|
|
def get_group_by_uid(self, gid: int) -> str:
|
|
return "root"
|
|
|
|
|
|
class FtpHandler(FTPHandler):
|
|
abstracted_fs = FtpFs
|
|
hub: "SvcHub"
|
|
args: argparse.Namespace
|
|
|
|
def __init__(self, conn: Any, server: Any, ioloop: Any = None) -> None:
|
|
self.hub: "SvcHub" = FtpHandler.hub
|
|
self.args: argparse.Namespace = FtpHandler.args
|
|
|
|
if PY2:
|
|
FTPHandler.__init__(self, conn, server, ioloop)
|
|
else:
|
|
super(FtpHandler, self).__init__(conn, server, ioloop)
|
|
|
|
# abspath->vpath mapping to resolve log_transfer paths
|
|
self.vfs_map: dict[str, str] = {}
|
|
|
|
def ftp_STOR(self, file: str, mode: str = "w") -> Any:
|
|
# Optional[str]
|
|
vp = join(self.fs.cwd, file).lstrip("/")
|
|
ap = self.fs.v2a(vp)
|
|
self.vfs_map[ap] = vp
|
|
# print("ftp_STOR: {} {} => {}".format(vp, mode, ap))
|
|
ret = FTPHandler.ftp_STOR(self, file, mode)
|
|
# print("ftp_STOR: {} {} OK".format(vp, mode))
|
|
return ret
|
|
|
|
def log_transfer(
|
|
self,
|
|
cmd: str,
|
|
filename: bytes,
|
|
receive: bool,
|
|
completed: bool,
|
|
elapsed: float,
|
|
bytes: int,
|
|
) -> Any:
|
|
# None
|
|
ap = filename.decode("utf-8", "replace")
|
|
vp = self.vfs_map.pop(ap, None)
|
|
# print("xfer_end: {} => {}".format(ap, vp))
|
|
if vp:
|
|
vp, fn = os.path.split(vp)
|
|
vfs, rem = self.hub.asrv.vfs.get(vp, self.username, False, True)
|
|
vfs, rem = vfs.get_dbv(rem)
|
|
self.hub.up2k.hash_file(
|
|
vfs.realpath,
|
|
vfs.flags,
|
|
rem,
|
|
fn,
|
|
self.remote_ip,
|
|
time.time(),
|
|
)
|
|
|
|
return FTPHandler.log_transfer(
|
|
self, cmd, filename, receive, completed, elapsed, bytes
|
|
)
|
|
|
|
|
|
try:
|
|
from pyftpdlib.handlers import TLS_FTPHandler
|
|
|
|
class SftpHandler(FtpHandler, TLS_FTPHandler):
|
|
pass
|
|
|
|
except:
|
|
pass
|
|
|
|
|
|
class Ftpd(object):
|
|
def __init__(self, hub: "SvcHub") -> None:
|
|
self.hub = hub
|
|
self.args = hub.args
|
|
|
|
hs = []
|
|
if self.args.ftp:
|
|
hs.append([FtpHandler, self.args.ftp])
|
|
if self.args.ftps:
|
|
try:
|
|
h1 = SftpHandler
|
|
except:
|
|
t = "\nftps requires pyopenssl;\nplease run the following:\n\n {} -m pip install --user pyopenssl\n"
|
|
print(t.format(sys.executable))
|
|
sys.exit(1)
|
|
|
|
h1.certfile = os.path.join(self.args.E.cfg, "cert.pem")
|
|
h1.tls_control_required = True
|
|
h1.tls_data_required = True
|
|
|
|
hs.append([h1, self.args.ftps])
|
|
|
|
for h_lp in hs:
|
|
h2, lp = h_lp
|
|
h2.hub = hub
|
|
h2.args = hub.args
|
|
h2.authorizer = FtpAuth(hub)
|
|
|
|
if self.args.ftp_pr:
|
|
p1, p2 = [int(x) for x in self.args.ftp_pr.split("-")]
|
|
if self.args.ftp and self.args.ftps:
|
|
# divide port range in half
|
|
d = int((p2 - p1) / 2)
|
|
if lp == self.args.ftp:
|
|
p2 = p1 + d
|
|
else:
|
|
p1 += d + 1
|
|
|
|
h2.passive_ports = list(range(p1, p2 + 1))
|
|
|
|
if self.args.ftp_nat:
|
|
h2.masquerade_address = self.args.ftp_nat
|
|
|
|
if self.args.ftp_dbg:
|
|
config_logging(level=logging.DEBUG)
|
|
|
|
ioloop = IOLoop()
|
|
for ip in self.args.i:
|
|
for h, lp in hs:
|
|
FTPServer((ip, int(lp)), h, ioloop)
|
|
|
|
Daemon(ioloop.loop, "ftp")
|
|
|
|
|
|
def join(p1: str, p2: str) -> str:
|
|
w = os.path.join(p1, p2.replace("\\", "/"))
|
|
return os.path.normpath(w).replace("\\", "/")
|