diff --git a/copyparty/authsrv.py b/copyparty/authsrv.py
index 9dff0dad..8f053f68 100644
--- a/copyparty/authsrv.py
+++ b/copyparty/authsrv.py
@@ -441,7 +441,7 @@ class VFS(object):
def _find(self, vpath: str) -> tuple["VFS", str]:
"""return [vfs,remainder]"""
- if vpath == "":
+ if not vpath:
return self, ""
if "/" in vpath:
@@ -451,7 +451,7 @@ class VFS(object):
rem = ""
if name in self.nodes:
- return self.nodes[name]._find(undot(rem))
+ return self.nodes[name]._find(rem)
return self, vpath
diff --git a/copyparty/fsutil.py b/copyparty/fsutil.py
index 5ba9df43..ec587bb8 100644
--- a/copyparty/fsutil.py
+++ b/copyparty/fsutil.py
@@ -14,7 +14,7 @@ from .util import chkcmd, min_ex
if True: # pylint: disable=using-constant-test
from typing import Optional, Union
- from .util import RootLogger
+ from .util import RootLogger, undot
class Fstab(object):
@@ -52,7 +52,7 @@ class Fstab(object):
self.log(msg.format(path, fs, min_ex()), 3)
return fs
- path = path.lstrip("/")
+ path = undot(path)
try:
return self.cache[path]
except:
@@ -124,7 +124,7 @@ class Fstab(object):
if ANYWIN:
path = self._winpath(path)
- path = path.lstrip("/")
+ path = undot(path)
ptn = re.compile(r"^[^\\/]*")
vn, rem = self.tab._find(path)
if not self.trusted:
diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py
index 54ce564e..3689872f 100644
--- a/copyparty/httpcli.py
+++ b/copyparty/httpcli.py
@@ -19,7 +19,7 @@ import threading # typechk
import time
import uuid
from datetime import datetime
-from email.utils import formatdate, parsedate
+from email.utils import parsedate
from operator import itemgetter
import jinja2 # typechk
@@ -54,6 +54,7 @@ from .util import (
alltrace,
atomic_move,
exclude_dotfiles,
+ formatdate,
fsenc,
gen_filekey,
gen_filekey_dbg,
@@ -787,7 +788,7 @@ class HttpCli(object):
# close if unknown length, otherwise take client's preference
response.append("Connection: " + ("Keep-Alive" if self.keepalive else "Close"))
- response.append("Date: " + formatdate(usegmt=True))
+ response.append("Date: " + formatdate())
# headers{} overrides anything set previously
if headers:
@@ -811,9 +812,9 @@ class HttpCli(object):
self.cbonk(self.conn.hsrv.gmal, zs, "cc_hdr", "Cc in out-hdr")
raise Pebkac(999)
+ response.append("\r\n")
try:
- # best practice to separate headers and body into different packets
- self.s.sendall("\r\n".join(response).encode("utf-8") + b"\r\n\r\n")
+ self.s.sendall("\r\n".join(response).encode("utf-8"))
except:
raise Pebkac(400, "client d/c while replying headers")
@@ -1146,7 +1147,7 @@ class HttpCli(object):
return self.tx_mounts()
# conditional redirect to single volumes
- if self.vpath == "" and not self.ouparam:
+ if not self.vpath and not self.ouparam:
nread = len(self.rvol)
nwrite = len(self.wvol)
if nread + nwrite == 1 or (self.rvol == self.wvol and nread == 1):
@@ -1305,7 +1306,7 @@ class HttpCli(object):
pvs: dict[str, str] = {
"displayname": html_escape(rp.split("/")[-1]),
- "getlastmodified": formatdate(mtime, usegmt=True),
+ "getlastmodified": formatdate(mtime),
"resourcetype": '' if isdir else "",
"supportedlock": '',
}
@@ -2952,7 +2953,7 @@ class HttpCli(object):
return True
def _chk_lastmod(self, file_ts: int) -> tuple[str, bool]:
- file_lastmod = formatdate(file_ts, usegmt=True)
+ file_lastmod = formatdate(file_ts)
cli_lastmod = self.headers.get("if-modified-since")
if cli_lastmod:
try:
@@ -3034,8 +3035,8 @@ class HttpCli(object):
for n, fn in enumerate([".prologue.html", ".epilogue.html"]):
if lnames is not None and fn not in lnames:
continue
- fn = os.path.join(abspath, fn)
- if bos.path.exists(fn):
+ fn = "%s/%s" % (abspath, fn)
+ if bos.path.isfile(fn):
with open(fsenc(fn), "rb") as f:
logues[n] = f.read().decode("utf-8")
if "exp" in vn.flags:
@@ -3053,7 +3054,7 @@ class HttpCli(object):
fns = []
for fn in fns:
- fn = os.path.join(abspath, fn)
+ fn = "%s/%s" % (abspath, fn)
if bos.path.isfile(fn):
with open(fsenc(fn), "rb") as f:
readme = f.read().decode("utf-8")
@@ -3588,7 +3589,7 @@ class HttpCli(object):
# (useragent-sniffing kinshi due to caching proxies)
mime, ico = self.ico.get(txt, not small, "raster" in self.uparam)
- lm = formatdate(self.E.t0, usegmt=True)
+ lm = formatdate(self.E.t0)
self.reply(ico, mime=mime, headers={"Last-Modified": lm})
return True
diff --git a/copyparty/ssdp.py b/copyparty/ssdp.py
index f663d99b..5f8de79d 100644
--- a/copyparty/ssdp.py
+++ b/copyparty/ssdp.py
@@ -5,11 +5,11 @@ import errno
import re
import select
import socket
-from email.utils import formatdate
+import time
from .__init__ import TYPE_CHECKING
from .multicast import MC_Sck, MCast
-from .util import CachedSet, html_escape, min_ex
+from .util import CachedSet, formatdate, html_escape, min_ex
if TYPE_CHECKING:
from .broker_util import BrokerCli
@@ -229,7 +229,7 @@ CONFIGID.UPNP.ORG: 1
"""
v4 = srv.ip.replace("::ffff:", "")
- zs = zs.format(formatdate(usegmt=True), v4, srv.hport, self.args.zsid)
+ zs = zs.format(formatdate(), v4, srv.hport, self.args.zsid)
zb = zs[1:].replace("\n", "\r\n").encode("utf-8", "replace")
srv.sck.sendto(zb, addr[:2])
diff --git a/copyparty/szip.py b/copyparty/szip.py
index 02b51b5c..ec37016a 100644
--- a/copyparty/szip.py
+++ b/copyparty/szip.py
@@ -37,9 +37,7 @@ def dostime2unix(buf: bytes) -> int:
def unixtime2dos(ts: int) -> bytes:
- tt = time.gmtime(ts + 1)
- dy, dm, dd, th, tm, ts = list(tt)[:6]
-
+ dy, dm, dd, th, tm, ts, _, _, _ = time.gmtime(ts + 1)
bd = ((dy - 1980) << 9) + (dm << 5) + dd
bt = (th << 11) + (tm << 5) + ts // 2
try:
diff --git a/copyparty/tftpd.py b/copyparty/tftpd.py
index 04409f9d..785a4b5b 100644
--- a/copyparty/tftpd.py
+++ b/copyparty/tftpd.py
@@ -36,7 +36,7 @@ from partftpy.TftpShared import TftpException
from .__init__ import EXE, PY2, TYPE_CHECKING
from .authsrv import VFS
from .bos import bos
-from .util import BytesIO, Daemon, ODict, exclude_dotfiles, min_ex, runhook, undot
+from .util import UTC, BytesIO, Daemon, ODict, exclude_dotfiles, min_ex, runhook, undot
if True: # pylint: disable=using-constant-test
from typing import Any, Union
@@ -262,7 +262,7 @@ class Tftpd(object):
dirs1 = [(v.st_mtime, v.st_size, k + "/") for k, v in vfs_ls if k in dnames]
fils1 = [(v.st_mtime, v.st_size, k) for k, v in vfs_ls if k not in dnames]
real1 = dirs1 + fils1
- realt = [(datetime.fromtimestamp(mt), sz, fn) for mt, sz, fn in real1]
+ realt = [(datetime.fromtimestamp(mt, UTC), sz, fn) for mt, sz, fn in real1]
reals = [
(
"%04d-%02d-%02d %02d:%02d:%02d"
diff --git a/copyparty/util.py b/copyparty/util.py
index 74e34f0c..2b69b8e5 100644
--- a/copyparty/util.py
+++ b/copyparty/util.py
@@ -26,7 +26,6 @@ import threading
import time
import traceback
from collections import Counter
-from email.utils import formatdate
from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network
from queue import Queue
@@ -1821,10 +1820,21 @@ def gen_filekey_dbg(
return ret
+WKDAYS = "Mon Tue Wed Thu Fri Sat Sun".split()
+MONTHS = "Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec".split()
+RFC2822 = "%s, %02d %s %04d %02d:%02d:%02d GMT"
+
+
+def formatdate(ts: Optional[int] = None) -> str:
+ # gmtime ~= datetime.fromtimestamp(ts, UTC).timetuple()
+ y, mo, d, h, mi, s, wd, _, _ = time.gmtime(ts)
+ return RFC2822 % (WKDAYS[wd], d, MONTHS[mo - 1], y, h, mi, s)
+
+
def gencookie(k: str, v: str, r: str, tls: bool, dur: int = 0, txt: str = "") -> str:
v = v.replace("%", "%25").replace(";", "%3B")
if dur:
- exp = formatdate(time.time() + dur, usegmt=True)
+ exp = formatdate(time.time() + dur)
else:
exp = "Fri, 15 Aug 1997 01:00:00 GMT"
@@ -1839,12 +1849,10 @@ def humansize(sz: float, terse: bool = False) -> str:
sz /= 1024.0
- ret = " ".join([str(sz)[:4].rstrip("."), unit])
-
- if not terse:
- return ret
-
- return ret.replace("iB", "").replace(" ", "")
+ if terse:
+ return "%s%s" % (str(sz)[:4].rstrip("."), unit[:1])
+ else:
+ return "%s %s" % (str(sz)[:4].rstrip("."), unit)
def unhumanize(sz: str) -> int:
@@ -1896,7 +1904,7 @@ def uncyg(path: str) -> str:
def undot(path: str) -> str:
ret: list[str] = []
for node in path.split("/"):
- if node in ["", "."]:
+ if node == "." or not node:
continue
if node == "..":
@@ -2709,30 +2717,30 @@ def rmdirs_up(top: str, stop: str) -> tuple[list[str], list[str]]:
def unescape_cookie(orig: str) -> str:
# mw=idk; doot=qwe%2Crty%3Basd+fgh%2Bjkl%25zxc%26vbn # qwe,rty;asd fgh+jkl%zxc&vbn
- ret = ""
+ ret = []
esc = ""
for ch in orig:
if ch == "%":
- if len(esc) > 0:
- ret += esc
+ if esc:
+ ret.append(esc)
esc = ch
- elif len(esc) > 0:
+ elif esc:
esc += ch
if len(esc) == 3:
try:
- ret += chr(int(esc[1:], 16))
+ ret.append(chr(int(esc[1:], 16)))
except:
- ret += esc
+ ret.append(esc)
esc = ""
else:
- ret += ch
+ ret.append(ch)
- if len(esc) > 0:
- ret += esc
+ if esc:
+ ret.append(esc)
- return ret
+ return "".join(ret)
def guess_mime(url: str, fallback: str = "application/octet-stream") -> str:
diff --git a/tests/test_httpcli.py b/tests/test_httpcli.py
index 4b09743b..513a301a 100644
--- a/tests/test_httpcli.py
+++ b/tests/test_httpcli.py
@@ -168,6 +168,11 @@ class TestHttpCli(unittest.TestCase):
h, ret = self.put(url)
res = h.startswith("HTTP/1.1 201 ")
self.assertEqual(res, wok)
+ if wok:
+ vp = h.split("\nLocation: http://a:1/")[1].split("\r")[0]
+ vn, rem = self.asrv.vfs.get(vp, "*", False, False)
+ ap = os.path.join(vn.realpath, rem)
+ os.unlink(ap)
def can_rw(self, fp):
# lowest non-neutral folder declares permissions