From 088d08963f97f58fce5bc4300dd68ee66f2736eb Mon Sep 17 00:00:00 2001 From: ed Date: Fri, 1 Oct 2021 00:33:45 +0200 Subject: [PATCH] u2cli: add multithreading --- README.md | 4 +- bin/README.md | 6 ++ bin/up2k.py | 232 ++++++++++++++++++++++++++++++++++++++++++++------ 3 files changed, 213 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index e6789ee1..ce7ed73a 100644 --- a/README.md +++ b/README.md @@ -770,8 +770,8 @@ interact with copyparty using non-browser clients `chunk , MIT-Licensed +2021-09-30, v0.4, ed , MIT-Licensed https://github.com/9001/copyparty/blob/hovudstraum/bin/up2k.py - dependencies: requests - supports python 2.7 and 3.3 through 3.10 -- no parallel hashing / uploads yet, so browsers are faster - almost zero error-handling - but if something breaks just try again and it'll autoresume """ @@ -54,37 +53,40 @@ class File(object): """an up2k upload task; represents a single file""" def __init__(self, top, rel, size, lmod): - self.top = top - self.rel = rel.replace(b"\\", b"/") - self.size = size - self.lmod = lmod + self.top = top # type: bytes + self.rel = rel.replace(b"\\", b"/") # type: bytes + self.size = size # type: int + self.lmod = lmod # type: float - self.abs = os.path.join(top, rel) - self.name = self.rel.split(b"/")[-1].decode("utf-8", "replace") + self.abs = os.path.join(top, rel) # type: bytes + self.name = self.rel.split(b"/")[-1].decode("utf-8", "replace") # type: str # set by get_hashlist - self.cids = [] # [ hash, ofs, sz ] - self.kchunks = {} # hash: [ ofs, sz ] + self.cids = [] # type: list[tuple[str, int, int]] # [ hash, ofs, sz ] + self.kchunks = {} # type: dict[str, tuple[int, int]] # hash: [ ofs, sz ] # set by handshake - self.ucids = [] # chunks which need to be uploaded - self.wark = None - self.url = None + self.ucids = [] # type: list[str] # chunks which need to be uploaded + self.wark = None # type: str + self.url = None # type: str # set by upload - self.uploading = [] # chunks currently being 
uploaded + self.up_b = 0 # type: int + self.up_c = 0 # type: int # m = "size({}) lmod({}) top({}) rel({}) abs({}) name({})" - # print(m.format(self.size, self.lmod, self.top, self.rel, self.abs, self.name)) + # eprint(m.format(self.size, self.lmod, self.top, self.rel, self.abs, self.name)) class FileSlice(object): """file-like object providing a fixed window into a file""" def __init__(self, file, cid): + # type: (File, str) -> FileSlice + self.car, self.len = file.kchunks[cid] self.cdr = self.car + self.len - self.ofs = 0 + self.ofs = 0 # type: int self.f = open(file.abs, "rb", 512 * 1024) self.f.seek(self.car) @@ -121,6 +123,16 @@ class FileSlice(object): return ret +def eprint(*a, **ka): + ka["file"] = sys.stderr + if not PY2: + ka["flush"] = True + + print(*a, **ka) + if PY2: + sys.stderr.flush() + + def statdir(top): """non-recursive listing of directory contents, along with stat() info""" if hasattr(os, "scandir"): @@ -215,6 +227,9 @@ def get_hashlist(file, pcb): file_ofs += chunk_sz file_rem -= chunk_sz + if pcb: + pcb(file, file_ofs) + file.cids = ret file.kchunks = {k: [v1, v2] for k, v1, v2 in ret} @@ -291,8 +306,8 @@ def upload(req_ses, file, cid, pw): class Daemon(threading.Thread): - def __init__(self): - threading.Thread.__init__(self) + def __init__(self, *a, **ka): + threading.Thread.__init__(self, *a, **ka) self.daemon = True @@ -309,7 +324,7 @@ class Ctl(object): os.path.abspath(os.path.realpath(x.encode("utf-8"))) for x in ar.files ] - print("\nscanning {} locations".format(len(ar.files))) + eprint("\nscanning {} locations".format(len(ar.files))) nfiles = 0 nbytes = 0 @@ -317,7 +332,7 @@ class Ctl(object): nfiles += 1 nbytes += inf.st_size - print("found {} files, {}\n".format(nfiles, humansize(nbytes))) + eprint("found {} files, {}\n".format(nfiles, humansize(nbytes))) self.nfiles = nfiles self.nbytes = nbytes @@ -327,9 +342,10 @@ class Ctl(object): req_ses.verify = ar.te self.filegen = walkdirs(ar.files) - ar.safe = True if ar.safe: - return 
self.safe() + self.safe() + else: + self.fancy() def safe(self): """minimal basic slow boring fallback codepath""" @@ -364,6 +380,166 @@ class Ctl(object): print(" ok!") + def fancy(self): + self.hash_f = 0 + self.hash_c = 0 + self.hash_b = 0 + self.up_f = 0 + self.up_c = 0 + self.up_b = 0 + self.hasher_busy = 1 + self.handshaker_busy = 0 + self.uploader_busy = 0 + + self.mutex = threading.Lock() + self.q_handshake = Queue() # type: Queue[File] + self.q_recheck = Queue() # type: Queue[File] # partial upload exists [...] + self.q_upload = Queue() # type: Queue[tuple[File, str]] + + Daemon(target=self.hasher).start() + for _ in range(self.ar.j): + Daemon(target=self.handshaker).start() + Daemon(target=self.uploader).start() + + while True: + time.sleep(0.1) + with self.mutex: + if ( + self.q_handshake.empty() + and self.q_upload.empty() + and not self.hasher_busy + and not self.handshaker_busy + and not self.uploader_busy + ): + break + + def cb_hasher(self, file, ofs): + eprint(".", end="") + + def hasher(self): + for nf, (top, rel, inf) in enumerate(self.filegen): + file = File(top, rel, inf.st_size, inf.st_mtime) + upath = file.abs.decode("utf-8", "replace") + while True: + with self.mutex: + if ( + self.hash_b - self.up_b < 1024 * 1024 * 128 + and self.hash_c - self.up_c < 64 + and ( + not self.ar.nh + or ( + self.q_upload.empty() + and self.q_handshake.empty() + and not self.uploader_busy + ) + ) + ): + break + + time.sleep(0.05) + + eprint("\n{:6d} hash {}\n".format(self.nfiles - nf, upath), end="") + get_hashlist(file, self.cb_hasher) + with self.mutex: + self.hash_f += 1 + self.hash_c += len(file.cids) + self.hash_b += file.size + + self.q_handshake.put(file) + + self.hasher_busy = 0 + + def handshaker(self): + search = self.ar.s + q = self.q_handshake + while True: + file = q.get() + if not file: + if q == self.q_handshake: + q = self.q_recheck + q.put(None) + continue + + self.q_upload.put(None) + break + + with self.mutex: + self.handshaker_busy += 1 + + 
upath = file.abs.decode("utf-8", "replace") + eprint("\n handshake {}\n".format(upath), end="") + + try: + hs = handshake(req_ses, self.ar.url, file, self.ar.a, search) + except Exception as ex: + if q == self.q_handshake and "<pre>partial upload exists" in str(ex):
+                    self.q_recheck.put(file)
+                    hs = []
+                else:
+                    raise
+
+            if search:
+                if hs:
+                    for hit in hs:
+                        eprint("     found: {}{}".format(self.ar.url, hit["rp"]))
+                else:
+                    eprint("  NOT found {}".format(upath))
+
+                with self.mutex:
+                    self.up_f += 1
+                    self.up_c += len(file.cids)
+                    self.up_b += file.size
+                    self.handshaker_busy -= 1
+
+                continue
+
+            with self.mutex:  # progress counters below are shared with the hasher/uploader threads
+                if not hs:
+                    # no chunks requested: file is complete; credit whatever the uploader has not already counted
+                    self.up_f += 1
+                    self.up_c += len(file.cids) - file.up_c
+                    self.up_b += file.size - file.up_b
+
+                if hs and file.up_c:  # per-file counter: only a file that already uploaded chunks can have failures
+                    # chunks still requested after uploading: take them back out of the totals before the re-upload
+                    self.up_c -= len(hs)
+                    file.up_c -= len(hs)
+                    for cid in hs:
+                        sz = file.kchunks[cid][1]  # kchunks maps hash -> [ofs, sz]
+                        self.up_b -= sz
+                        file.up_b -= sz
+
+                file.ucids = hs  # chunks which need to be uploaded
+                self.handshaker_busy -= 1
+
+            for cid in hs:
+                self.q_upload.put([file, cid])
+
+    def uploader(self):  # worker thread: uploads queued [file, chunk-id] tasks until a falsy item arrives
+        while True:
+            task = self.q_upload.get()  # blocks until something is queued
+            if not task:  # falsy item (handshaker enqueues None when it finishes) stops this worker
+                break
+
+            eprint("*", end="")  # progress marker on stderr, one per chunk
+            with self.mutex:
+                self.uploader_busy += 1  # NOTE(review): task is already dequeued here, so fancy() could see an empty queue and no busy uploader -- confirm
+
+            file, cid = task
+            upload(req_ses, file, cid, self.ar.a)  # NOTE(review): if this raises, uploader_busy is never decremented -- confirm intended
+
+            with self.mutex:
+                sz = file.kchunks[cid][1]  # size of this chunk; kchunks maps hash -> [ofs, sz]
+                file.ucids = [x for x in file.ucids if x != cid]  # drop the finished chunk from the pending list
+                if not file.ucids:
+                    self.q_handshake.put(file)  # nothing pending: queue the file for another handshake
+
+                file.up_b += sz  # per-file and global uploaded-bytes / uploaded-chunks counters
+                self.up_b += sz
+                file.up_c += 1
+                self.up_c += 1
+                self.uploader_busy -= 1
+
 
 def main():
     time.strptime("19970815", "%Y%m%d")  # python#7980
@@ -371,19 +547,21 @@ def main():
         os.system("rem")  # enables colors
 
     # fmt: off
-    ap = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+    ap = app = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
     ap.add_argument("url", type=unicode, help="server url, including destination folder")
     ap.add_argument("files", type=unicode, nargs="+", help="files and/or folders to process")
     ap.add_argument("-a", metavar="PASSWORD", help="password")
     ap.add_argument("-s", action="store_true", help="file-search (disables upload)")
+    ap = app.add_argument_group("performance tweaks")
+    ap.add_argument("-j", type=int, metavar="THREADS", default=4, help="parallel connections")
+    ap.add_argument("-nh", action="store_true", help="disable hashing while uploading")
+    ap.add_argument("--safe", action="store_true", help="use simple fallback approach")
+    ap = app.add_argument_group("tls")
     ap.add_argument("-te", metavar="PEM_FILE", help="certificate to expect/verify")
     ap.add_argument("-td", action="store_true", help="disable certificate check")
-    ap.add_argument("--safe", action="store_true", help="use simple fallback approach")
-    # ap.add_argument("-j", type=int, default=2, help="parallel connections")
-    # ap.add_argument("-nh", action="store_true", help="disable hashing while uploading")
     # fmt: on
 
-    Ctl(ap.parse_args())
+    Ctl(app.parse_args())
 
 
 if __name__ == "__main__":