up2k: improve upload retry/timeout

* `js:` make handshake retries more aggressive
* `u2c:` reduce chunks timeout + same as above (more aggressive handshake retries)
* `main:` reduce tcp timeout to 128sec (js is 42s)
* `httpcli:` less confusing log messages
This commit is contained in:
ed 2024-10-18 16:24:31 +00:00
parent 5f91999512
commit a9b4436cdc
5 changed files with 36 additions and 24 deletions

View file

@ -1,8 +1,8 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from __future__ import print_function, unicode_literals from __future__ import print_function, unicode_literals
S_VERSION = "2.4" S_VERSION = "2.5"
S_BUILD_DT = "2024-10-16" S_BUILD_DT = "2024-10-18"
""" """
u2c.py: upload to copyparty u2c.py: upload to copyparty
@ -154,6 +154,7 @@ class HCli(object):
self.tls = tls self.tls = tls
self.verify = ar.te or not ar.td self.verify = ar.te or not ar.td
self.conns = [] self.conns = []
self.hconns = []
if tls: if tls:
import ssl import ssl
@ -173,7 +174,7 @@ class HCli(object):
"User-Agent": "u2c/%s" % (S_VERSION,), "User-Agent": "u2c/%s" % (S_VERSION,),
} }
def _connect(self): def _connect(self, timeout):
args = {} args = {}
if PY37: if PY37:
args["blocksize"] = 1048576 args["blocksize"] = 1048576
@ -185,7 +186,7 @@ class HCli(object):
if self.ctx: if self.ctx:
args = {"context": self.ctx} args = {"context": self.ctx}
return C(self.addr, self.port, timeout=999, **args) return C(self.addr, self.port, timeout=timeout, **args)
def req(self, meth, vpath, hdrs, body=None, ctype=None): def req(self, meth, vpath, hdrs, body=None, ctype=None):
hdrs.update(self.base_hdrs) hdrs.update(self.base_hdrs)
@ -198,7 +199,9 @@ class HCli(object):
0 if not body else body.len if hasattr(body, "len") else len(body) 0 if not body else body.len if hasattr(body, "len") else len(body)
) )
c = self.conns.pop() if self.conns else self._connect() # large timeout for handshakes (safededup)
conns = self.hconns if ctype == MJ else self.conns
c = conns.pop() if conns else self._connect(999 if ctype == MJ else 128)
try: try:
c.request(meth, vpath, body, hdrs) c.request(meth, vpath, body, hdrs)
if PY27: if PY27:
@ -207,7 +210,7 @@ class HCli(object):
rsp = c.getresponse() rsp = c.getresponse()
data = rsp.read() data = rsp.read()
self.conns.append(c) conns.append(c)
return rsp.status, data.decode("utf-8") return rsp.status, data.decode("utf-8")
except: except:
c.close() c.close()
@ -870,9 +873,10 @@ def upload(fsl, stats, maxsz):
if sc >= 400: if sc >= 400:
raise Exception("http %s: %s" % (sc, txt)) raise Exception("http %s: %s" % (sc, txt))
finally: finally:
fsl.f.close() if fsl.f:
if nsub != -1: fsl.f.close()
fsl.unsub() if nsub != -1:
fsl.unsub()
class Ctl(object): class Ctl(object):

View file

@ -1037,7 +1037,7 @@ def add_network(ap):
else: else:
ap2.add_argument("--freebind", action="store_true", help="allow listening on IPs which do not yet exist, for example if the network interfaces haven't finished going up. Only makes sense for IPs other than '0.0.0.0', '127.0.0.1', '::', and '::1'. May require running as root (unless net.ipv6.ip_nonlocal_bind)") ap2.add_argument("--freebind", action="store_true", help="allow listening on IPs which do not yet exist, for example if the network interfaces haven't finished going up. Only makes sense for IPs other than '0.0.0.0', '127.0.0.1', '::', and '::1'. May require running as root (unless net.ipv6.ip_nonlocal_bind)")
ap2.add_argument("--s-thead", metavar="SEC", type=int, default=120, help="socket timeout (read request header)") ap2.add_argument("--s-thead", metavar="SEC", type=int, default=120, help="socket timeout (read request header)")
ap2.add_argument("--s-tbody", metavar="SEC", type=float, default=186.0, help="socket timeout (read/write request/response bodies). Use 60 on fast servers (default is extremely safe). Disable with 0 if reverse-proxied for a 2%% speed boost") ap2.add_argument("--s-tbody", metavar="SEC", type=float, default=128.0, help="socket timeout (read/write request/response bodies). Use 60 on fast servers (default is extremely safe). Disable with 0 if reverse-proxied for a 2%% speed boost")
ap2.add_argument("--s-rd-sz", metavar="B", type=int, default=256*1024, help="socket read size in bytes (indirectly affects filesystem writes; recommendation: keep equal-to or lower-than \033[33m--iobuf\033[0m)") ap2.add_argument("--s-rd-sz", metavar="B", type=int, default=256*1024, help="socket read size in bytes (indirectly affects filesystem writes; recommendation: keep equal-to or lower-than \033[33m--iobuf\033[0m)")
ap2.add_argument("--s-wr-sz", metavar="B", type=int, default=256*1024, help="socket write size in bytes") ap2.add_argument("--s-wr-sz", metavar="B", type=int, default=256*1024, help="socket write size in bytes")
ap2.add_argument("--s-wr-slp", metavar="SEC", type=float, default=0.0, help="debug: socket write delay in seconds") ap2.add_argument("--s-wr-slp", metavar="SEC", type=float, default=0.0, help="debug: socket write delay in seconds")

View file

@ -2492,6 +2492,7 @@ class HttpCli(object):
except: except:
# maybe busted handle (eg. disk went full) # maybe busted handle (eg. disk went full)
f.close() f.close()
chashes = [] # exception flag
raise raise
finally: finally:
if locked: if locked:
@ -2500,9 +2501,11 @@ class HttpCli(object):
num_left, t = x.get() num_left, t = x.get()
if num_left < 0: if num_left < 0:
self.loud_reply(t, status=500) self.loud_reply(t, status=500)
return False if chashes: # kills exception bubbling otherwise
t = "got %d more chunks, %d left" return False
self.log(t % (len(written), num_left), 6) else:
t = "got %d more chunks, %d left"
self.log(t % (len(written), num_left), 6)
if num_left < 0: if num_left < 0:
raise Pebkac(500, "unconfirmed; see serverlog") raise Pebkac(500, "unconfirmed; see serverlog")

View file

@ -3498,6 +3498,7 @@ class Up2k(object):
for chash in written: for chash in written:
job["need"].remove(chash) job["need"].remove(chash)
except Exception as ex: except Exception as ex:
# dead tcp connections can get here by timeout (OK)
return -2, "confirm_chunk, chash(%s) %r" % (chash, ex) # type: ignore return -2, "confirm_chunk, chash(%s) %r" % (chash, ex) # type: ignore
ret = len(job["need"]) ret = len(job["need"])

View file

@ -244,7 +244,7 @@ function U2pvis(act, btns, uc, st) {
p = bd * 100.0 / sz, p = bd * 100.0 / sz,
nb = bd - bd0, nb = bd - bd0,
spd = nb / (td / 1000), spd = nb / (td / 1000),
eta = (sz - bd) / spd; eta = spd ? (sz - bd) / spd : 3599;
return [p, s2ms(eta), spd / (1024 * 1024)]; return [p, s2ms(eta), spd / (1024 * 1024)];
}; };
@ -1874,10 +1874,12 @@ function up2k_init(subtle) {
function chill(t) { function chill(t) {
var now = Date.now(); var now = Date.now();
if ((t.coolmul || 0) < 2 || now - t.cooldown < t.coolmul * 700) if ((t.coolmul || 0) < 5 || now - t.cooldown < t.coolmul * 700)
t.coolmul = Math.min((t.coolmul || 0.5) * 2, 32); t.coolmul = Math.min((t.coolmul || 0.5) * 2, 32);
t.cooldown = Math.max(t.cooldown || 1, Date.now() + t.coolmul * 1000); var cd = now + 1000 * (t.coolmul + Math.random() * 4 + 2);
t.cooldown = Math.floor(Math.max(cd, t.cooldown || 1));
return t;
} }
///// /////
@ -2270,8 +2272,7 @@ function up2k_init(subtle) {
console.log('handshake onerror, retrying', t.name, t); console.log('handshake onerror, retrying', t.name, t);
apop(st.busy.handshake, t); apop(st.busy.handshake, t);
st.todo.handshake.unshift(t); st.todo.handshake.unshift(chill(t));
t.cooldown = Date.now() + 5000 + Math.floor(Math.random() * 3000);
t.keepalive = keepalive; t.keepalive = keepalive;
}; };
var orz = function (e) { var orz = function (e) {
@ -2284,8 +2285,7 @@ function up2k_init(subtle) {
} }
catch (ex) { catch (ex) {
apop(st.busy.handshake, t); apop(st.busy.handshake, t);
st.todo.handshake.unshift(t); st.todo.handshake.unshift(chill(t));
t.cooldown = Date.now() + 5000 + Math.floor(Math.random() * 3000);
var txt = t.t_uploading ? L.u_ehsfin : t.srch ? L.u_ehssrch : L.u_ehsinit; var txt = t.t_uploading ? L.u_ehsfin : t.srch ? L.u_ehssrch : L.u_ehsinit;
return toast.err(0, txt + '\n\n' + L.badreply + ':\n\n' + unpre(xhr.responseText)); return toast.err(0, txt + '\n\n' + L.badreply + ':\n\n' + unpre(xhr.responseText));
} }
@ -2464,6 +2464,7 @@ function up2k_init(subtle) {
else { else {
pvis.seth(t.n, 1, "ERROR"); pvis.seth(t.n, 1, "ERROR");
pvis.seth(t.n, 2, L.u_ehstmp, t); pvis.seth(t.n, 2, L.u_ehstmp, t);
apop(st.busy.handshake, t);
var err = "", var err = "",
cls = "ERROR", cls = "ERROR",
@ -2477,7 +2478,6 @@ function up2k_init(subtle) {
var penalty = rsp.replace(/.*rate-limit /, "").split(' ')[0]; var penalty = rsp.replace(/.*rate-limit /, "").split(' ')[0];
console.log("rate-limit: " + penalty); console.log("rate-limit: " + penalty);
t.cooldown = Date.now() + parseFloat(penalty) * 1000; t.cooldown = Date.now() + parseFloat(penalty) * 1000;
apop(st.busy.handshake, t);
st.todo.handshake.unshift(t); st.todo.handshake.unshift(t);
return; return;
} }
@ -2500,8 +2500,6 @@ function up2k_init(subtle) {
cls = 'defer'; cls = 'defer';
} }
} }
if (rsp.indexOf('server HDD is full') + 1)
return toast.err(0, L.u_ehsdf + "\n\n" + rsp.replace(/.*; /, ''));
if (err != "") { if (err != "") {
if (!t.t_uploading) if (!t.t_uploading)
@ -2511,10 +2509,15 @@ function up2k_init(subtle) {
pvis.seth(t.n, 2, err); pvis.seth(t.n, 2, err);
pvis.move(t.n, 'ng'); pvis.move(t.n, 'ng');
apop(st.busy.handshake, t);
tasker(); tasker();
return; return;
} }
st.todo.handshake.unshift(chill(t));
if (rsp.indexOf('server HDD is full') + 1)
return toast.err(0, L.u_ehsdf + "\n\n" + rsp.replace(/.*; /, ''));
err = t.t_uploading ? L.u_ehsfin : t.srch ? L.u_ehssrch : L.u_ehsinit; err = t.t_uploading ? L.u_ehsfin : t.srch ? L.u_ehssrch : L.u_ehsinit;
xhrchk(xhr, err + "\n\nfile: " + t.name + "\n\nerror ", "404, target folder not found", "warn", t); xhrchk(xhr, err + "\n\nfile: " + t.name + "\n\nerror ", "404, target folder not found", "warn", t);
} }
@ -2654,6 +2657,7 @@ function up2k_init(subtle) {
st.bytes.finished += cdr - car; st.bytes.finished += cdr - car;
st.bytes.uploaded += cdr - car; st.bytes.uploaded += cdr - car;
t.bytes_uploaded += cdr - car; t.bytes_uploaded += cdr - car;
t.cooldown = t.coolmul = 0;
st.etac.u++; st.etac.u++;
st.etac.t++; st.etac.t++;
} }