optimize nonsparse uploads (fat32, exfat, hpfs)

This commit is contained in:
ed 2022-06-16 17:51:42 +02:00
parent 2dfe1b1c6b
commit 728dc62d0b
8 changed files with 235 additions and 43 deletions

View file

@ -3,11 +3,11 @@ from __future__ import print_function, unicode_literals
""" """
up2k.py: upload to copyparty up2k.py: upload to copyparty
2022-06-07, v0.14, ed <irc.rizon.net>, MIT-Licensed 2022-06-16, v0.15, ed <irc.rizon.net>, MIT-Licensed
https://github.com/9001/copyparty/blob/hovudstraum/bin/up2k.py https://github.com/9001/copyparty/blob/hovudstraum/bin/up2k.py
- dependencies: requests - dependencies: requests
- supports python 2.6, 2.7, and 3.3 through 3.10 - supports python 2.6, 2.7, and 3.3 through 3.11
- almost zero error-handling - almost zero error-handling
- but if something breaks just try again and it'll autoresume - but if something breaks just try again and it'll autoresume
@ -410,7 +410,7 @@ def handshake(req_ses, url, file, pw, search):
file.name = r["name"] file.name = r["name"]
file.wark = r["wark"] file.wark = r["wark"]
return r["hash"] return r["hash"], r["sprs"]
def upload(req_ses, file, cid, pw): def upload(req_ses, file, cid, pw):
@ -502,6 +502,7 @@ class Ctl(object):
self.hasher_busy = 1 self.hasher_busy = 1
self.handshaker_busy = 0 self.handshaker_busy = 0
self.uploader_busy = 0 self.uploader_busy = 0
self.serialized = False
self.t0 = time.time() self.t0 = time.time()
self.t0_up = None self.t0_up = None
@ -530,7 +531,7 @@ class Ctl(object):
burl = self.ar.url[:12] + self.ar.url[8:].split("/")[0] + "/" burl = self.ar.url[:12] + self.ar.url[8:].split("/")[0] + "/"
while True: while True:
print(" hs...") print(" hs...")
hs = handshake(req_ses, self.ar.url, file, self.ar.a, search) hs, _ = handshake(req_ses, self.ar.url, file, self.ar.a, search)
if search: if search:
if hs: if hs:
for hit in hs: for hit in hs:
@ -710,7 +711,7 @@ class Ctl(object):
upath = file.abs.decode("utf-8", "replace") upath = file.abs.decode("utf-8", "replace")
try: try:
hs = handshake(req_ses, self.ar.url, file, self.ar.a, search) hs, sprs = handshake(req_ses, self.ar.url, file, self.ar.a, search)
except Exception as ex: except Exception as ex:
if q == self.q_handshake and "<pre>partial upload exists" in str(ex): if q == self.q_handshake and "<pre>partial upload exists" in str(ex):
self.q_recheck.put(file) self.q_recheck.put(file)
@ -735,6 +736,12 @@ class Ctl(object):
continue continue
with self.mutex: with self.mutex:
if not sprs and not self.serialized:
t = "server filesystem does not support sparse files; serializing uploads\n"
eprint(t)
self.serialized = True
for _ in range(self.ar.j - 1):
self.q_upload.put(None)
if not hs: if not hs:
# all chunks done # all chunks done
self.up_f += 1 self.up_f += 1

View file

@ -29,7 +29,7 @@ WINDOWS: Any = (
VT100 = not WINDOWS or WINDOWS >= [10, 0, 14393] VT100 = not WINDOWS or WINDOWS >= [10, 0, 14393]
# introduced in anniversary update # introduced in anniversary update
ANYWIN = WINDOWS or sys.platform in ["msys"] ANYWIN = WINDOWS or sys.platform in ["msys", "cygwin"]
MACOS = platform.system() == "Darwin" MACOS = platform.system() == "Darwin"

155
copyparty/fsutil.py Normal file
View file

@ -0,0 +1,155 @@
# coding: utf-8
from __future__ import print_function, unicode_literals
import ctypes
import time
import re
from .__init__ import ANYWIN, MACOS
from .util import RootLogger, min_ex, chkcmd
from .authsrv import VFS, AXS
try:
from typing import Optional, Union
except:
pass
class Fstab(object):
    """
    Maps absolute paths to the name of the filesystem they reside on,
    by parsing the output of `mount` (unix) or querying kernel32 (windows).

    Lookups are cached; `no_sparse` is the set of filesystem names known
    to lack sparse-file support (presumably consulted by callers to force
    sequential uploads -- confirm against up2k.py).
    """

    def __init__(self, log: RootLogger):
        # logging callback supplied by the hub
        self.log_func = log

        # lowercased filesystem names without sparse-file support;
        # multiple spellings per family since the reported name
        # varies across platforms and tools
        self.no_sparse = set(
            [
                "fuse",  # termux-sdcard
                "vfat",  # linux-efi
                "fat32",
                "fat16",
                "fat12",
                "fat-32",
                "fat-16",
                "fat-12",
                "fat 32",
                "fat 16",
                "fat 12",
                "exfat",
                "ex-fat",
                "ex fat",
                "hpfs",  # macos
            ]
        )
        # mountpoint tree, built lazily by build_tab() (unix only)
        self.tab: Optional[VFS] = None
        # path -> filesystem-name; reset by age/size checks in get()
        self.cache: dict[str, str] = {}
        # timestamp of the last cache reset
        self.age: float = 0.0

    def log(self, msg: str, c: Union[int, str] = 0) -> None:
        """log with the fstab prefix; the trailing escape clears to end-of-line"""
        self.log_func("fstab", msg + "\033[K", c)

    def get(self, path: str) -> str:
        """
        Return the lowercased filesystem name at `path`.

        Falls back to a guess if detection fails:
        "vfat" on windows, "ext4" elsewhere.
        """
        # periodically drop the cache so remounts get noticed,
        # and cap its size so it cannot grow unboundedly
        if time.time() - self.age > 600 or len(self.cache) > 9000:
            self.age = time.time()
            self.tab = None
            self.cache = {}

        fs = "ext4"
        msg = "failed to determine filesystem at [{}]; assuming {}\n{}"

        if ANYWIN:
            fs = "vfat"  # can smb do sparse files? gonna guess no
            try:
                # good enough
                # reduce the path to its drive root ("c:\"),
                # so detection and caching are per-drive
                disk = path.split(":", 1)[0]
                disk = "{}:\\".format(disk).lower()
                assert len(disk) == 3
                path = disk
            except:
                self.log(msg.format(path, fs, min_ex()), 3)
                return fs

        # EAFP cache hit
        try:
            return self.cache[path]
        except:
            pass

        try:
            fs = self.get_w32(path) if ANYWIN else self.get_unix(path)
        except:
            # keep the platform default chosen above
            self.log(msg.format(path, fs, min_ex()), 3)

        fs = fs.lower()
        self.cache[path] = fs
        self.log("found {} at {}".format(fs, path))
        return fs

    def build_tab(self) -> None:
        """parse the output of `mount` into a VFS tree of mountpoints"""
        self.log("building tab")

        # linux: "/dev/sda1 on /boot type vfat (rw,...)" -> ("/boot", "vfat")
        sptn = r"^.*? on (.*) type ([^ ]+) \(.*"
        if MACOS:
            # macos: "/dev/disk1s1 on / (apfs, local, ...)" -> ("/", "apfs")
            sptn = r"^.*? on (.*) \(([^ ]+), .*"

        ptn = re.compile(sptn)
        so, _ = chkcmd(["mount"])
        tab1: list[tuple[str, str]] = []
        for ln in so.split("\n"):
            m = ptn.match(ln)
            if not m:
                continue

            tab1.append(m.groups())

        # shortest paths first so parent mounts are inserted
        # before the mounts nested inside them
        tab1.sort(key=lambda x: (len(x[0]), x[0]))
        path1, fs1 = tab1[0]
        # (ab)use the VFS tree for longest-prefix matching:
        # each node's realpath carries the filesystem name
        tab = VFS(self.log_func, fs1, path1, AXS(), {})
        for path, fs in tab1[1:]:
            tab.add(fs, path.lstrip("/"))

        self.tab = tab

    def get_unix(self, path: str) -> str:
        """longest-prefix match of `path` against the mount table"""
        if not self.tab:
            self.build_tab()

        # the matched node's realpath holds the fs name, possibly with
        # a subpath appended -- keep only the first path segment
        return self.tab._find(path)[0].realpath.split("/")[0]

    def get_w32(self, path: str) -> str:
        """ask kernel32 for the filesystem name of a drive root"""
        # list mountpoints: fsutil fsinfo drives
        from ctypes.wintypes import LPCWSTR, LPWSTR, DWORD, LPDWORD, BOOL, MAX_PATH

        def echk(rc, fun, args):
            # ctypes errcheck hook: a zero return means failure
            if not rc:
                raise ctypes.WinError(ctypes.get_last_error())
            return None

        k32 = ctypes.WinDLL("kernel32", use_last_error=True)
        k32.GetVolumeInformationW.errcheck = echk
        k32.GetVolumeInformationW.restype = BOOL
        k32.GetVolumeInformationW.argtypes = (
            LPCWSTR,   # lpRootPathName
            LPWSTR,    # lpVolumeNameBuffer
            DWORD,     # nVolumeNameSize
            LPDWORD,   # lpVolumeSerialNumber
            LPDWORD,   # lpMaximumComponentLength
            LPDWORD,   # lpFileSystemFlags
            LPWSTR,    # lpFileSystemNameBuffer
            DWORD,     # nFileSystemNameSize
        )

        bvolname = ctypes.create_unicode_buffer(MAX_PATH + 1)
        bfstype = ctypes.create_unicode_buffer(MAX_PATH + 1)
        serial = DWORD()
        max_name_len = DWORD()
        fs_flags = DWORD()

        # NOTE(review): the size params expect TCHAR counts but
        # ctypes.sizeof() returns bytes (2x for wide chars); harmless
        # for real-world label/fs-name lengths, but verify
        k32.GetVolumeInformationW(
            path,
            bvolname,
            ctypes.sizeof(bvolname),
            ctypes.byref(serial),
            ctypes.byref(max_name_len),
            ctypes.byref(fs_flags),
            bfstype,
            ctypes.sizeof(bfstype),
        )
        return bfstype.value

View file

@ -1031,7 +1031,7 @@ class HttpCli(object):
x = self.conn.hsrv.broker.ask("up2k.handle_chunk", ptop, wark, chash) x = self.conn.hsrv.broker.ask("up2k.handle_chunk", ptop, wark, chash)
response = x.get() response = x.get()
chunksize, cstart, path, lastmod = response chunksize, cstart, path, lastmod, sprs = response
try: try:
if self.args.nw: if self.args.nw:
@ -1045,7 +1045,7 @@ class HttpCli(object):
reader = read_socket(self.sr, remains) reader = read_socket(self.sr, remains)
f = None f = None
fpool = not self.args.no_fpool fpool = not self.args.no_fpool and (not ANYWIN or sprs)
if fpool: if fpool:
with self.mutex: with self.mutex:
try: try:

View file

@ -22,6 +22,7 @@ from .__init__ import ANYWIN, PY2, TYPE_CHECKING, WINDOWS
from .authsrv import LEELOO_DALLAS, VFS, AuthSrv from .authsrv import LEELOO_DALLAS, VFS, AuthSrv
from .bos import bos from .bos import bos
from .mtag import MParser, MTag from .mtag import MParser, MTag
from .fsutil import Fstab
from .util import ( from .util import (
HAVE_SQLITE3, HAVE_SQLITE3,
SYMTIME, SYMTIME,
@ -81,10 +82,9 @@ class Up2k(object):
self.args = hub.args self.args = hub.args
self.log_func = hub.log self.log_func = hub.log
# config
self.salt = self.args.salt self.salt = self.args.salt
self.r_hash = re.compile("^[0-9a-zA-Z_-]{44}$")
# state
self.gid = 0 self.gid = 0
self.mutex = threading.Lock() self.mutex = threading.Lock()
self.pp: Optional[ProgressPrinter] = None self.pp: Optional[ProgressPrinter] = None
@ -121,19 +121,17 @@ class Up2k(object):
self.sqlite_ver = tuple([int(x) for x in sqlite3.sqlite_version.split(".")]) self.sqlite_ver = tuple([int(x) for x in sqlite3.sqlite_version.split(".")])
if self.sqlite_ver < (3, 9): if self.sqlite_ver < (3, 9):
self.no_expr_idx = True self.no_expr_idx = True
else:
self.log("could not initialize sqlite3, will use in-memory registry only")
if ANYWIN: if ANYWIN:
# usually fails to set lastmod too quickly # usually fails to set lastmod too quickly
self.lastmod_q: list[tuple[str, int, tuple[int, int]]] = [] self.lastmod_q: list[tuple[str, int, tuple[int, int], bool]] = []
thr = threading.Thread(target=self._lastmodder, name="up2k-lastmod") thr = threading.Thread(target=self._lastmodder, name="up2k-lastmod")
thr.daemon = True thr.daemon = True
thr.start() thr.start()
# static self.fstab = Fstab(self.log_func)
self.r_hash = re.compile("^[0-9a-zA-Z_-]{44}$")
if not HAVE_SQLITE3:
self.log("could not initialize sqlite3, will use in-memory registry only")
if self.args.no_fastboot: if self.args.no_fastboot:
self.deferred_init() self.deferred_init()
@ -1320,11 +1318,16 @@ class Up2k(object):
wark = self._get_wark(cj) wark = self._get_wark(cj)
now = time.time() now = time.time()
job = None job = None
pdir = os.path.join(cj["ptop"], cj["prel"])
try: try:
dev = bos.stat(os.path.join(cj["ptop"], cj["prel"])).st_dev dev = bos.stat(pdir).st_dev
except: except:
dev = 0 dev = 0
# check if filesystem supports sparse files;
# refuse out-of-order / multithreaded uploading if sprs False
sprs = self.fstab.get(pdir) not in self.fstab.no_sparse
with self.mutex: with self.mutex:
cur = self.cur.get(cj["ptop"]) cur = self.cur.get(cj["ptop"])
reg = self.registry[cj["ptop"]] reg = self.registry[cj["ptop"]]
@ -1356,6 +1359,7 @@ class Up2k(object):
"prel": dp_dir, "prel": dp_dir,
"vtop": cj["vtop"], "vtop": cj["vtop"],
"ptop": cj["ptop"], "ptop": cj["ptop"],
"sprs": sprs,
"size": dsize, "size": dsize,
"lmod": dtime, "lmod": dtime,
"addr": ip, "addr": ip,
@ -1453,6 +1457,7 @@ class Up2k(object):
job = { job = {
"wark": wark, "wark": wark,
"t0": now, "t0": now,
"sprs": sprs,
"hash": deepcopy(cj["hash"]), "hash": deepcopy(cj["hash"]),
"need": [], "need": [],
"busy": {}, "busy": {},
@ -1489,6 +1494,7 @@ class Up2k(object):
"purl": purl, "purl": purl,
"size": job["size"], "size": job["size"],
"lmod": job["lmod"], "lmod": job["lmod"],
"sprs": sprs,
"hash": job["need"], "hash": job["need"],
"wark": wark, "wark": wark,
} }
@ -1562,13 +1568,13 @@ class Up2k(object):
if lmod and (not linked or SYMTIME): if lmod and (not linked or SYMTIME):
times = (int(time.time()), int(lmod)) times = (int(time.time()), int(lmod))
if ANYWIN: if ANYWIN:
self.lastmod_q.append((dst, 0, times)) self.lastmod_q.append((dst, 0, times, False))
else: else:
bos.utime(dst, times, False) bos.utime(dst, times, False)
def handle_chunk( def handle_chunk(
self, ptop: str, wark: str, chash: str self, ptop: str, wark: str, chash: str
) -> tuple[int, list[int], str, float]: ) -> tuple[int, list[int], str, float, bool]:
with self.mutex: with self.mutex:
job = self.registry[ptop].get(wark) job = self.registry[ptop].get(wark)
if not job: if not job:
@ -1592,16 +1598,23 @@ class Up2k(object):
t = "that chunk is already being written to:\n {}\n {} {}/{}\n {}" t = "that chunk is already being written to:\n {}\n {} {}/{}\n {}"
raise Pebkac(400, t.format(wark, chash, idx, nh, job["name"])) raise Pebkac(400, t.format(wark, chash, idx, nh, job["name"]))
path = os.path.join(job["ptop"], job["prel"], job["tnam"])
chunksize = up2k_chunksize(job["size"])
ofs = [chunksize * x for x in nchunk]
if not job["sprs"]:
cur_sz = bos.path.getsize(path)
if ofs[0] > cur_sz:
t = "please upload sequentially using one thread;\nserver filesystem does not support sparse files.\n file: {}\n chunk: {}\n cofs: {}\n flen: {}"
t = t.format(job["name"], nchunk[0], ofs[0], cur_sz)
raise Pebkac(400, t)
job["busy"][chash] = 1 job["busy"][chash] = 1
job["poke"] = time.time() job["poke"] = time.time()
chunksize = up2k_chunksize(job["size"]) return chunksize, ofs, path, job["lmod"], job["sprs"]
ofs = [chunksize * x for x in nchunk]
path = os.path.join(job["ptop"], job["prel"], job["tnam"])
return chunksize, ofs, path, job["lmod"]
def release_chunk(self, ptop: str, wark: str, chash: str) -> bool: def release_chunk(self, ptop: str, wark: str, chash: str) -> bool:
with self.mutex: with self.mutex:
@ -1660,7 +1673,7 @@ class Up2k(object):
times = (int(time.time()), int(job["lmod"])) times = (int(time.time()), int(job["lmod"]))
if ANYWIN: if ANYWIN:
z1 = (dst, job["size"], times) z1 = (dst, job["size"], times, job["sprs"])
self.lastmod_q.append(z1) self.lastmod_q.append(z1)
elif not job["hash"]: elif not job["hash"]:
try: try:
@ -2189,6 +2202,7 @@ class Up2k(object):
f, job["tnam"] = zfw["orz"] f, job["tnam"] = zfw["orz"]
if ( if (
ANYWIN ANYWIN
and job["sprs"]
and self.args.sparse and self.args.sparse
and self.args.sparse * 1024 * 1024 <= job["size"] and self.args.sparse * 1024 * 1024 <= job["size"]
): ):
@ -2198,7 +2212,7 @@ class Up2k(object):
except: except:
self.log("could not sparse [{}]".format(fp), 3) self.log("could not sparse [{}]".format(fp), 3)
if job["hash"]: if job["hash"] and job["sprs"]:
f.seek(job["size"] - 1) f.seek(job["size"] - 1)
f.write(b"e") f.write(b"e")
@ -2212,7 +2226,7 @@ class Up2k(object):
# self.log("lmod: got {}".format(len(ready))) # self.log("lmod: got {}".format(len(ready)))
time.sleep(5) time.sleep(5)
for path, sz, times in ready: for path, sz, times, sparse in ready:
self.log("lmod: setting times {} on {}".format(times, path)) self.log("lmod: setting times {} on {}".format(times, path))
try: try:
bos.utime(path, times, False) bos.utime(path, times, False)
@ -2220,7 +2234,7 @@ class Up2k(object):
t = "lmod: failed to utime ({}, {}):\n{}" t = "lmod: failed to utime ({}, {}):\n{}"
self.log(t.format(path, times, min_ex())) self.log(t.format(path, times, min_ex()))
if self.args.sparse and self.args.sparse * 1024 * 1024 <= sz: if sparse and self.args.sparse and self.args.sparse * 1024 * 1024 <= sz:
try: try:
sp.check_call(["fsutil", "sparse", "setflag", path, "0"]) sp.check_call(["fsutil", "sparse", "setflag", path, "0"])
except: except:

View file

@ -1068,6 +1068,9 @@ html.y #widget.open {
@keyframes spin { @keyframes spin {
100% {transform: rotate(360deg)} 100% {transform: rotate(360deg)}
} }
@media (prefers-reduced-motion) {
@keyframes spin { }
}
@keyframes fadein { @keyframes fadein {
0% {opacity: 0} 0% {opacity: 0}
100% {opacity: 1} 100% {opacity: 1}
@ -1541,7 +1544,8 @@ html.y #tree.nowrap .ntree a+a:hover {
margin: 1em .3em 1em 1em; margin: 1em .3em 1em 1em;
padding: 0 1.2em 0 0; padding: 0 1.2em 0 0;
font-size: 4em; font-size: 4em;
animation: spin 1s linear infinite, fadein .2s ease; opacity: 0;
animation: 1s linear .05s infinite forwards spin, .2s ease .05s 1 forwards fadein;
position: absolute; position: absolute;
z-index: 9; z-index: 9;
} }

View file

@ -622,7 +622,7 @@ var Ls = {
"u_hashing": 'les', "u_hashing": 'les',
"u_upping": 'sender', "u_upping": 'sender',
"u_cuerr": "kunne ikke laste opp del {0} av {1};\nsikkert harmløst, fortsetter\n\nfil: {2}", "u_cuerr": "kunne ikke laste opp del {0} av {1};\nsikkert harmløst, fortsetter\n\nfil: {2}",
"u_cuerr2": "server nektet opplastningen (del {0} of {1});\n\nfile: {2}\n\nerror ", "u_cuerr2": "server nektet opplastningen (del {0} av {1});\n\nfile: {2}\n\nerror ",
"u_ehsfin": "server nektet forespørselen om å ferdigstille filen", "u_ehsfin": "server nektet forespørselen om å ferdigstille filen",
"u_ehssrch": "server nektet forespørselen om å utføre søk", "u_ehssrch": "server nektet forespørselen om å utføre søk",
"u_ehsinit": "server nektet forespørselen om å begynne en ny opplastning", "u_ehsinit": "server nektet forespørselen om å begynne en ny opplastning",
@ -4983,22 +4983,17 @@ var treectl = (function () {
})(); })();
var enspin_timer = null;
function enspin(sel) { function enspin(sel) {
clearTimeout(enspin_timer); despin(sel);
enspin_timer = setTimeout(function () { var d = mknod('div');
despin(sel); d.className = 'dumb_loader_thing';
var d = mknod('div'); d.innerHTML = '🌲';
d.className = 'dumb_loader_thing'; var tgt = QS(sel);
d.innerHTML = '🌲'; tgt.insertBefore(d, tgt.childNodes[0]);
var tgt = QS(sel);
tgt.insertBefore(d, tgt.childNodes[0]);
}, 50);
} }
function despin(sel) { function despin(sel) {
clearTimeout(enspin_timer);
var o = QSA(sel + '>.dumb_loader_thing'); var o = QSA(sel + '>.dumb_loader_thing');
for (var a = o.length - 1; a >= 0; a--) for (var a = o.length - 1; a >= 0; a--)
o[a].parentNode.removeChild(o[a]); o[a].parentNode.removeChild(o[a]);

View file

@ -1332,7 +1332,8 @@ function up2k_init(subtle) {
} }
if (st.todo.upload.length && if (st.todo.upload.length &&
st.busy.upload.length < parallel_uploads) { st.busy.upload.length < parallel_uploads &&
can_upload_next()) {
exec_upload(); exec_upload();
mou_ikkai = true; mou_ikkai = true;
} }
@ -1673,6 +1674,8 @@ function up2k_init(subtle) {
return; return;
} }
t.sprs = response.sprs;
var rsp_purl = url_enc(response.purl); var rsp_purl = url_enc(response.purl);
if (rsp_purl !== t.purl || response.name !== t.name) { if (rsp_purl !== t.purl || response.name !== t.name) {
// server renamed us (file exists / path restrictions) // server renamed us (file exists / path restrictions)
@ -1824,6 +1827,20 @@ function up2k_init(subtle) {
/// upload /// upload
// //
// decide whether the next queued chunk may start uploading;
// files on sparse-capable filesystems upload chunks in parallel,
// but a file whose server-side fs lacks sparse support must be
// written sequentially, so only one in-flight chunk is allowed
// for that file at a time (sprs comes from the handshake reply)
function can_upload_next() {
    var upt = st.todo.upload[0],
        upf = st.files[upt.nfile];

    if (upf.sprs)
        return true;  // sparse ok; no serialization needed

    // non-sparse: refuse while any busy upload is for the same file
    for (var a = 0, aa = st.busy.upload.length; a < aa; a++)
        if (st.busy.upload[a].nfile == upt.nfile)
            return false;

    return true;
}
function exec_upload() { function exec_upload() {
var upt = st.todo.upload.shift(); var upt = st.todo.upload.shift();
st.busy.upload.push(upt); st.busy.upload.push(upt);