From 4c4b3790c71e85340c0723988a3d650c727c2ffe Mon Sep 17 00:00:00 2001 From: ed Date: Tue, 7 Jun 2022 19:02:52 +0200 Subject: [PATCH] fix read-spin on d/c during json post + errorhandling --- bin/up2k.py | 7 +- copyparty/__main__.py | 2 +- copyparty/httpcli.py | 9 +-- copyparty/httpconn.py | 5 +- copyparty/up2k.py | 2 +- copyparty/util.py | 145 +++++++++++++++++++++++++----------------- tests/test_httpcli.py | 1 + tests/test_vfs.py | 1 + tests/util.py | 2 +- 9 files changed, 103 insertions(+), 71 deletions(-) diff --git a/bin/up2k.py b/bin/up2k.py index d104cec2..8c0c32b5 100755 --- a/bin/up2k.py +++ b/bin/up2k.py @@ -3,7 +3,7 @@ from __future__ import print_function, unicode_literals """ up2k.py: upload to copyparty -2021-11-28, v0.13, ed , MIT-Licensed +2022-06-07, v0.14, ed , MIT-Licensed https://github.com/9001/copyparty/blob/hovudstraum/bin/up2k.py - dependencies: requests @@ -614,10 +614,11 @@ class Ctl(object): spd = humansize(spd) eta = str(datetime.timedelta(seconds=int(eta))) - left = humansize(self.nbytes - self.up_b) + sleft = humansize(self.nbytes - self.up_b) + nleft = self.nfiles - self.up_f tail = "\033[K\033[u" if VT100 else "\r" - m = "eta: {0} @ {1}/s, {2} left".format(eta, spd, left) + m = "{0} eta @ {1}/s, {2}, {3}# left".format(eta, spd, sleft, nleft) eprint(txt + "\033]0;{0}\033\\\r{1}{2}".format(m, m, tail)) def cleanup_vt100(self): diff --git a/copyparty/__main__.py b/copyparty/__main__.py index 4458c00c..38b13f2e 100644 --- a/copyparty/__main__.py +++ b/copyparty/__main__.py @@ -440,7 +440,7 @@ def run_argparse(argv, formatter): ap2.add_argument("--hardlink", action="store_true", help="prefer hardlinks instead of symlinks when possible (within same filesystem)") ap2.add_argument("--never-symlink", action="store_true", help="do not fallback to symlinks when a hardlink cannot be made") ap2.add_argument("--no-dedup", action="store_true", help="disable symlink/hardlink creation; copy file contents instead") - ap2.add_argument("--reg-cap", metavar="N", type=int, default=9000, help="max number of uploads to keep in memory when running without -e2d") + ap2.add_argument("--reg-cap", metavar="N", type=int, default=38400, help="max number of uploads to keep in memory when running without -e2d; roughly 1 MiB RAM per 600") ap2.add_argument("--turbo", metavar="LVL", type=int, default=0, help="configure turbo-mode in up2k client; 0 = off and warn if enabled, 1 = off, 2 = on, 3 = on and disable datecheck") ap2 = ap.add_argument_group('network options') diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index 0c4d37de..f662b917 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -784,9 +784,10 @@ class HttpCli(object): if "charset" in ctype: enc = ctype.split("charset")[1].strip(" =").split(";")[0].strip() - json_buf = b"" - while len(json_buf) < remains: - json_buf += self.sr.recv(32 * 1024) + try: + json_buf = self.sr.recv_ex(remains) + except UnrecvEOF: + raise Pebkac(422, "client disconnected while posting JSON") self.log("decoding {} bytes of {} json".format(len(json_buf), enc)) try: @@ -1201,7 +1202,7 @@ class HttpCli(object): status = "OK" if errmsg: - self.log(errmsg) + self.log(errmsg, 3) status = "ERROR" msg = "{} // {} bytes // {:.3f} MiB/s\n".format(status, sz_total, spd) diff --git a/copyparty/httpconn.py b/copyparty/httpconn.py index fb10b1b1..85f5f701 100644 --- a/copyparty/httpconn.py +++ b/copyparty/httpconn.py @@ -30,6 +30,7 @@ class HttpConn(object): def __init__(self, sck, addr, hsrv): self.s = sck + self.sr = None # Type: Unrecv self.addr = addr self.hsrv = hsrv @@ -95,7 +96,7 @@ class HttpConn(object): except AttributeError: # jython does not support msg_peek; forget about https method = self.s.recv(4) - self.sr = Unrecv(self.s) + self.sr = Unrecv(self.s, self.log) self.sr.buf = method # jython used to do this, they stopped since it's broken @@ -182,7 +183,7 @@ class HttpConn(object): return if not self.sr: - self.sr = Unrecv(self.s) + self.sr = Unrecv(self.s, self.log) while not self.stopping: self.nreq += 1 diff --git a/copyparty/up2k.py b/copyparty/up2k.py index a0968201..4f064437 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -1460,7 +1460,7 @@ class Up2k(object): fs2 = bos.stat(os.path.dirname(dst)).st_dev if fs1 == 0 or fs2 == 0: # py2 on winxp or other unsupported combination - raise OSError() + raise OSError(38, "filesystem does not have st_dev") elif fs1 == fs2: # same fs; make symlink as relative as possible v = [] diff --git a/copyparty/util.py b/copyparty/util.py index 7fcc246e..2fe5ec32 100644 --- a/copyparty/util.py +++ b/copyparty/util.py @@ -10,6 +10,7 @@ import base64 import select import struct import signal +import socket import hashlib import platform import traceback @@ -188,13 +189,18 @@ class Cooldown(object): return ret +class UnrecvEOF(OSError): + pass + + class _Unrecv(object): """ undo any number of socket recv ops """ - def __init__(self, s): - self.s = s + def __init__(self, s, log): + self.s = s # type: socket.socket + self.log = log self.buf = b"" def recv(self, nbytes): @@ -203,20 +209,27 @@ class _Unrecv(object): self.buf = self.buf[nbytes:] return ret - try: - return self.s.recv(nbytes) - except: - return b"" + ret = self.s.recv(nbytes) + if not ret: + raise UnrecvEOF("client stopped sending data") - def recv_ex(self, nbytes): + return ret + + def recv_ex(self, nbytes, raise_on_trunc=True): """read an exact number of bytes""" - ret = self.recv(nbytes) - while ret and len(ret) < nbytes: - buf = self.recv(nbytes - len(ret)) - if not buf: - break + ret = b"" + try: + while nbytes > len(ret): + ret += self.recv(nbytes - len(ret)) + except OSError: + m = "client only sent {} of {} expected bytes".format(len(ret), nbytes) + if len(ret) <= 16: + m += "; got {!r}".format(ret) - ret += buf + if raise_on_trunc: + raise UnrecvEOF(5, m) + elif self.log: + self.log(m, 3) return ret @@ -229,42 +242,55 @@ class _LUnrecv(object): with expensive debug logging """ - def __init__(self, s): + def __init__(self, s, log): self.s = s + self.log = log self.buf = b"" def recv(self, nbytes): if self.buf: ret = self.buf[:nbytes] self.buf = self.buf[nbytes:] - m = "\033[0;7mur:pop:\033[0;1;32m {}\n\033[0;7mur:rem:\033[0;1;35m {}\033[0m\n" - print(m.format(ret, self.buf), end="") + m = "\033[0;7mur:pop:\033[0;1;32m {}\n\033[0;7mur:rem:\033[0;1;35m {}\033[0m" + self.log(m.format(ret, self.buf)) return ret - try: - ret = self.s.recv(nbytes) - m = "\033[0;7mur:recv\033[0;1;33m {}\033[0m\n" - print(m.format(ret), end="") - return ret - except: - return b"" + ret = self.s.recv(nbytes) + m = "\033[0;7mur:recv\033[0;1;33m {}\033[0m" + self.log(m.format(ret)) + if not ret: + raise UnrecvEOF("client stopped sending data") - def recv_ex(self, nbytes): + return ret + + def recv_ex(self, nbytes, raise_on_trunc=True): """read an exact number of bytes""" - ret = self.recv(nbytes) - while ret and len(ret) < nbytes: - buf = self.recv(nbytes - len(ret)) - if not buf: - break + try: + ret = self.recv(nbytes) + err = False + except: + ret = b"" + err = True - ret += buf + while not err and len(ret) < nbytes: + try: + ret += self.recv(nbytes - len(ret)) + except OSError: + err = True + + if err: + m = "client only sent {} of {} expected bytes".format(len(ret), nbytes) + if raise_on_trunc: + raise UnrecvEOF(m) + elif self.log: + self.log(m, 3) return ret def unrecv(self, buf): self.buf = buf + self.buf - m = "\033[0;7mur:push\033[0;1;31m {}\n\033[0;7mur:rem:\033[0;1;35m {}\033[0m\n" - print(m.format(buf, self.buf), end="") + m = "\033[0;7mur:push\033[0;1;31m {}\n\033[0;7mur:rem:\033[0;1;35m {}\033[0m" + self.log(m.format(buf, self.buf)) Unrecv = _Unrecv @@ -577,7 +603,7 @@ def ren_open(fname, *args, **kwargs): class MultipartParser(object): def __init__(self, log_func, sr, http_headers): - self.sr = sr + self.sr = sr # type: Unrecv self.log = log_func self.headers = http_headers @@ -670,8 +696,9 @@ class MultipartParser(object): blen = len(self.boundary) bufsz = 32 * 1024 while True: - buf = self.sr.recv(bufsz) - if not buf: + try: + buf = self.sr.recv(bufsz) + except: # abort: client disconnected raise Pebkac(400, "client d/c during multipart post") @@ -704,13 +731,12 @@ class MultipartParser(object): yield buf[:-n] return - buf2 = self.sr.recv(bufsz) - if not buf2: + try: + buf += self.sr.recv(bufsz) + except: # abort: client disconnected raise Pebkac(400, "client d/c during multipart post") - buf += buf2 - yield buf def _run_gen(self): @@ -723,11 +749,11 @@ class MultipartParser(object): fieldname, filename = self._read_header() yield [fieldname, filename, self._read_data()] - tail = self.sr.recv_ex(2) + tail = self.sr.recv_ex(2, False) if tail == b"--": # EOF indicated by this immediately after final boundary - tail = self.sr.recv_ex(2) + tail = self.sr.recv_ex(2, False) run = False if tail != b"\r\n": @@ -793,8 +819,9 @@ def get_boundary(headers): def read_header(sr): ret = b"" while True: - buf = sr.recv(1024) - if not buf: + try: + ret += sr.recv(1024) + except: if not ret: return None @@ -804,7 +831,6 @@ def read_header(sr): + ret.decode("utf-8", "replace"), ) - ret += buf ofs = ret.find(b"\r\n\r\n") if ofs < 0: if len(ret) > 1024 * 64: @@ -1111,8 +1137,9 @@ def read_socket(sr, total_size): if bufsz > remains: bufsz = remains - buf = sr.recv(bufsz) - if not buf: + try: + buf = sr.recv(bufsz) + except OSError: m = "client d/c during binary post after {} bytes, {} bytes remaining" raise Pebkac(400, m.format(total_size - remains, remains)) @@ -1121,26 +1148,26 @@ def read_socket(sr, total_size): def read_socket_unbounded(sr): - while True: - buf = sr.recv(32 * 1024) - if not buf: - return - - yield buf + try: + while True: + yield sr.recv(32 * 1024) + except: + return def read_socket_chunked(sr, log=None): - err = "expected chunk length, got [{}] |{}| instead" + err = "upload aborted: expected chunk length, got [{}] |{}| instead" while True: buf = b"" while b"\r" not in buf: - rbuf = sr.recv(2) - if not rbuf or len(buf) > 16: + try: + buf += sr.recv(2) + if len(buf) > 16: + raise Exception() + except: err = err.format(buf.decode("utf-8", "replace"), len(buf)) raise Pebkac(400, err) - buf += rbuf - if not buf.endswith(b"\n"): sr.recv(1) @@ -1151,7 +1178,7 @@ def read_socket_chunked(sr, log=None): raise Pebkac(400, err) if chunklen == 0: - x = sr.recv_ex(2) + x = sr.recv_ex(2, False) if x == b"\r\n": return @@ -1164,7 +1191,7 @@ def read_socket_chunked(sr, log=None): for chunk in read_socket(sr, chunklen): yield chunk - x = sr.recv_ex(2) + x = sr.recv_ex(2, False) if x != b"\r\n": m = "protocol error in chunk separator: want b'\\r\\n', got {!r}" raise Pebkac(400, m.format(x)) diff --git a/tests/test_httpcli.py b/tests/test_httpcli.py index 4a5f1b3d..43d4b670 100644 --- a/tests/test_httpcli.py +++ b/tests/test_httpcli.py @@ -56,6 +56,7 @@ class Cfg(Namespace): textfiles="", doctitle="", html_head="", + lang="eng", theme=0, themes=0, turbo=0, diff --git a/tests/test_vfs.py b/tests/test_vfs.py index 9f904cc3..cb7f08fe 100644 --- a/tests/test_vfs.py +++ b/tests/test_vfs.py @@ -36,6 +36,7 @@ class Cfg(Namespace): "rsp_slp": 0, "s_wr_slp": 0, "s_wr_sz": 512 * 1024, + "lang": "eng", "theme": 0, "themes": 0, "turbo": 0, diff --git a/tests/util.py b/tests/util.py index 0a5f8cf7..55ec58ea 100644 --- a/tests/util.py +++ b/tests/util.py @@ -128,7 +128,7 @@ class VHttpSrv(object): class VHttpConn(object): def __init__(self, args, asrv, log, buf): self.s = VSock(buf) - self.sr = Unrecv(self.s) + self.sr = Unrecv(self.s, None) self.addr = ("127.0.0.1", "42069") self.args = args self.asrv = asrv