From 59b6e618166871a5feea46b72f1617048fba6afe Mon Sep 17 00:00:00 2001 From: ed Date: Wed, 6 Jul 2022 02:28:34 +0200 Subject: [PATCH] build fstab from relabels when mtab is unreadable --- copyparty/fsutil.py | 34 ++++++++++++++++++++++++++++++---- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/copyparty/fsutil.py b/copyparty/fsutil.py index 06979ec9..b1d3f3cd 100644 --- a/copyparty/fsutil.py +++ b/copyparty/fsutil.py @@ -3,6 +3,7 @@ from __future__ import print_function, unicode_literals import ctypes import re +import os import time from .__init__ import ANYWIN, MACOS @@ -21,6 +22,7 @@ class Fstab(object): def __init__(self, log: RootLogger): self.log_func = log + self.trusted = False self.tab: Optional[VFS] = None self.cache: dict[str, str] = {} self.age = 0.0 @@ -49,6 +51,7 @@ class Fstab(object): self.log(msg.format(path, fs, min_ex()), 3) return fs + path = path.lstrip("/") try: return self.cache[path] except: @@ -92,21 +95,44 @@ class Fstab(object): def relabel(self, path: str, nval: str) -> None: assert self.tab + self.cache = {} + path = path.lstrip("/") ptn = re.compile(r"^[^\\/]*") - vn, _ = self.tab._find(path) + vn, rem = self.tab._find(path) + if not self.trusted: + # no mtab access; have to build as we go + if "/" in rem: + self.tab.add("idk", os.path.join(vn.vpath, rem.split("/")[0])) + if rem: + self.tab.add(nval, path) + else: + vn.realpath = nval + + return + visit = [vn] while visit: vn = visit.pop() vn.realpath = ptn.sub(nval, vn.realpath) visit.extend(list(vn.nodes.values())) - self.cache = {} def get_unix(self, path: str) -> str: if not self.tab: - self.build_tab() + try: + self.build_tab() + self.trusted = True + except: + # prisonparty or other restrictive environment + self.log("failed to build tab:\n{}".format(min_ex()), 3) + self.tab = VFS(self.log_func, "idk", "/", AXS(), {}) + self.trusted = False assert self.tab - return self.tab._find(path)[0].realpath.split("/")[0] + ret = self.tab._find(path)[0] + if self.trusted or path == ret.vpath: + return ret.realpath.split("/")[0] + else: + return "idk" def get_w32(self, path: str) -> str: # list mountpoints: fsutil fsinfo drives