mirror of
https://github.com/9001/copyparty.git
synced 2025-08-17 09:02:15 -06:00
build fstab from relabels when mtab is unreadable
This commit is contained in:
parent
daff7ff158
commit
59b6e61816
|
@ -3,6 +3,7 @@ from __future__ import print_function, unicode_literals
|
|||
|
||||
import ctypes
|
||||
import re
|
||||
import os
|
||||
import time
|
||||
|
||||
from .__init__ import ANYWIN, MACOS
|
||||
|
@ -21,6 +22,7 @@ class Fstab(object):
|
|||
def __init__(self, log: RootLogger):
|
||||
self.log_func = log
|
||||
|
||||
self.trusted = False
|
||||
self.tab: Optional[VFS] = None
|
||||
self.cache: dict[str, str] = {}
|
||||
self.age = 0.0
|
||||
|
@ -49,6 +51,7 @@ class Fstab(object):
|
|||
self.log(msg.format(path, fs, min_ex()), 3)
|
||||
return fs
|
||||
|
||||
path = path.lstrip("/")
|
||||
try:
|
||||
return self.cache[path]
|
||||
except:
|
||||
|
@ -92,21 +95,44 @@ class Fstab(object):
|
|||
|
||||
def relabel(self, path: str, nval: str) -> None:
|
||||
assert self.tab
|
||||
self.cache = {}
|
||||
path = path.lstrip("/")
|
||||
ptn = re.compile(r"^[^\\/]*")
|
||||
vn, _ = self.tab._find(path)
|
||||
vn, rem = self.tab._find(path)
|
||||
if not self.trusted:
|
||||
# no mtab access; have to build as we go
|
||||
if "/" in rem:
|
||||
self.tab.add("idk", os.path.join(vn.vpath, rem.split("/")[0]))
|
||||
if rem:
|
||||
self.tab.add(nval, path)
|
||||
else:
|
||||
vn.realpath = nval
|
||||
|
||||
return
|
||||
|
||||
visit = [vn]
|
||||
while visit:
|
||||
vn = visit.pop()
|
||||
vn.realpath = ptn.sub(nval, vn.realpath)
|
||||
visit.extend(list(vn.nodes.values()))
|
||||
self.cache = {}
|
||||
|
||||
def get_unix(self, path: str) -> str:
|
||||
if not self.tab:
|
||||
self.build_tab()
|
||||
try:
|
||||
self.build_tab()
|
||||
self.trusted = True
|
||||
except:
|
||||
# prisonparty or other restrictive environment
|
||||
self.log("failed to build tab:\n{}".format(min_ex()), 3)
|
||||
self.tab = VFS(self.log_func, "idk", "/", AXS(), {})
|
||||
self.trusted = False
|
||||
|
||||
assert self.tab
|
||||
return self.tab._find(path)[0].realpath.split("/")[0]
|
||||
ret = self.tab._find(path)[0]
|
||||
if self.trusted or path == ret.vpath:
|
||||
return ret.realpath.split("/")[0]
|
||||
else:
|
||||
return "idk"
|
||||
|
||||
def get_w32(self, path: str) -> str:
|
||||
# list mountpoints: fsutil fsinfo drives
|
||||
|
|
Loading…
Reference in a new issue