fix chunkpost-handshake race (affects --no-dedup only);

a handshake arriving in the middle of the final chunk could cause
dupes to become empty -- worst case leading to loss of data
This commit is contained in:
ed 2023-03-05 19:45:50 +00:00
parent 0009e31bd3
commit c98fff1647
6 changed files with 47 additions and 67 deletions

View file

@ -3,7 +3,7 @@ from __future__ import print_function, unicode_literals
"""
up2k.py: upload to copyparty
2023-01-13, v1.2, ed <irc.rizon.net>, MIT-Licensed
2023-03-05, v1.3, ed <irc.rizon.net>, MIT-Licensed
https://github.com/9001/copyparty/blob/hovudstraum/bin/up2k.py
- dependencies: requests
@ -520,7 +520,11 @@ def handshake(ar, file, search):
except Exception as ex:
em = str(ex).split("SSLError(")[-1].split("\nURL: ")[0].strip()
if sc == 422 or "<pre>partial upload exists at a different" in txt:
if (
sc == 422
or "<pre>partial upload exists at a different" in txt
or "<pre>source file busy; please try again" in txt
):
file.recheck = True
return [], False
elif sc == 409 or "<pre>upload rejected, file already exists" in txt:

View file

@ -1714,7 +1714,7 @@ class HttpCli(object):
except:
raise Pebkac(500, min_ex())
x = self.conn.hsrv.broker.ask("up2k.handle_json", body)
x = self.conn.hsrv.broker.ask("up2k.handle_json", body, self.u2fh.aps)
ret = x.get()
if self.is_vproxied:
if "purl" in ret:
@ -1884,17 +1884,10 @@ class HttpCli(object):
with self.mutex:
self.u2fh.close(path)
# windows cant rename open files
if ANYWIN and path != fin_path and not self.args.nw:
self.conn.hsrv.broker.ask("up2k.finish_upload", ptop, wark).get()
if not ANYWIN and not num_left:
times = (int(time.time()), int(lastmod))
self.log("no more chunks, setting times {}".format(times))
try:
bos.utime(fin_path, times)
except:
self.log("failed to utime ({}, {})".format(fin_path, times))
if not num_left and not self.args.nw:
self.conn.hsrv.broker.ask(
"up2k.finish_upload", ptop, wark, self.u2fh.aps
).get()
cinf = self.headers.get("x-up2k-stat", "")

View file

@ -149,12 +149,9 @@ class SvcHub(object):
self.log("root", t.format(args.j))
if not args.no_fpool and args.j != 1:
t = "WARNING: --use-fpool combined with multithreading is untested and can probably cause undefined behavior"
if ANYWIN:
t = 'windows cannot do multithreading without --no-fpool, so enabling that -- note that upload performance will suffer if you have microsoft defender "real-time protection" enabled, so you probably want to use -j 1 instead'
args.no_fpool = True
self.log("root", t, c=3)
t = "WARNING: ignoring --use-fpool because multithreading (-j{}) is enabled"
self.log("root", t.format(args.j), c=3)
args.no_fpool = True
bri = "zy"[args.theme % 2 :][:1]
ch = "abcdefghijklmnopqrstuvwx"[int(args.theme / 2)]

View file

@ -124,6 +124,7 @@ class Up2k(object):
self.droppable: dict[str, list[str]] = {}
self.volstate: dict[str, str] = {}
self.vol_act: dict[str, float] = {}
self.busy_aps: set[str] = set()
self.dupesched: dict[str, list[tuple[str, str, float]]] = {}
self.snap_persist_interval = 300 # persist unfinished index every 5 min
self.snap_discard_interval = 21600 # drop unfinished after 6 hours inactivity
@ -161,12 +162,6 @@ class Up2k(object):
t = "could not initialize sqlite3, will use in-memory registry only"
self.log(t, 3)
if ANYWIN:
# usually fails to set lastmod too quickly
self.lastmod_q: list[tuple[str, int, tuple[int, int], bool]] = []
self.lastmod_q2 = self.lastmod_q[:]
Daemon(self._lastmodder, "up2k-lastmod")
self.fstab = Fstab(self.log_func)
self.gen_fk = self._gen_fk if self.args.log_fk else gen_filekey
@ -2113,7 +2108,8 @@ class Up2k(object):
if cj["ptop"] not in self.registry:
raise Pebkac(410, "location unavailable")
def handle_json(self, cj: dict[str, Any]) -> dict[str, Any]:
def handle_json(self, cj: dict[str, Any], busy_aps: set[str]) -> dict[str, Any]:
self.busy_aps = busy_aps
try:
# bit expensive; 3.9=10x 3.11=2x
if self.mutex.acquire(timeout=10):
@ -2287,6 +2283,13 @@ class Up2k(object):
else:
# symlink to the client-provided name,
# returning the previous upload info
if src in self.busy_aps or (
wark in reg and "done" not in reg[wark]
):
raise Pebkac(
422, "source file busy; please try again later"
)
job = deepcopy(job)
job["wark"] = wark
job["at"] = cj.get("at") or time.time()
@ -2505,10 +2508,7 @@ class Up2k(object):
if lmod and (not linked or SYMTIME):
times = (int(time.time()), int(lmod))
if ANYWIN:
self.lastmod_q.append((dst, 0, times, False))
else:
bos.utime(dst, times, False)
bos.utime(dst, times, False)
def handle_chunk(
self, ptop: str, wark: str, chash: str
@ -2589,13 +2589,10 @@ class Up2k(object):
self.regdrop(ptop, wark)
return ret, dst
# windows cant rename open files
if not ANYWIN or src == dst:
self._finish_upload(ptop, wark)
return ret, dst
def finish_upload(self, ptop: str, wark: str) -> None:
def finish_upload(self, ptop: str, wark: str, busy_aps: set[str]) -> None:
self.busy_aps = busy_aps
with self.mutex:
self._finish_upload(ptop, wark)
@ -2608,6 +2605,10 @@ class Up2k(object):
except Exception as ex:
raise Pebkac(500, "finish_upload, wark, " + repr(ex))
if job["need"]:
t = "finish_upload {} with remaining chunks {}"
raise Pebkac(500, t.format(wark, job["need"]))
# self.log("--- " + wark + " " + dst + " finish_upload atomic " + dst, 4)
atomic_move(src, dst)
@ -2615,14 +2616,15 @@ class Up2k(object):
vflags = self.flags[ptop]
times = (int(time.time()), int(job["lmod"]))
if ANYWIN:
z1 = (dst, job["size"], times, job["sprs"])
self.lastmod_q.append(z1)
elif not job["hash"]:
try:
bos.utime(dst, times)
except:
pass
self.log(
"no more chunks, setting times {} ({}) on {}".format(
times, bos.path.getsize(dst), dst
)
)
try:
bos.utime(dst, times)
except:
self.log("failed to utime ({}, {})".format(dst, times))
zs = "prel name lmod size ptop vtop wark host user addr"
z2 = [job[x] for x in zs.split()]
@ -2643,6 +2645,7 @@ class Up2k(object):
if self.idx_wark(vflags, *z2):
del self.registry[ptop][wark]
else:
self.registry[ptop][wark]["done"] = 1
self.regdrop(ptop, wark)
if wake_sr:
@ -3426,27 +3429,6 @@ class Up2k(object):
if not job["hash"]:
self._finish_upload(job["ptop"], job["wark"])
def _lastmodder(self) -> None:
while True:
ready = self.lastmod_q2
self.lastmod_q2 = self.lastmod_q
self.lastmod_q = []
time.sleep(1)
for path, sz, times, sparse in ready:
self.log("lmod: setting times {} on {}".format(times, path))
try:
bos.utime(path, times, False)
except:
t = "lmod: failed to utime ({}, {}):\n{}"
self.log(t.format(path, times, min_ex()))
if sparse and self.args.sparse and self.args.sparse * 1024 * 1024 <= sz:
try:
sp.check_call(["fsutil", "sparse", "setflag", path, "0"])
except:
self.log("could not unsparse [{}]".format(path), 3)
def _snapshot(self) -> None:
slp = self.snap_persist_interval
while True:

View file

@ -668,6 +668,7 @@ class FHC(object):
def __init__(self) -> None:
self.cache: dict[str, FHC.CE] = {}
self.aps: set[str] = set()
def close(self, path: str) -> None:
try:
@ -679,6 +680,7 @@ class FHC(object):
fh.close()
del self.cache[path]
self.aps.remove(path)
def clean(self) -> None:
if not self.cache:
@ -699,6 +701,7 @@ class FHC(object):
return self.cache[path].fhs.pop()
def put(self, path: str, fh: typing.BinaryIO) -> None:
self.aps.add(path)
try:
ce = self.cache[path]
ce.fhs.append(fh)

View file

@ -2382,16 +2382,17 @@ function up2k_init(subtle) {
}
var err_pend = rsp.indexOf('partial upload exists at a different') + 1,
err_srcb = rsp.indexOf('source file busy; please try again') + 1,
err_plug = rsp.indexOf('upload blocked by x') + 1,
err_dupe = rsp.indexOf('upload rejected, file already exists') + 1;
if (err_pend || err_plug || err_dupe) {
if (err_pend || err_srcb || err_plug || err_dupe) {
err = rsp;
ofs = err.indexOf('\n/');
if (ofs !== -1) {
err = err.slice(0, ofs + 1) + linksplit(err.slice(ofs + 2).trimEnd()).join(' ');
}
if (!t.rechecks && err_pend) {
if (!t.rechecks && (err_pend || err_srcb)) {
t.rechecks = 0;
t.want_recheck = true;
}