mirror of
https://github.com/9001/copyparty.git
synced 2025-08-17 09:02:15 -06:00
153 lines
4.1 KiB
Python
153 lines
4.1 KiB
Python
# coding: utf-8
|
|
from __future__ import print_function, unicode_literals
|
|
|
|
import os
|
|
import re
|
|
import time
|
|
|
|
from .__init__ import ANYWIN, MACOS
|
|
from .authsrv import AXS, VFS
|
|
from .bos import bos
|
|
from .util import chkcmd, min_ex
|
|
|
|
if True: # pylint: disable=using-constant-test
|
|
from typing import Optional, Union
|
|
|
|
from .util import RootLogger
|
|
|
|
|
|
class Fstab(object):
|
|
def __init__(self, log: "RootLogger"):
|
|
self.log_func = log
|
|
|
|
self.trusted = False
|
|
self.tab: Optional[VFS] = None
|
|
self.cache: dict[str, str] = {}
|
|
self.age = 0.0
|
|
|
|
def log(self, msg: str, c: Union[int, str] = 0) -> None:
|
|
self.log_func("fstab", msg, c)
|
|
|
|
def get(self, path: str) -> str:
|
|
if len(self.cache) > 9000:
|
|
self.age = time.time()
|
|
self.tab = None
|
|
self.cache = {}
|
|
|
|
fs = "ext4"
|
|
msg = "failed to determine filesystem at [{}]; assuming {}\n{}"
|
|
|
|
if ANYWIN:
|
|
fs = "vfat"
|
|
try:
|
|
path = self._winpath(path)
|
|
except:
|
|
self.log(msg.format(path, fs, min_ex()), 3)
|
|
return fs
|
|
|
|
path = path.lstrip("/")
|
|
try:
|
|
return self.cache[path]
|
|
except:
|
|
pass
|
|
|
|
try:
|
|
fs = self.get_w32(path) if ANYWIN else self.get_unix(path)
|
|
except:
|
|
self.log(msg.format(path, fs, min_ex()), 3)
|
|
|
|
fs = fs.lower()
|
|
self.cache[path] = fs
|
|
self.log("found {} at {}".format(fs, path))
|
|
return fs
|
|
|
|
def _winpath(self, path: str) -> str:
|
|
# try to combine volume-label + st_dev (vsn)
|
|
path = path.replace("/", "\\")
|
|
vid = path.split(":", 1)[0].strip("\\").split("\\", 1)[0]
|
|
try:
|
|
return "{}*{}".format(vid, bos.stat(path).st_dev)
|
|
except:
|
|
return vid
|
|
|
|
def build_fallback(self) -> None:
|
|
self.tab = VFS(self.log_func, "idk", "/", AXS(), {})
|
|
self.trusted = False
|
|
|
|
def build_tab(self) -> None:
|
|
self.log("building tab")
|
|
|
|
sptn = r"^.*? on (.*) type ([^ ]+) \(.*"
|
|
if MACOS:
|
|
sptn = r"^.*? on (.*) \(([^ ]+), .*"
|
|
|
|
ptn = re.compile(sptn)
|
|
so, _ = chkcmd(["mount"])
|
|
tab1: list[tuple[str, str]] = []
|
|
for ln in so.split("\n"):
|
|
m = ptn.match(ln)
|
|
if not m:
|
|
continue
|
|
|
|
zs1, zs2 = m.groups()
|
|
tab1.append((str(zs1), str(zs2)))
|
|
|
|
tab1.sort(key=lambda x: (len(x[0]), x[0]))
|
|
path1, fs1 = tab1[0]
|
|
tab = VFS(self.log_func, fs1, path1, AXS(), {})
|
|
for path, fs in tab1[1:]:
|
|
tab.add(fs, path.lstrip("/"))
|
|
|
|
self.tab = tab
|
|
|
|
def relabel(self, path: str, nval: str) -> None:
|
|
assert self.tab
|
|
self.cache = {}
|
|
if ANYWIN:
|
|
path = self._winpath(path)
|
|
|
|
path = path.lstrip("/")
|
|
ptn = re.compile(r"^[^\\/]*")
|
|
vn, rem = self.tab._find(path)
|
|
if not self.trusted:
|
|
# no mtab access; have to build as we go
|
|
if "/" in rem:
|
|
self.tab.add("idk", os.path.join(vn.vpath, rem.split("/")[0]))
|
|
if rem:
|
|
self.tab.add(nval, path)
|
|
else:
|
|
vn.realpath = nval
|
|
|
|
return
|
|
|
|
visit = [vn]
|
|
while visit:
|
|
vn = visit.pop()
|
|
vn.realpath = ptn.sub(nval, vn.realpath)
|
|
visit.extend(list(vn.nodes.values()))
|
|
|
|
def get_unix(self, path: str) -> str:
|
|
if not self.tab:
|
|
try:
|
|
self.build_tab()
|
|
self.trusted = True
|
|
except:
|
|
# prisonparty or other restrictive environment
|
|
self.log("failed to build tab:\n{}".format(min_ex()), 3)
|
|
self.build_fallback()
|
|
|
|
assert self.tab
|
|
ret = self.tab._find(path)[0]
|
|
if self.trusted or path == ret.vpath:
|
|
return ret.realpath.split("/")[0]
|
|
else:
|
|
return "idk"
|
|
|
|
def get_w32(self, path: str) -> str:
|
|
if not self.tab:
|
|
self.build_fallback()
|
|
|
|
assert self.tab
|
|
ret = self.tab._find(path)[0]
|
|
return ret.realpath
|