mirror of
				https://github.com/9001/copyparty.git
				synced 2025-10-30 12:12:20 -06:00 
			
		
		
		
	
		
			
				
	
	
		
			153 lines
		
	
	
		
			4.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			153 lines
		
	
	
		
			4.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # coding: utf-8
 | |
| from __future__ import print_function, unicode_literals
 | |
| 
 | |
| import os
 | |
| import re
 | |
| import time
 | |
| 
 | |
| from .__init__ import ANYWIN, MACOS
 | |
| from .authsrv import AXS, VFS
 | |
| from .bos import bos
 | |
| from .util import chkcmd, min_ex
 | |
| 
 | |
| if True:  # pylint: disable=using-constant-test
 | |
|     from typing import Optional, Union
 | |
| 
 | |
|     from .util import RootLogger
 | |
| 
 | |
| 
 | |
class Fstab(object):
    """Best-effort lookup of the filesystem type that backs a path.

    Results are memoized in ``self.cache``.  On non-Windows systems the
    lookup table is built from the output of the ``mount`` command; when
    that fails (or on Windows) a single-node fallback table is used and
    filled in incrementally through ``relabel``.

    The table is a ``VFS`` tree whose nodes are keyed by mountpoint; each
    node's ``realpath`` attribute is read back as the filesystem label
    (see ``get_unix`` / ``get_w32``).
    """

    def __init__(self, log: "RootLogger") -> None:
        """Store the logger callback and start with an empty table/cache."""
        self.log_func = log

        # True only after build_tab() succeeded, i.e. the table reflects
        # real `mount` output rather than the "idk" fallback
        self.trusted = False
        self.tab: Optional[VFS] = None
        # normalized path -> lowercase filesystem name
        self.cache: dict[str, str] = {}
        # time.time() of the most recent cache flush in get()
        self.age = 0.0

    def log(self, msg: str, c: Union[int, str] = 0) -> None:
        """Forward ``msg`` to the logger callback under the "fstab" source."""
        self.log_func("fstab", msg, c)

    def get(self, path: str) -> str:
        """Return the lowercase filesystem name for ``path``.

        Never raises for ordinary lookup failures: a failure is logged and
        a platform default is returned instead ("vfat" when ANYWIN is set,
        "ext4" otherwise).  Successful and defaulted results are cached.
        """
        # crude size cap; dropping the table too forces a rebuild on the
        # next get_unix() / get_w32() call
        if len(self.cache) > 9000:
            self.age = time.time()
            self.tab = None
            self.cache = {}

        fs = "ext4"
        msg = "failed to determine filesystem at [{}]; assuming {}\n{}"

        if ANYWIN:
            fs = "vfat"
            try:
                path = self._winpath(path)
            except Exception:
                # Exception (not bare except) so that KeyboardInterrupt
                # and SystemExit still propagate
                self.log(msg.format(path, fs, min_ex()), 3)
                return fs

        path = path.lstrip("/")
        try:
            return self.cache[path]
        except KeyError:
            pass

        try:
            fs = self.get_w32(path) if ANYWIN else self.get_unix(path)
        except Exception:
            # keep the platform default assigned above
            self.log(msg.format(path, fs, min_ex()), 3)

        fs = fs.lower()
        self.cache[path] = fs
        self.log("found {} at {}".format(fs, path))
        return fs

    def _winpath(self, path: str) -> str:
        """Reduce a Windows path to a volume key for table/cache lookups.

        Returns ``"<vid>*<st_dev>"`` when the path can be stat'ed, else
        just ``"<vid>"``, where ``vid`` is the text before the first colon
        (or, lacking one, the first backslash-separated component).
        """
        # try to combine volume-label + st_dev (vsn)
        path = path.replace("/", "\\")
        vid = path.split(":", 1)[0].strip("\\").split("\\", 1)[0]
        try:
            return "{}*{}".format(vid, bos.stat(path).st_dev)
        except Exception:
            # path not stat-able; fall back to the volume id alone
            return vid

    def build_fallback(self) -> None:
        """Install a single root node labelled "idk" and mark it untrusted."""
        self.tab = VFS(self.log_func, "idk", "/", AXS(), {})
        self.trusted = False

    def build_tab(self) -> None:
        """Build the table from the output of the ``mount`` command.

        Raises whatever ``chkcmd`` raises, and ``IndexError`` when no line
        of the output matches the expected format; ``get_unix`` handles
        both by switching to the fallback table.
        """
        self.log("building tab")

        # "<dev> on <mountpoint> type <fs> (<opts>)"
        sptn = r"^.*? on (.*) type ([^ ]+) \(.*"
        if MACOS:
            # "<dev> on <mountpoint> (<fs>, <opts>)"
            sptn = r"^.*? on (.*) \(([^ ]+), .*"

        ptn = re.compile(sptn)
        so, _ = chkcmd(["mount"])
        tab1: list[tuple[str, str]] = []
        for ln in so.split("\n"):
            m = ptn.match(ln)
            if not m:
                continue

            zs1, zs2 = m.groups()
            tab1.append((str(zs1), str(zs2)))

        # shortest mountpoint first, so the first entry becomes the root
        # node and parents are added before their children
        tab1.sort(key=lambda x: (len(x[0]), x[0]))
        path1, fs1 = tab1[0]
        tab = VFS(self.log_func, fs1, path1, AXS(), {})
        for path, fs in tab1[1:]:
            tab.add(fs, path.lstrip("/"))

        self.tab = tab

    def relabel(self, path: str, nval: str) -> None:
        """Override the filesystem label recorded for ``path`` with ``nval``.

        Requires that a table already exists.  Clears the result cache.
        With an untrusted (fallback) table the label is attached to
        ``path`` itself; with a trusted table the leading component of
        ``realpath`` is rewritten on the matching node and every node
        below it.
        """
        assert self.tab
        self.cache = {}
        if ANYWIN:
            path = self._winpath(path)

        path = path.lstrip("/")
        vn, rem = self.tab._find(path)
        if not self.trusted:
            # no mtab access; have to build as we go
            if "/" in rem:
                self.tab.add("idk", os.path.join(vn.vpath, rem.split("/")[0]))
            if rem:
                self.tab.add(nval, path)
            else:
                vn.realpath = nval

            return

        # everything up to the first slash/backslash
        ptn = re.compile(r"^[^\\/]*")

        visit = [vn]
        while visit:
            vn = visit.pop()
            # callable replacement: insert nval literally instead of
            # letting re.sub interpret backslash escapes inside it
            vn.realpath = ptn.sub(lambda _m: nval, vn.realpath)
            visit.extend(vn.nodes.values())

    def get_unix(self, path: str) -> str:
        """Look up ``path`` in the mount table, building it on first use.

        Returns the label of the closest matching node when the table is
        trusted or when ``path`` is exactly that node; otherwise "idk".
        """
        if not self.tab:
            try:
                self.build_tab()
                self.trusted = True
            except Exception:
                # prisonparty or other restrictive environment
                self.log("failed to build tab:\n{}".format(min_ex()), 3)
                self.build_fallback()

        assert self.tab
        ret = self.tab._find(path)[0]
        if self.trusted or path == ret.vpath:
            return ret.realpath.split("/")[0]
        else:
            return "idk"

    def get_w32(self, path: str) -> str:
        """Look up ``path`` in the fallback table, creating it on first use."""
        if not self.tab:
            self.build_fallback()

        assert self.tab
        ret = self.tab._find(path)[0]
        return ret.realpath
 |