be case-sensitive on windows/macos (closes #781);

on Windows and Macos, most filesystems are case-insensitive,
which can lead to dangerous situations

one example is when another program (not copyparty or its UI) wants to
rename a file from `Foo` to `foo`; the program will probably start by
checking if `foo` exists and then delete it, however this would match
`Foo` and confuse the program into deleting the wrong file

fix this by adding a VERY EXPENSIVE detector to prevent this,
by listing the parent folder and checking if the case matches

this check will auto-enable when a case-insensitive FS is detected on
startup, but option `casechk` (global or volflag) can override this
This commit is contained in:
ed 2025-09-14 23:39:46 +00:00
parent 3a2381ff2d
commit 8b66874b85
8 changed files with 103 additions and 7 deletions

View file

@ -1184,6 +1184,7 @@ def add_qr(ap, tty):
def add_fs(ap): def add_fs(ap):
ap2 = ap.add_argument_group("filesystem options") ap2 = ap.add_argument_group("filesystem options")
rm_re_def = "15/0.1" if ANYWIN else "0/0" rm_re_def = "15/0.1" if ANYWIN else "0/0"
ap2.add_argument("--casechk", metavar="N", type=u, default="auto", help="detect and prevent CI (case-insensitive) behavior if the underlying filesystem is CI? [\033[32my\033[0m] = detect and prevent, [\033[32mn\033[0m] = ignore and allow, [\033[32mauto\033[0m] = \033[32my\033[0m if CI fs detected. NOTE: \033[32my\033[0m is very slow but necessary for correct WebDAV behavior on Windows/Macos (volflag=casechk)")
ap2.add_argument("--rm-retry", metavar="T/R", type=u, default=rm_re_def, help="if a file cannot be deleted because it is busy, continue trying for \033[33mT\033[0m seconds, retry every \033[33mR\033[0m seconds; disable with 0/0 (volflag=rm_retry)") ap2.add_argument("--rm-retry", metavar="T/R", type=u, default=rm_re_def, help="if a file cannot be deleted because it is busy, continue trying for \033[33mT\033[0m seconds, retry every \033[33mR\033[0m seconds; disable with 0/0 (volflag=rm_retry)")
ap2.add_argument("--mv-retry", metavar="T/R", type=u, default=rm_re_def, help="if a file cannot be renamed because it is busy, continue trying for \033[33mT\033[0m seconds, retry every \033[33mR\033[0m seconds; disable with 0/0 (volflag=mv_retry)") ap2.add_argument("--mv-retry", metavar="T/R", type=u, default=rm_re_def, help="if a file cannot be renamed because it is busy, continue trying for \033[33mT\033[0m seconds, retry every \033[33mR\033[0m seconds; disable with 0/0 (volflag=mv_retry)")
ap2.add_argument("--iobuf", metavar="BYTES", type=int, default=256*1024, help="file I/O buffer-size; if your volumes are on a network drive, try increasing to \033[32m524288\033[0m or even \033[32m4194304\033[0m (and let me know if that improves your performance)") ap2.add_argument("--iobuf", metavar="BYTES", type=int, default=256*1024, help="file I/O buffer-size; if your volumes are on a network drive, try increasing to \033[32m524288\033[0m or even \033[32m4194304\033[0m (and let me know if that improves your performance)")

View file

@ -13,7 +13,7 @@ import threading
import time import time
from datetime import datetime from datetime import datetime
from .__init__ import ANYWIN, PY2, TYPE_CHECKING, WINDOWS, E from .__init__ import ANYWIN, MACOS, PY2, TYPE_CHECKING, WINDOWS, E
from .bos import bos from .bos import bos
from .cfg import flagdescs, permdescs, vf_bmap, vf_cmap, vf_vmap from .cfg import flagdescs, permdescs, vf_bmap, vf_cmap, vf_vmap
from .pwhash import PWHash from .pwhash import PWHash
@ -630,6 +630,28 @@ class VFS(object):
vrem = vjoin(self.vpath[len(dbv.vpath) :].lstrip("/"), vrem) vrem = vjoin(self.vpath[len(dbv.vpath) :].lstrip("/"), vrem)
return dbv, vrem return dbv, vrem
def casechk(self, rem: str, do_stat: bool) -> bool:
ap = self.canonical(rem, False)
if do_stat and not bos.path.exists(ap):
return True # doesn't exist at all; good to go
dp, fn = os.path.split(ap)
try:
fns = os.listdir(dp)
except:
return True # maybe chmod 111; assume ok
if fn in fns:
return True
hit = "<?>"
lfn = fn.lower()
for zs in fns:
if lfn == zs.lower():
hit = zs
break
if self.log:
t = "returning 404 due to underlying case-insensitive filesystem:\n http-req: %r\n local-fs: %r"
self.log("vfs", t % (fn, hit))
return False
def _canonical_null(self, rem: str, resolve: bool = True) -> str: def _canonical_null(self, rem: str, resolve: bool = True) -> str:
return "" return ""
@ -1247,8 +1269,8 @@ class AuthSrv(object):
self.log(t, c=3) self.log(t, c=3)
raise Exception(BAD_CFG) raise Exception(BAD_CFG)
if not bos.path.isdir(src): if not bos.path.exists(src):
self.log("warning: filesystem-path does not exist: {}".format(src), 3) self.log("warning: filesystem-path did not exist: %r" % (src,), 3)
mount[dst] = (src, dst0) mount[dst] = (src, dst0)
daxs[dst] = AXS() daxs[dst] = AXS()
@ -2552,6 +2574,47 @@ class AuthSrv(object):
self.log(t.format(vol.vpath, mtp), 1) self.log(t.format(vol.vpath, mtp), 1)
errors = True errors = True
for vol in vfs.all_nodes.values():
if not vol.realpath or os.path.isfile(vol.realpath):
continue
ccs = vol.flags["casechk"][:1].lower()
if ccs in ("y", "n"):
if ccs == "y":
vol.flags["bcasechk"] = True
continue
try:
bos.makedirs(vol.realpath, vf=vol.flags)
files = os.listdir(vol.realpath)
for fn in files:
fn2 = fn.lower()
if fn == fn2:
fn2 = fn.upper()
if fn == fn2 or fn2 in files:
continue
is_ci = os.path.exists(os.path.join(vol.realpath, fn2))
ccs = "y" if is_ci else "n"
break
if ccs not in ("y", "n"):
ap = os.path.join(vol.realpath, "casechk")
open(ap, "wb").close()
ccs = "y" if os.path.exists(ap[:-1] + "K") else "n"
os.unlink(ap)
except Exception as ex:
if ANYWIN:
zs = "Windows"
ccs = "y"
elif MACOS:
zs = "Macos"
ccs = "y"
else:
zs = "Linux"
ccs = "n"
t = "unable to determine if filesystem at %r is case-insensitive due to %r; assuming casechk=%s due to %s"
self.log(t % (vol.realpath, ex, ccs, zs), 3)
vol.flags["casechk"] = ccs
if ccs == "y":
vol.flags["bcasechk"] = True
tags = self.args.mtp or [] tags = self.args.mtp or []
tags = [x.split("=")[0] for x in tags] tags = [x.split("=")[0] for x in tags]
tags = [y for x in tags for y in x.split(",")] tags = [y for x in tags for y in x.split(",")]

View file

@ -81,6 +81,7 @@ def vf_vmap() -> dict[str, str]:
} }
for k in ( for k in (
"bup_ck", "bup_ck",
"casechk",
"chmod_d", "chmod_d",
"chmod_f", "chmod_f",
"dbd", "dbd",
@ -244,6 +245,7 @@ flagcats = {
"no_db_ip": "never store uploader-IP in the db; disables unpost", "no_db_ip": "never store uploader-IP in the db; disables unpost",
"fat32": "avoid excessive reindexing on android sdcardfs", "fat32": "avoid excessive reindexing on android sdcardfs",
"dbd=[acid|swal|wal|yolo]": "database speed-durability tradeoff", "dbd=[acid|swal|wal|yolo]": "database speed-durability tradeoff",
"casechk=auto": "actively prevent case-insensitive filesystem? y/n",
"xlink": "cross-volume dupe detection / linking (dangerous)", "xlink": "cross-volume dupe detection / linking (dangerous)",
"xdev": "do not descend into other filesystems", "xdev": "do not descend into other filesystems",
"xvol": "do not follow symlinks leaving the volume root", "xvol": "do not follow symlinks leaving the volume root",

View file

@ -202,6 +202,9 @@ class FtpFs(AbstractedFS):
if r and not cr or w and not cw or m and not cm or d and not cd: if r and not cr or w and not cw or m and not cm or d and not cd:
raise FSE(t.format(vpath), 1) raise FSE(t.format(vpath), 1)
if "bcasechk" in vfs.flags and not vfs.casechk(rem, True):
raise FSE("No such file or directory", 1)
return os.path.join(vfs.realpath, rem), vfs, rem return os.path.join(vfs.realpath, rem), vfs, rem
except Pebkac as ex: except Pebkac as ex:
raise FSE(str(ex)) raise FSE(str(ex))

View file

@ -735,6 +735,9 @@ class HttpCli(object):
else: else:
avn = vn avn = vn
if "bcasechk" in vn.flags and not vn.casechk(rem, True):
return self.tx_404() and False
( (
self.can_read, self.can_read,
self.can_write, self.can_write,

View file

@ -1140,7 +1140,7 @@ class Up2k(object):
ft = "\033[0;32m{}{:.0}" ft = "\033[0;32m{}{:.0}"
ff = "\033[0;35m{}{:.0}" ff = "\033[0;35m{}{:.0}"
fv = "\033[0;36m{}:\033[90m{}" fv = "\033[0;36m{}:\033[90m{}"
zs = "du_iwho ext_th_d html_head put_name2 mv_re_r mv_re_t rm_re_r rm_re_t srch_re_dots srch_re_nodot zipmax zipmaxn_v zipmaxs_v" zs = "bcasechk du_iwho ext_th_d html_head put_name2 mv_re_r mv_re_t rm_re_r rm_re_t srch_re_dots srch_re_nodot zipmax zipmaxn_v zipmaxs_v"
fx = set(zs.split()) fx = set(zs.split())
fd = vf_bmap() fd = vf_bmap()
fd.update(vf_cmap()) fd.update(vf_cmap())
@ -4146,6 +4146,9 @@ class Up2k(object):
except: except:
raise Pebkac(400, "file not found on disk (already deleted?)") raise Pebkac(400, "file not found on disk (already deleted?)")
if "bcasechk" in vn.flags and not vn.casechk(rem, False):
raise Pebkac(400, "file does not exist case-sensitively")
scandir = not self.args.no_scandir scandir = not self.args.no_scandir
if is_dir: if is_dir:
# note: deletion inside shares would require a rewrite here; # note: deletion inside shares would require a rewrite here;
@ -4270,6 +4273,9 @@ class Up2k(object):
self.db_act = self.vol_act[svn_dbv.realpath] = time.time() self.db_act = self.vol_act[svn_dbv.realpath] = time.time()
st = bos.stat(sabs) st = bos.stat(sabs)
if "bcasechk" in svn.flags and not svn.casechk(srem, False):
raise Pebkac(400, "file does not exist case-sensitively")
if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode): if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode):
with self.mutex: with self.mutex:
try: try:
@ -4488,6 +4494,9 @@ class Up2k(object):
raise Pebkac(400, "mv: cannot move a mountpoint") raise Pebkac(400, "mv: cannot move a mountpoint")
st = bos.lstat(sabs) st = bos.lstat(sabs)
if "bcasechk" in svn.flags and not svn.casechk(srem, False):
raise Pebkac(400, "file does not exist case-sensitively")
if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode): if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode):
with self.mutex: with self.mutex:
try: try:

View file

@ -57,8 +57,10 @@ class TestVFS(unittest.TestCase):
t2 = list(sorted(lst)) t2 = list(sorted(lst))
self.assertEqual(t1, t2) self.assertEqual(t1, t2)
def test(self): def wipe_vfs(self, td):
td = os.path.join(self.td, "vfs") os.chdir("..")
if os.path.exists(td):
shutil.rmtree(td)
os.mkdir(td) os.mkdir(td)
os.chdir(td) os.chdir(td)
@ -72,7 +74,12 @@ class TestVFS(unittest.TestCase):
with open(fn, "w") as f: with open(fn, "w") as f:
f.write(fn) f.write(fn)
def test(self):
td = os.path.join(self.td, "vfs")
self.wipe_vfs(td)
# defaults # defaults
self.wipe_vfs(td)
vfs = AuthSrv(Cfg(), self.log).vfs vfs = AuthSrv(Cfg(), self.log).vfs
self.assertEqual(vfs.nodes, {}) self.assertEqual(vfs.nodes, {})
self.assertEqual(vfs.vpath, "") self.assertEqual(vfs.vpath, "")
@ -81,6 +88,7 @@ class TestVFS(unittest.TestCase):
self.assertAxs(vfs.axs.uwrite, ["*"]) self.assertAxs(vfs.axs.uwrite, ["*"])
# single read-only rootfs (relative path) # single read-only rootfs (relative path)
self.wipe_vfs(td)
vfs = AuthSrv(Cfg(v=["a/ab/::r"]), self.log).vfs vfs = AuthSrv(Cfg(v=["a/ab/::r"]), self.log).vfs
self.assertEqual(vfs.nodes, {}) self.assertEqual(vfs.nodes, {})
self.assertEqual(vfs.vpath, "") self.assertEqual(vfs.vpath, "")
@ -89,6 +97,7 @@ class TestVFS(unittest.TestCase):
self.assertAxs(vfs.axs.uwrite, []) self.assertAxs(vfs.axs.uwrite, [])
# single read-only rootfs (absolute path) # single read-only rootfs (absolute path)
self.wipe_vfs(td)
vfs = AuthSrv(Cfg(v=[td + "//a/ac/../aa//::r"]), self.log).vfs vfs = AuthSrv(Cfg(v=[td + "//a/ac/../aa//::r"]), self.log).vfs
self.assertEqual(vfs.nodes, {}) self.assertEqual(vfs.nodes, {})
self.assertEqual(vfs.vpath, "") self.assertEqual(vfs.vpath, "")
@ -97,6 +106,7 @@ class TestVFS(unittest.TestCase):
self.assertAxs(vfs.axs.uwrite, []) self.assertAxs(vfs.axs.uwrite, [])
# read-only rootfs with write-only subdirectory (read-write for k) # read-only rootfs with write-only subdirectory (read-write for k)
self.wipe_vfs(td)
vfs = AuthSrv( vfs = AuthSrv(
Cfg(a=["k:k"], v=[".::r:rw,k", "a/ac/acb:a/ac/acb:w:rw,k"]), Cfg(a=["k:k"], v=[".::r:rw,k", "a/ac/acb:a/ac/acb:w:rw,k"]),
self.log, self.log,
@ -161,6 +171,7 @@ class TestVFS(unittest.TestCase):
self.assertEqual(list(virt), []) self.assertEqual(list(virt), [])
# admin-only rootfs with all-read-only subfolder # admin-only rootfs with all-read-only subfolder
self.wipe_vfs(td)
vfs = AuthSrv( vfs = AuthSrv(
Cfg(a=["k:k"], v=[".::rw,k", "a:a:r"]), Cfg(a=["k:k"], v=[".::rw,k", "a:a:r"]),
self.log, self.log,
@ -185,6 +196,7 @@ class TestVFS(unittest.TestCase):
self.assertEqual(vfs.can_access("/a", "k"), perm_ro) self.assertEqual(vfs.can_access("/a", "k"), perm_ro)
# breadth-first construction # breadth-first construction
self.wipe_vfs(td)
vfs = AuthSrv( vfs = AuthSrv(
Cfg( Cfg(
v=[ v=[
@ -207,6 +219,7 @@ class TestVFS(unittest.TestCase):
self.undot(vfs, "./.././foo/..", "") self.undot(vfs, "./.././foo/..", "")
# shadowing # shadowing
self.wipe_vfs(td)
vfs = AuthSrv(Cfg(v=[".::r", "b:a/ac:r"]), self.log).vfs vfs = AuthSrv(Cfg(v=[".::r", "b:a/ac:r"]), self.log).vfs
fsp, r1, v1 = self.ls(vfs, "", "*") fsp, r1, v1 = self.ls(vfs, "", "*")
@ -244,6 +257,7 @@ class TestVFS(unittest.TestCase):
).encode("utf-8") ).encode("utf-8")
) )
self.wipe_vfs(td)
au = AuthSrv(Cfg(c=[cfg_path]), self.log) au = AuthSrv(Cfg(c=[cfg_path]), self.log)
self.assertEqual(au.acct["a"], "123") self.assertEqual(au.acct["a"], "123")
self.assertEqual(au.acct["asd"], "fgh:jkl") self.assertEqual(au.acct["asd"], "fgh:jkl")

View file

@ -17,7 +17,7 @@ from argparse import Namespace
import jinja2 import jinja2
from copyparty.__init__ import MACOS, WINDOWS, E from copyparty.__init__ import ANYWIN, MACOS, WINDOWS, E
J2_ENV = jinja2.Environment(loader=jinja2.BaseLoader) # type: ignore J2_ENV = jinja2.Environment(loader=jinja2.BaseLoader) # type: ignore
J2_FILES = J2_ENV.from_string("{{ files|join('\n') }}\nJ2EOT") J2_FILES = J2_ENV.from_string("{{ files|join('\n') }}\nJ2EOT")
@ -185,6 +185,7 @@ class Cfg(Namespace):
E=E, E=E,
auth_ord="idp,ipu", auth_ord="idp,ipu",
bup_ck="sha512", bup_ck="sha512",
casechk="a" if ANYWIN or MACOS else "n",
chmod_d="755", chmod_d="755",
cookie_cmax=8192, cookie_cmax=8192,
cookie_nmax=50, cookie_nmax=50,