From 3a800585bcdfda105330bcc74d5f3a64de9b5892 Mon Sep 17 00:00:00 2001 From: ed Date: Tue, 29 Nov 2022 22:09:32 +0000 Subject: [PATCH] u2cli: server is allowed to reject dupes --- bin/up2k.py | 70 ++++++++++++++++++++++++++----------------- copyparty/up2k.py | 4 +-- copyparty/web/up2k.js | 4 +-- 3 files changed, 47 insertions(+), 31 deletions(-) diff --git a/bin/up2k.py b/bin/up2k.py index 2b284b9d..801f3f7c 100755 --- a/bin/up2k.py +++ b/bin/up2k.py @@ -3,7 +3,7 @@ from __future__ import print_function, unicode_literals """ up2k.py: upload to copyparty -2022-11-04, v0.21, ed , MIT-Licensed +2022-11-29, v0.22, ed , MIT-Licensed https://github.com/9001/copyparty/blob/hovudstraum/bin/up2k.py - dependencies: requests @@ -94,6 +94,7 @@ class File(object): self.kchunks = {} # type: dict[str, tuple[int, int]] # hash: [ ofs, sz ] # set by handshake + self.recheck = False # duplicate; redo handshake after all files done self.ucids = [] # type: list[str] # chunks which need to be uploaded self.wark = None # type: str self.url = None # type: str @@ -472,8 +473,8 @@ def get_hashlist(file, pcb, mth): file.kchunks[k] = [v1, v2] -def handshake(req_ses, url, file, pw, search): - # type: (requests.Session, str, File, any, bool) -> list[str] +def handshake(url, file, pw, search): + # type: (str, File, Any, bool) -> tuple[list[str], bool] """ performs a handshake with the server; reply is: if search, a list of search results @@ -507,6 +508,17 @@ def handshake(req_ses, url, file, pw, search): eprint("handshake failed, retrying: {0}\n {1}\n\n".format(file.name, em)) time.sleep(1) + sc = r.status_code + if sc >= 400: + txt = r.text + if sc == 422 or "
partial upload exists at a different" in txt:
+            file.recheck = True
+            return [], False
+        elif sc == 409 or "
upload rejected, file already exists" in txt:
+            return [], False
+
+        raise Exception("http {0}: {1}".format(sc, txt))
+
     try:
         r = r.json()
     except:
@@ -528,8 +540,8 @@ def handshake(req_ses, url, file, pw, search):
     return r["hash"], r["sprs"]
 
 
-def upload(req_ses, file, cid, pw):
-    # type: (requests.Session, File, str, any) -> None
+def upload(file, cid, pw):
+    # type: (File, str, Any) -> None
     """upload one specific chunk, `cid` (a chunk-hash)"""
 
     headers = {
@@ -626,8 +638,8 @@ class Ctl(object):
 
             self.mutex = threading.Lock()
             self.q_handshake = Queue()  # type: Queue[File]
-            self.q_recheck = Queue()  # type: Queue[File]  # partial upload exists [...]
             self.q_upload = Queue()  # type: Queue[tuple[File, str]]
+            self.recheck = []  # type: list[File]
 
             self.st_hash = [None, "(idle, starting...)"]  # type: tuple[File, int]
             self.st_up = [None, "(idle, starting...)"]  # type: tuple[File, int]
@@ -649,7 +661,7 @@ class Ctl(object):
             burl = self.ar.url[:12] + self.ar.url[8:].split("/")[0] + "/"
             while True:
                 print("  hs...")
-                hs, _ = handshake(req_ses, self.ar.url, file, self.ar.a, search)
+                hs, _ = handshake(self.ar.url, file, self.ar.a, search)
                 if search:
                     if hs:
                         for hit in hs:
@@ -666,9 +678,18 @@ class Ctl(object):
                 ncs = len(hs)
                 for nc, cid in enumerate(hs):
                     print("  {0} up {1}".format(ncs - nc, cid))
-                    upload(req_ses, file, cid, self.ar.a)
+                    upload(file, cid, self.ar.a)
 
             print("  ok!")
+            if file.recheck:
+                self.recheck.append(file)
+
+        if not self.recheck:
+            return
+
+        eprint("finalizing {0} duplicate files".format(len(self.recheck)))
+        for file in self.recheck:
+            handshake(self.ar.url, file, self.ar.a, search)
 
     def _fancy(self):
         if VT100:
@@ -740,6 +761,13 @@ class Ctl(object):
             t = "{0} eta @ {1}/s, {2}, {3}# left".format(eta, spd, sleft, nleft)
             eprint(txt + "\033]0;{0}\033\\\r{0}{1}".format(t, tail))
 
+        if not self.recheck:
+            return
+
+        eprint("finalizing {0} duplicate files".format(len(self.recheck)))
+        for file in self.recheck:
+            handshake(self.ar.url, file, self.ar.a, False)
+
     def cleanup_vt100(self):
         ss.scroll_region(None)
         eprint("\033[J\033]0;\033\\")
@@ -812,16 +840,10 @@ class Ctl(object):
 
     def handshaker(self):
         search = self.ar.s
-        q = self.q_handshake
         burl = self.ar.url[:8] + self.ar.url[8:].split("/")[0] + "/"
         while True:
-            file = q.get()
+            file = self.q_handshake.get()
             if not file:
-                if q == self.q_handshake:
-                    q = self.q_recheck
-                    q.put(None)
-                    continue
-
                 self.q_upload.put(None)
                 break
 
@@ -829,16 +851,7 @@ class Ctl(object):
                 self.handshaker_busy += 1
 
             upath = file.abs.decode("utf-8", "replace")
-
-            try:
-                hs, sprs = handshake(req_ses, self.ar.url, file, self.ar.a, search)
-            except Exception as ex:
-                if q == self.q_handshake and "
partial upload exists" in str(ex):
-                    self.q_recheck.put(file)
-                    hs = []
-                else:
-                    raise
-
+            hs, sprs = handshake(self.ar.url, file, self.ar.a, search)
             if search:
                 if hs:
                     for hit in hs:
@@ -855,8 +868,11 @@ class Ctl(object):
 
                 continue
 
+            if file.recheck:
+                self.recheck.append(file)
+
             with self.mutex:
-                if not sprs and not self.serialized:
+                if hs and not sprs and not self.serialized:
                     t = "server filesystem does not support sparse files; serializing uploads\n"
                     eprint(t)
                     self.serialized = True
@@ -899,7 +915,7 @@ class Ctl(object):
 
             file, cid = task
             try:
-                upload(req_ses, file, cid, self.ar.a)
+                upload(file, cid, self.ar.a)
             except:
                 eprint("upload failed, retrying: {0} #{1}\n".format(file.name, cid[:8]))
                 pass  # handshake will fix it
diff --git a/copyparty/up2k.py b/copyparty/up2k.py
index c3e1e4a0..7cebc177 100644
--- a/copyparty/up2k.py
+++ b/copyparty/up2k.py
@@ -2005,13 +2005,13 @@ class Up2k(object):
                             except:
                                 self.dupesched[src] = [dupe]
 
-                        raise Pebkac(400, err)
+                        raise Pebkac(422, err)
 
                     elif "nodupe" in self.flags[job["ptop"]]:
                         self.log("dupe-reject:\n  {0}\n  {1}".format(src, dst))
                         err = "upload rejected, file already exists:\n"
                         err += "/" + quotep(vsrc) + " "
-                        raise Pebkac(400, err)
+                        raise Pebkac(409, err)
                     else:
                         # symlink to the client-provided name,
                         # returning the previous upload info
diff --git a/copyparty/web/up2k.js b/copyparty/web/up2k.js
index ccaff43a..542937e1 100644
--- a/copyparty/web/up2k.js
+++ b/copyparty/web/up2k.js
@@ -2297,8 +2297,8 @@ function up2k_init(subtle) {
                     return;
                 }
 
-                var err_pend = rsp.indexOf('partial upload exists') + 1,
-                    err_dupe = rsp.indexOf('file already exists') + 1;
+                var err_pend = rsp.indexOf('partial upload exists at a different') + 1,
+                    err_dupe = rsp.indexOf('upload rejected, file already exists') + 1;
 
                 if (err_pend || err_dupe) {
                     err = rsp;