From 728dc62d0b2f18791a6246f313aa0fcb3e325df4 Mon Sep 17 00:00:00 2001 From: ed Date: Thu, 16 Jun 2022 17:51:42 +0200 Subject: [PATCH] optimize nonsparse uploads (fat32, exfat, hpfs) --- bin/up2k.py | 17 +++-- copyparty/__init__.py | 2 +- copyparty/fsutil.py | 155 ++++++++++++++++++++++++++++++++++++++ copyparty/httpcli.py | 4 +- copyparty/up2k.py | 56 ++++++++------ copyparty/web/browser.css | 6 +- copyparty/web/browser.js | 19 ++--- copyparty/web/up2k.js | 19 ++++- 8 files changed, 235 insertions(+), 43 deletions(-) create mode 100644 copyparty/fsutil.py diff --git a/bin/up2k.py b/bin/up2k.py index 23d35ea3..b1e58745 100755 --- a/bin/up2k.py +++ b/bin/up2k.py @@ -3,11 +3,11 @@ from __future__ import print_function, unicode_literals """ up2k.py: upload to copyparty -2022-06-07, v0.14, ed , MIT-Licensed +2022-06-16, v0.15, ed , MIT-Licensed https://github.com/9001/copyparty/blob/hovudstraum/bin/up2k.py - dependencies: requests -- supports python 2.6, 2.7, and 3.3 through 3.10 +- supports python 2.6, 2.7, and 3.3 through 3.11 - almost zero error-handling - but if something breaks just try again and it'll autoresume @@ -410,7 +410,7 @@ def handshake(req_ses, url, file, pw, search): file.name = r["name"] file.wark = r["wark"] - return r["hash"] + return r["hash"], r["sprs"] def upload(req_ses, file, cid, pw): @@ -502,6 +502,7 @@ class Ctl(object): self.hasher_busy = 1 self.handshaker_busy = 0 self.uploader_busy = 0 + self.serialized = False self.t0 = time.time() self.t0_up = None @@ -530,7 +531,7 @@ class Ctl(object): burl = self.ar.url[:12] + self.ar.url[8:].split("/")[0] + "/" while True: print(" hs...") - hs = handshake(req_ses, self.ar.url, file, self.ar.a, search) + hs, _ = handshake(req_ses, self.ar.url, file, self.ar.a, search) if search: if hs: for hit in hs: @@ -710,7 +711,7 @@ class Ctl(object): upath = file.abs.decode("utf-8", "replace") try: - hs = handshake(req_ses, self.ar.url, file, self.ar.a, search) + hs, sprs = handshake(req_ses, self.ar.url, file, self.ar.a, search) except Exception as ex: if q == self.q_handshake and "
partial upload exists" in str(ex):
                     self.q_recheck.put(file)
@@ -735,6 +736,12 @@ class Ctl(object):
                 continue
 
             with self.mutex:
+                if not sprs and not self.serialized:
+                    t = "server filesystem does not support sparse files; serializing uploads\n"
+                    eprint(t)
+                    self.serialized = True
+                    for _ in range(self.ar.j - 1):
+                        self.q_upload.put(None)
                 if not hs:
                     # all chunks done
                     self.up_f += 1
diff --git a/copyparty/__init__.py b/copyparty/__init__.py
index 594363eb..d2fdbdd9 100644
--- a/copyparty/__init__.py
+++ b/copyparty/__init__.py
@@ -29,7 +29,7 @@ WINDOWS: Any = (
 VT100 = not WINDOWS or WINDOWS >= [10, 0, 14393]
 # introduced in anniversary update
 
-ANYWIN = WINDOWS or sys.platform in ["msys"]
+ANYWIN = WINDOWS or sys.platform in ["msys", "cygwin"]
 
 MACOS = platform.system() == "Darwin"
 
diff --git a/copyparty/fsutil.py b/copyparty/fsutil.py
new file mode 100644
index 00000000..3cb91dc4
--- /dev/null
+++ b/copyparty/fsutil.py
@@ -0,0 +1,155 @@
+# coding: utf-8
+from __future__ import print_function, unicode_literals
+
+import ctypes
+import time
+import re
+
+from .__init__ import ANYWIN, MACOS
+from .util import RootLogger, min_ex, chkcmd
+from .authsrv import VFS, AXS
+
+try:
+    from typing import Optional, Union
+except:
+    pass
+
+
+class Fstab(object):
+    def __init__(self, log: RootLogger):
+        self.log_func = log
+
+        self.no_sparse = set(
+            [
+                "fuse",  # termux-sdcard
+                "vfat",  # linux-efi
+                "fat32",
+                "fat16",
+                "fat12",
+                "fat-32",
+                "fat-16",
+                "fat-12",
+                "fat 32",
+                "fat 16",
+                "fat 12",
+                "exfat",
+                "ex-fat",
+                "ex fat",
+                "hpfs",  # macos
+            ]
+        )
+        self.tab: Optional[VFS] = None
+        self.cache: dict[str, str] = {}
+        self.age = 0.0
+
+    def log(self, msg: str, c: Union[int, str] = 0) -> None:
+        self.log_func("fstab", msg + "\033[K", c)
+
+    def get(self, path: str):
+        if time.time() - self.age > 600 or len(self.cache) > 9000:
+            self.age = time.time()
+            self.tab = None
+            self.cache = {}
+
+        fs = "ext4"
+        msg = "failed to determine filesystem at [{}]; assuming {}\n{}"
+
+        if ANYWIN:
+            fs = "vfat"  # can smb do sparse files? gonna guess no
+            try:
+                # good enough
+                disk = path.split(":", 1)[0]
+                disk = "{}:\\".format(disk).lower()
+                assert len(disk) == 3
+                path = disk
+            except:
+                self.log(msg.format(path, fs, min_ex()), 3)
+                return fs
+
+        try:
+            return self.cache[path]
+        except:
+            pass
+
+        try:
+            fs = self.get_w32(path) if ANYWIN else self.get_unix(path)
+        except:
+            self.log(msg.format(path, fs, min_ex()), 3)
+
+        fs = fs.lower()
+        self.cache[path] = fs
+        self.log("found {} at {}".format(fs, path))
+        return fs
+
+    def build_tab(self):
+        self.log("building tab")
+
+        sptn = r"^.*? on (.*) type ([^ ]+) \(.*"
+        if MACOS:
+            sptn = r"^.*? on (.*) \(([^ ]+), .*"
+
+        ptn = re.compile(sptn)
+        so, _ = chkcmd(["mount"])
+        tab1: list[tuple[str, str]] = []
+        for ln in so.split("\n"):
+            m = ptn.match(ln)
+            if not m:
+                continue
+
+            tab1.append(m.groups())
+
+        tab1.sort(key=lambda x: (len(x[0]), x[0]))
+        path1, fs1 = tab1[0]
+        tab = VFS(self.log_func, fs1, path1, AXS(), {})
+        for path, fs in tab1[1:]:
+            tab.add(fs, path.lstrip("/"))
+
+        self.tab = tab
+
+    def get_unix(self, path: str):
+        if not self.tab:
+            self.build_tab()
+
+        return self.tab._find(path)[0].realpath.split("/")[0]
+
+    def get_w32(self, path: str):
+        # list mountpoints: fsutil fsinfo drives
+
+        from ctypes.wintypes import LPCWSTR, LPWSTR, DWORD, LPDWORD, BOOL, MAX_PATH
+
+        def echk(rc, fun, args):
+            if not rc:
+                raise ctypes.WinError(ctypes.get_last_error())
+            return None
+
+        k32 = ctypes.WinDLL("kernel32", use_last_error=True)
+        k32.GetVolumeInformationW.errcheck = echk
+        k32.GetVolumeInformationW.restype = BOOL
+        k32.GetVolumeInformationW.argtypes = (
+            LPCWSTR,
+            LPWSTR,
+            DWORD,
+            LPDWORD,
+            LPDWORD,
+            LPDWORD,
+            LPWSTR,
+            DWORD,
+        )
+
+        bvolname = ctypes.create_unicode_buffer(MAX_PATH + 1)
+        bfstype = ctypes.create_unicode_buffer(MAX_PATH + 1)
+        serial = DWORD()
+        max_name_len = DWORD()
+        fs_flags = DWORD()
+
+        k32.GetVolumeInformationW(
+            path,
+            bvolname,
+            ctypes.sizeof(bvolname),
+            ctypes.byref(serial),
+            ctypes.byref(max_name_len),
+            ctypes.byref(fs_flags),
+            bfstype,
+            ctypes.sizeof(bfstype),
+        )
+        return bfstype.value
diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py
index cfc2bbf7..b6df6773 100644
--- a/copyparty/httpcli.py
+++ b/copyparty/httpcli.py
@@ -1031,7 +1031,7 @@ class HttpCli(object):
 
         x = self.conn.hsrv.broker.ask("up2k.handle_chunk", ptop, wark, chash)
         response = x.get()
-        chunksize, cstart, path, lastmod = response
+        chunksize, cstart, path, lastmod, sprs = response
 
         try:
             if self.args.nw:
@@ -1045,7 +1045,7 @@ class HttpCli(object):
             reader = read_socket(self.sr, remains)
 
             f = None
-            fpool = not self.args.no_fpool
+            fpool = not self.args.no_fpool and (not ANYWIN or sprs)
             if fpool:
                 with self.mutex:
                     try:
diff --git a/copyparty/up2k.py b/copyparty/up2k.py
index aff68917..cb678486 100644
--- a/copyparty/up2k.py
+++ b/copyparty/up2k.py
@@ -22,6 +22,7 @@ from .__init__ import ANYWIN, PY2, TYPE_CHECKING, WINDOWS
 from .authsrv import LEELOO_DALLAS, VFS, AuthSrv
 from .bos import bos
 from .mtag import MParser, MTag
+from .fsutil import Fstab
 from .util import (
     HAVE_SQLITE3,
     SYMTIME,
@@ -81,10 +82,9 @@ class Up2k(object):
         self.args = hub.args
         self.log_func = hub.log
 
-        # config
         self.salt = self.args.salt
+        self.r_hash = re.compile("^[0-9a-zA-Z_-]{44}$")
 
-        # state
         self.gid = 0
         self.mutex = threading.Lock()
         self.pp: Optional[ProgressPrinter] = None
@@ -121,19 +121,17 @@ class Up2k(object):
             self.sqlite_ver = tuple([int(x) for x in sqlite3.sqlite_version.split(".")])
             if self.sqlite_ver < (3, 9):
                 self.no_expr_idx = True
+        else:
+            self.log("could not initialize sqlite3, will use in-memory registry only")
 
         if ANYWIN:
             # usually fails to set lastmod too quickly
-            self.lastmod_q: list[tuple[str, int, tuple[int, int]]] = []
+            self.lastmod_q: list[tuple[str, int, tuple[int, int], bool]] = []
             thr = threading.Thread(target=self._lastmodder, name="up2k-lastmod")
             thr.daemon = True
             thr.start()
 
-        # static
-        self.r_hash = re.compile("^[0-9a-zA-Z_-]{44}$")
-
-        if not HAVE_SQLITE3:
-            self.log("could not initialize sqlite3, will use in-memory registry only")
+        self.fstab = Fstab(self.log_func)
 
         if self.args.no_fastboot:
             self.deferred_init()
@@ -1320,11 +1318,16 @@ class Up2k(object):
         wark = self._get_wark(cj)
         now = time.time()
         job = None
+        pdir = os.path.join(cj["ptop"], cj["prel"])
         try:
-            dev = bos.stat(os.path.join(cj["ptop"], cj["prel"])).st_dev
+            dev = bos.stat(pdir).st_dev
         except:
             dev = 0
 
+        # check if filesystem supports sparse files;
+        # refuse out-of-order / multithreaded uploading if sprs False
+        sprs = self.fstab.get(pdir) not in self.fstab.no_sparse
+
         with self.mutex:
             cur = self.cur.get(cj["ptop"])
             reg = self.registry[cj["ptop"]]
@@ -1356,6 +1359,7 @@ class Up2k(object):
                         "prel": dp_dir,
                         "vtop": cj["vtop"],
                         "ptop": cj["ptop"],
+                        "sprs": sprs,
                         "size": dsize,
                         "lmod": dtime,
                         "addr": ip,
@@ -1453,6 +1457,7 @@ class Up2k(object):
                 job = {
                     "wark": wark,
                     "t0": now,
+                    "sprs": sprs,
                     "hash": deepcopy(cj["hash"]),
                     "need": [],
                     "busy": {},
@@ -1489,6 +1494,7 @@ class Up2k(object):
                 "purl": purl,
                 "size": job["size"],
                 "lmod": job["lmod"],
+                "sprs": sprs,
                 "hash": job["need"],
                 "wark": wark,
             }
@@ -1562,13 +1568,13 @@ class Up2k(object):
         if lmod and (not linked or SYMTIME):
             times = (int(time.time()), int(lmod))
             if ANYWIN:
-                self.lastmod_q.append((dst, 0, times))
+                self.lastmod_q.append((dst, 0, times, False))
             else:
                 bos.utime(dst, times, False)
 
     def handle_chunk(
         self, ptop: str, wark: str, chash: str
-    ) -> tuple[int, list[int], str, float]:
+    ) -> tuple[int, list[int], str, float, bool]:
         with self.mutex:
             job = self.registry[ptop].get(wark)
             if not job:
@@ -1592,16 +1598,23 @@ class Up2k(object):
                 t = "that chunk is already being written to:\n  {}\n  {} {}/{}\n  {}"
                 raise Pebkac(400, t.format(wark, chash, idx, nh, job["name"]))
 
+            path = os.path.join(job["ptop"], job["prel"], job["tnam"])
+
+            chunksize = up2k_chunksize(job["size"])
+            ofs = [chunksize * x for x in nchunk]
+
+            if not job["sprs"]:
+                cur_sz = bos.path.getsize(path)
+                if ofs[0] > cur_sz:
+                    t = "please upload sequentially using one thread;\nserver filesystem does not support sparse files.\n  file: {}\n  chunk: {}\n  cofs: {}\n  flen: {}"
+                    t = t.format(job["name"], nchunk[0], ofs[0], cur_sz)
+                    raise Pebkac(400, t)
+
             job["busy"][chash] = 1
 
         job["poke"] = time.time()
 
-        chunksize = up2k_chunksize(job["size"])
-        ofs = [chunksize * x for x in nchunk]
-
-        path = os.path.join(job["ptop"], job["prel"], job["tnam"])
-
-        return chunksize, ofs, path, job["lmod"]
+        return chunksize, ofs, path, job["lmod"], job["sprs"]
 
     def release_chunk(self, ptop: str, wark: str, chash: str) -> bool:
         with self.mutex:
@@ -1660,7 +1673,7 @@ class Up2k(object):
 
         times = (int(time.time()), int(job["lmod"]))
         if ANYWIN:
-            z1 = (dst, job["size"], times)
+            z1 = (dst, job["size"], times, job["sprs"])
             self.lastmod_q.append(z1)
         elif not job["hash"]:
             try:
@@ -2189,6 +2202,7 @@ class Up2k(object):
             f, job["tnam"] = zfw["orz"]
             if (
                 ANYWIN
+                and job["sprs"]
                 and self.args.sparse
                 and self.args.sparse * 1024 * 1024 <= job["size"]
             ):
@@ -2198,7 +2212,7 @@ class Up2k(object):
                 except:
                     self.log("could not sparse [{}]".format(fp), 3)
 
-            if job["hash"]:
+            if job["hash"] and job["sprs"]:
                 f.seek(job["size"] - 1)
                 f.write(b"e")
 
@@ -2212,7 +2226,7 @@ class Up2k(object):
 
             # self.log("lmod: got {}".format(len(ready)))
             time.sleep(5)
-            for path, sz, times in ready:
+            for path, sz, times, sparse in ready:
                 self.log("lmod: setting times {} on {}".format(times, path))
                 try:
                     bos.utime(path, times, False)
@@ -2220,7 +2234,7 @@ class Up2k(object):
                     t = "lmod: failed to utime ({}, {}):\n{}"
                     self.log(t.format(path, times, min_ex()))
 
-                if self.args.sparse and self.args.sparse * 1024 * 1024 <= sz:
+                if sparse and self.args.sparse and self.args.sparse * 1024 * 1024 <= sz:
                     try:
                         sp.check_call(["fsutil", "sparse", "setflag", path, "0"])
                     except:
diff --git a/copyparty/web/browser.css b/copyparty/web/browser.css
index 9dee65cf..2af59a52 100644
--- a/copyparty/web/browser.css
+++ b/copyparty/web/browser.css
@@ -1068,6 +1068,9 @@ html.y #widget.open {
 @keyframes spin {
 	100% {transform: rotate(360deg)}
 }
+@media (prefers-reduced-motion) {
+	@keyframes spin { }
+}
 @keyframes fadein {
 	0% {opacity: 0}
 	100% {opacity: 1}
@@ -1541,7 +1544,8 @@ html.y #tree.nowrap .ntree a+a:hover {
 	margin: 1em .3em 1em 1em;
 	padding: 0 1.2em 0 0;
 	font-size: 4em;
-	animation: spin 1s linear infinite, fadein .2s ease;
+	opacity: 0;
+	animation: 1s linear .05s infinite forwards spin, .2s ease .05s 1 forwards fadein;
 	position: absolute;
 	z-index: 9;
 }
diff --git a/copyparty/web/browser.js b/copyparty/web/browser.js
index 56759702..3ff56f62 100644
--- a/copyparty/web/browser.js
+++ b/copyparty/web/browser.js
@@ -622,7 +622,7 @@ var Ls = {
 		"u_hashing": 'les',
 		"u_upping": 'sender',
 		"u_cuerr": "kunne ikke laste opp del {0} av {1};\nsikkert harmløst, fortsetter\n\nfil: {2}",
-		"u_cuerr2": "server nektet opplastningen (del {0} of {1});\n\nfile: {2}\n\nerror ",
+		"u_cuerr2": "server nektet opplastningen (del {0} av {1});\n\nfile: {2}\n\nerror ",
 		"u_ehsfin": "server nektet forespørselen om å ferdigstille filen",
 		"u_ehssrch": "server nektet forespørselen om å utføre søk",
 		"u_ehsinit": "server nektet forespørselen om å begynne en ny opplastning",
@@ -4983,22 +4983,17 @@ var treectl = (function () {
 })();
 
 
-var enspin_timer = null;
 function enspin(sel) {
-	clearTimeout(enspin_timer);
-	enspin_timer = setTimeout(function () {
-		despin(sel);
-		var d = mknod('div');
-		d.className = 'dumb_loader_thing';
-		d.innerHTML = '🌲';
-		var tgt = QS(sel);
-		tgt.insertBefore(d, tgt.childNodes[0]);
-	}, 50);
+	despin(sel);
+	var d = mknod('div');
+	d.className = 'dumb_loader_thing';
+	d.innerHTML = '🌲';
+	var tgt = QS(sel);
+	tgt.insertBefore(d, tgt.childNodes[0]);
 }
 
 
 function despin(sel) {
-	clearTimeout(enspin_timer);
 	var o = QSA(sel + '>.dumb_loader_thing');
 	for (var a = o.length - 1; a >= 0; a--)
 		o[a].parentNode.removeChild(o[a]);
diff --git a/copyparty/web/up2k.js b/copyparty/web/up2k.js
index 15297286..2093c6f3 100644
--- a/copyparty/web/up2k.js
+++ b/copyparty/web/up2k.js
@@ -1332,7 +1332,8 @@ function up2k_init(subtle) {
                 }
 
                 if (st.todo.upload.length &&
-                    st.busy.upload.length < parallel_uploads) {
+                    st.busy.upload.length < parallel_uploads &&
+                    can_upload_next()) {
                     exec_upload();
                     mou_ikkai = true;
                 }
@@ -1673,6 +1674,8 @@ function up2k_init(subtle) {
                     return;
                 }
 
+                t.sprs = response.sprs;
+
                 var rsp_purl = url_enc(response.purl);
                 if (rsp_purl !== t.purl || response.name !== t.name) {
                     // server renamed us (file exists / path restrictions)
@@ -1824,6 +1827,20 @@ function up2k_init(subtle) {
     ///   upload
     //
 
+    function can_upload_next() {
+        var upt = st.todo.upload[0],
+            upf = st.files[upt.nfile];
+
+        if (upf.sprs)
+            return true;
+
+        for (var a = 0, aa = st.busy.upload.length; a < aa; a++)
+            if (st.busy.upload[a].nfile == upt.nfile)
+                return false;
+
+        return true;
+    }
+
     function exec_upload() {
         var upt = st.todo.upload.shift();
         st.busy.upload.push(upt);