From c98fff16472cf26dd894c65da2106beb3246612f Mon Sep 17 00:00:00 2001 From: ed Date: Sun, 5 Mar 2023 19:45:50 +0000 Subject: [PATCH] fix chunkpost-handshake race (affects --no-dedup only); a handshake arriving in the middle of the final chunk could cause dupes to become empty -- worst case leading to loss of data --- bin/up2k.py | 8 +++-- copyparty/httpcli.py | 17 +++------- copyparty/svchub.py | 9 ++---- copyparty/up2k.py | 72 ++++++++++++++++--------------------------- copyparty/util.py | 3 ++ copyparty/web/up2k.js | 5 +-- 6 files changed, 47 insertions(+), 67 deletions(-) diff --git a/bin/up2k.py b/bin/up2k.py index 4f01edd3..1cf94be7 100755 --- a/bin/up2k.py +++ b/bin/up2k.py @@ -3,7 +3,7 @@ from __future__ import print_function, unicode_literals """ up2k.py: upload to copyparty -2023-01-13, v1.2, ed , MIT-Licensed +2023-03-05, v1.3, ed , MIT-Licensed https://github.com/9001/copyparty/blob/hovudstraum/bin/up2k.py - dependencies: requests @@ -520,7 +520,11 @@ def handshake(ar, file, search): except Exception as ex: em = str(ex).split("SSLError(")[-1].split("\nURL: ")[0].strip() - if sc == 422 or "
partial upload exists at a different" in txt:
+            if (
+                sc == 422
+                or "
partial upload exists at a different" in txt
+                or "
source file busy; please try again" in txt
+            ):
                 file.recheck = True
                 return [], False
             elif sc == 409 or "
upload rejected, file already exists" in txt:
diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py
index df37eafe..c15514e6 100644
--- a/copyparty/httpcli.py
+++ b/copyparty/httpcli.py
@@ -1714,7 +1714,7 @@ class HttpCli(object):
             except:
                 raise Pebkac(500, min_ex())
 
-        x = self.conn.hsrv.broker.ask("up2k.handle_json", body)
+        x = self.conn.hsrv.broker.ask("up2k.handle_json", body, self.u2fh.aps)
         ret = x.get()
         if self.is_vproxied:
             if "purl" in ret:
@@ -1884,17 +1884,10 @@ class HttpCli(object):
             with self.mutex:
                 self.u2fh.close(path)
 
-        # windows cant rename open files
-        if ANYWIN and path != fin_path and not self.args.nw:
-            self.conn.hsrv.broker.ask("up2k.finish_upload", ptop, wark).get()
-
-        if not ANYWIN and not num_left:
-            times = (int(time.time()), int(lastmod))
-            self.log("no more chunks, setting times {}".format(times))
-            try:
-                bos.utime(fin_path, times)
-            except:
-                self.log("failed to utime ({}, {})".format(fin_path, times))
+        if not num_left and not self.args.nw:
+            self.conn.hsrv.broker.ask(
+                "up2k.finish_upload", ptop, wark, self.u2fh.aps
+            ).get()
 
         cinf = self.headers.get("x-up2k-stat", "")
 
diff --git a/copyparty/svchub.py b/copyparty/svchub.py
index 2440af8c..c3a74f47 100644
--- a/copyparty/svchub.py
+++ b/copyparty/svchub.py
@@ -149,12 +149,9 @@ class SvcHub(object):
             self.log("root", t.format(args.j))
 
         if not args.no_fpool and args.j != 1:
-            t = "WARNING: --use-fpool combined with multithreading is untested and can probably cause undefined behavior"
-            if ANYWIN:
-                t = 'windows cannot do multithreading without --no-fpool, so enabling that -- note that upload performance will suffer if you have microsoft defender "real-time protection" enabled, so you probably want to use -j 1 instead'
-                args.no_fpool = True
-
-            self.log("root", t, c=3)
+            t = "WARNING: ignoring --use-fpool because multithreading (-j{}) is enabled"
+            self.log("root", t.format(args.j), c=3)
+            args.no_fpool = True
 
         bri = "zy"[args.theme % 2 :][:1]
         ch = "abcdefghijklmnopqrstuvwx"[int(args.theme / 2)]
diff --git a/copyparty/up2k.py b/copyparty/up2k.py
index 5f6f2f9c..7d9ea017 100644
--- a/copyparty/up2k.py
+++ b/copyparty/up2k.py
@@ -124,6 +124,7 @@ class Up2k(object):
         self.droppable: dict[str, list[str]] = {}
         self.volstate: dict[str, str] = {}
         self.vol_act: dict[str, float] = {}
+        self.busy_aps: set[str] = set()
         self.dupesched: dict[str, list[tuple[str, str, float]]] = {}
         self.snap_persist_interval = 300  # persist unfinished index every 5 min
         self.snap_discard_interval = 21600  # drop unfinished after 6 hours inactivity
@@ -161,12 +162,6 @@ class Up2k(object):
             t = "could not initialize sqlite3, will use in-memory registry only"
             self.log(t, 3)
 
-        if ANYWIN:
-            # usually fails to set lastmod too quickly
-            self.lastmod_q: list[tuple[str, int, tuple[int, int], bool]] = []
-            self.lastmod_q2 = self.lastmod_q[:]
-            Daemon(self._lastmodder, "up2k-lastmod")
-
         self.fstab = Fstab(self.log_func)
         self.gen_fk = self._gen_fk if self.args.log_fk else gen_filekey
 
@@ -2113,7 +2108,8 @@ class Up2k(object):
             if cj["ptop"] not in self.registry:
                 raise Pebkac(410, "location unavailable")
 
-    def handle_json(self, cj: dict[str, Any]) -> dict[str, Any]:
+    def handle_json(self, cj: dict[str, Any], busy_aps: set[str]) -> dict[str, Any]:
+        self.busy_aps = busy_aps
         try:
             # bit expensive; 3.9=10x 3.11=2x
             if self.mutex.acquire(timeout=10):
@@ -2287,6 +2283,13 @@ class Up2k(object):
                     else:
                         # symlink to the client-provided name,
                         # returning the previous upload info
+                        if src in self.busy_aps or (
+                            wark in reg and "done" not in reg[wark]
+                        ):
+                            raise Pebkac(
+                                422, "source file busy; please try again later"
+                            )
+
                         job = deepcopy(job)
                         job["wark"] = wark
                         job["at"] = cj.get("at") or time.time()
@@ -2505,10 +2508,7 @@ class Up2k(object):
 
         if lmod and (not linked or SYMTIME):
             times = (int(time.time()), int(lmod))
-            if ANYWIN:
-                self.lastmod_q.append((dst, 0, times, False))
-            else:
-                bos.utime(dst, times, False)
+            bos.utime(dst, times, False)
 
     def handle_chunk(
         self, ptop: str, wark: str, chash: str
@@ -2589,13 +2589,10 @@ class Up2k(object):
                 self.regdrop(ptop, wark)
                 return ret, dst
 
-            # windows cant rename open files
-            if not ANYWIN or src == dst:
-                self._finish_upload(ptop, wark)
-
         return ret, dst
 
-    def finish_upload(self, ptop: str, wark: str) -> None:
+    def finish_upload(self, ptop: str, wark: str, busy_aps: set[str]) -> None:
+        self.busy_aps = busy_aps
         with self.mutex:
             self._finish_upload(ptop, wark)
 
@@ -2608,6 +2605,10 @@ class Up2k(object):
         except Exception as ex:
             raise Pebkac(500, "finish_upload, wark, " + repr(ex))
 
+        if job["need"]:
+            t = "finish_upload {} with remaining chunks {}"
+            raise Pebkac(500, t.format(wark, job["need"]))
+
         # self.log("--- " + wark + "  " + dst + " finish_upload atomic " + dst, 4)
         atomic_move(src, dst)
 
@@ -2615,14 +2616,15 @@ class Up2k(object):
         vflags = self.flags[ptop]
 
         times = (int(time.time()), int(job["lmod"]))
-        if ANYWIN:
-            z1 = (dst, job["size"], times, job["sprs"])
-            self.lastmod_q.append(z1)
-        elif not job["hash"]:
-            try:
-                bos.utime(dst, times)
-            except:
-                pass
+        self.log(
+            "no more chunks, setting times {} ({}) on {}".format(
+                times, bos.path.getsize(dst), dst
+            )
+        )
+        try:
+            bos.utime(dst, times)
+        except:
+            self.log("failed to utime ({}, {})".format(dst, times))
 
         zs = "prel name lmod size ptop vtop wark host user addr"
         z2 = [job[x] for x in zs.split()]
@@ -2643,6 +2645,7 @@ class Up2k(object):
         if self.idx_wark(vflags, *z2):
             del self.registry[ptop][wark]
         else:
+            self.registry[ptop][wark]["done"] = 1
             self.regdrop(ptop, wark)
 
         if wake_sr:
@@ -3426,27 +3429,6 @@ class Up2k(object):
         if not job["hash"]:
             self._finish_upload(job["ptop"], job["wark"])
 
-    def _lastmodder(self) -> None:
-        while True:
-            ready = self.lastmod_q2
-            self.lastmod_q2 = self.lastmod_q
-            self.lastmod_q = []
-
-            time.sleep(1)
-            for path, sz, times, sparse in ready:
-                self.log("lmod: setting times {} on {}".format(times, path))
-                try:
-                    bos.utime(path, times, False)
-                except:
-                    t = "lmod: failed to utime ({}, {}):\n{}"
-                    self.log(t.format(path, times, min_ex()))
-
-                if sparse and self.args.sparse and self.args.sparse * 1024 * 1024 <= sz:
-                    try:
-                        sp.check_call(["fsutil", "sparse", "setflag", path, "0"])
-                    except:
-                        self.log("could not unsparse [{}]".format(path), 3)
-
     def _snapshot(self) -> None:
         slp = self.snap_persist_interval
         while True:
diff --git a/copyparty/util.py b/copyparty/util.py
index 60c3b0e0..b618021f 100644
--- a/copyparty/util.py
+++ b/copyparty/util.py
@@ -668,6 +668,7 @@ class FHC(object):
 
     def __init__(self) -> None:
         self.cache: dict[str, FHC.CE] = {}
+        self.aps: set[str] = set()
 
     def close(self, path: str) -> None:
         try:
@@ -679,6 +680,7 @@ class FHC(object):
             fh.close()
 
         del self.cache[path]
+        self.aps.remove(path)
 
     def clean(self) -> None:
         if not self.cache:
@@ -699,6 +701,7 @@ class FHC(object):
         return self.cache[path].fhs.pop()
 
     def put(self, path: str, fh: typing.BinaryIO) -> None:
+        self.aps.add(path)
         try:
             ce = self.cache[path]
             ce.fhs.append(fh)
diff --git a/copyparty/web/up2k.js b/copyparty/web/up2k.js
index 96651460..2499a29f 100644
--- a/copyparty/web/up2k.js
+++ b/copyparty/web/up2k.js
@@ -2382,16 +2382,17 @@ function up2k_init(subtle) {
                 }
 
                 var err_pend = rsp.indexOf('partial upload exists at a different') + 1,
+                    err_srcb = rsp.indexOf('source file busy; please try again') + 1,
                     err_plug = rsp.indexOf('upload blocked by x') + 1,
                     err_dupe = rsp.indexOf('upload rejected, file already exists') + 1;
 
-                if (err_pend || err_plug || err_dupe) {
+                if (err_pend || err_srcb || err_plug || err_dupe) {
                     err = rsp;
                     ofs = err.indexOf('\n/');
                     if (ofs !== -1) {
                         err = err.slice(0, ofs + 1) + linksplit(err.slice(ofs + 2).trimEnd()).join(' ');
                     }
-                    if (!t.rechecks && err_pend) {
+                    if (!t.rechecks && (err_pend || err_srcb)) {
                         t.rechecks = 0;
                         t.want_recheck = true;
                     }