fix read-spin on d/c during json post + errorhandling

This commit is contained in:
ed 2022-06-07 19:02:52 +02:00
parent bd60b464bb
commit 4c4b3790c7
9 changed files with 103 additions and 71 deletions

View file

@ -3,7 +3,7 @@ from __future__ import print_function, unicode_literals
""" """
up2k.py: upload to copyparty up2k.py: upload to copyparty
2021-11-28, v0.13, ed <irc.rizon.net>, MIT-Licensed 2022-06-07, v0.14, ed <irc.rizon.net>, MIT-Licensed
https://github.com/9001/copyparty/blob/hovudstraum/bin/up2k.py https://github.com/9001/copyparty/blob/hovudstraum/bin/up2k.py
- dependencies: requests - dependencies: requests
@ -614,10 +614,11 @@ class Ctl(object):
spd = humansize(spd) spd = humansize(spd)
eta = str(datetime.timedelta(seconds=int(eta))) eta = str(datetime.timedelta(seconds=int(eta)))
left = humansize(self.nbytes - self.up_b) sleft = humansize(self.nbytes - self.up_b)
nleft = self.nfiles - self.up_f
tail = "\033[K\033[u" if VT100 else "\r" tail = "\033[K\033[u" if VT100 else "\r"
m = "eta: {0} @ {1}/s, {2} left".format(eta, spd, left) m = "{0} eta @ {1}/s, {2}, {3}# left".format(eta, spd, sleft, nleft)
eprint(txt + "\033]0;{0}\033\\\r{1}{2}".format(m, m, tail)) eprint(txt + "\033]0;{0}\033\\\r{1}{2}".format(m, m, tail))
def cleanup_vt100(self): def cleanup_vt100(self):

View file

@ -440,7 +440,7 @@ def run_argparse(argv, formatter):
ap2.add_argument("--hardlink", action="store_true", help="prefer hardlinks instead of symlinks when possible (within same filesystem)") ap2.add_argument("--hardlink", action="store_true", help="prefer hardlinks instead of symlinks when possible (within same filesystem)")
ap2.add_argument("--never-symlink", action="store_true", help="do not fallback to symlinks when a hardlink cannot be made") ap2.add_argument("--never-symlink", action="store_true", help="do not fallback to symlinks when a hardlink cannot be made")
ap2.add_argument("--no-dedup", action="store_true", help="disable symlink/hardlink creation; copy file contents instead") ap2.add_argument("--no-dedup", action="store_true", help="disable symlink/hardlink creation; copy file contents instead")
ap2.add_argument("--reg-cap", metavar="N", type=int, default=9000, help="max number of uploads to keep in memory when running without -e2d") ap2.add_argument("--reg-cap", metavar="N", type=int, default=38400, help="max number of uploads to keep in memory when running without -e2d; roughly 1 MiB RAM per 600")
ap2.add_argument("--turbo", metavar="LVL", type=int, default=0, help="configure turbo-mode in up2k client; 0 = off and warn if enabled, 1 = off, 2 = on, 3 = on and disable datecheck") ap2.add_argument("--turbo", metavar="LVL", type=int, default=0, help="configure turbo-mode in up2k client; 0 = off and warn if enabled, 1 = off, 2 = on, 3 = on and disable datecheck")
ap2 = ap.add_argument_group('network options') ap2 = ap.add_argument_group('network options')

View file

@ -784,9 +784,10 @@ class HttpCli(object):
if "charset" in ctype: if "charset" in ctype:
enc = ctype.split("charset")[1].strip(" =").split(";")[0].strip() enc = ctype.split("charset")[1].strip(" =").split(";")[0].strip()
json_buf = b"" try:
while len(json_buf) < remains: json_buf = self.sr.recv_ex(remains)
json_buf += self.sr.recv(32 * 1024) except UnrecvEOF:
raise Pebkac(422, "client disconnected while posting JSON")
self.log("decoding {} bytes of {} json".format(len(json_buf), enc)) self.log("decoding {} bytes of {} json".format(len(json_buf), enc))
try: try:
@ -1201,7 +1202,7 @@ class HttpCli(object):
status = "OK" status = "OK"
if errmsg: if errmsg:
self.log(errmsg) self.log(errmsg, 3)
status = "ERROR" status = "ERROR"
msg = "{} // {} bytes // {:.3f} MiB/s\n".format(status, sz_total, spd) msg = "{} // {} bytes // {:.3f} MiB/s\n".format(status, sz_total, spd)

View file

@ -30,6 +30,7 @@ class HttpConn(object):
def __init__(self, sck, addr, hsrv): def __init__(self, sck, addr, hsrv):
self.s = sck self.s = sck
self.sr = None # Type: Unrecv
self.addr = addr self.addr = addr
self.hsrv = hsrv self.hsrv = hsrv
@ -95,7 +96,7 @@ class HttpConn(object):
except AttributeError: except AttributeError:
# jython does not support msg_peek; forget about https # jython does not support msg_peek; forget about https
method = self.s.recv(4) method = self.s.recv(4)
self.sr = Unrecv(self.s) self.sr = Unrecv(self.s, self.log)
self.sr.buf = method self.sr.buf = method
# jython used to do this, they stopped since it's broken # jython used to do this, they stopped since it's broken
@ -182,7 +183,7 @@ class HttpConn(object):
return return
if not self.sr: if not self.sr:
self.sr = Unrecv(self.s) self.sr = Unrecv(self.s, self.log)
while not self.stopping: while not self.stopping:
self.nreq += 1 self.nreq += 1

View file

@ -1460,7 +1460,7 @@ class Up2k(object):
fs2 = bos.stat(os.path.dirname(dst)).st_dev fs2 = bos.stat(os.path.dirname(dst)).st_dev
if fs1 == 0 or fs2 == 0: if fs1 == 0 or fs2 == 0:
# py2 on winxp or other unsupported combination # py2 on winxp or other unsupported combination
raise OSError() raise OSError(38, "filesystem does not have st_dev")
elif fs1 == fs2: elif fs1 == fs2:
# same fs; make symlink as relative as possible # same fs; make symlink as relative as possible
v = [] v = []

View file

@ -10,6 +10,7 @@ import base64
import select import select
import struct import struct
import signal import signal
import socket
import hashlib import hashlib
import platform import platform
import traceback import traceback
@ -188,13 +189,18 @@ class Cooldown(object):
return ret return ret
class UnrecvEOF(OSError):
pass
class _Unrecv(object): class _Unrecv(object):
""" """
undo any number of socket recv ops undo any number of socket recv ops
""" """
def __init__(self, s): def __init__(self, s, log):
self.s = s self.s = s # type: socket.socket
self.log = log
self.buf = b"" self.buf = b""
def recv(self, nbytes): def recv(self, nbytes):
@ -203,20 +209,27 @@ class _Unrecv(object):
self.buf = self.buf[nbytes:] self.buf = self.buf[nbytes:]
return ret return ret
try: ret = self.s.recv(nbytes)
return self.s.recv(nbytes) if not ret:
except: raise UnrecvEOF("client stopped sending data")
return b""
def recv_ex(self, nbytes): return ret
def recv_ex(self, nbytes, raise_on_trunc=True):
"""read an exact number of bytes""" """read an exact number of bytes"""
ret = self.recv(nbytes) ret = b""
while ret and len(ret) < nbytes: try:
buf = self.recv(nbytes - len(ret)) while nbytes > len(ret):
if not buf: ret += self.recv(nbytes - len(ret))
break except OSError:
m = "client only sent {} of {} expected bytes".format(len(ret), nbytes)
if len(ret) <= 16:
m += "; got {!r}".format(ret)
ret += buf if raise_on_trunc:
raise UnrecvEOF(5, m)
elif self.log:
self.log(m, 3)
return ret return ret
@ -229,42 +242,55 @@ class _LUnrecv(object):
with expensive debug logging with expensive debug logging
""" """
def __init__(self, s): def __init__(self, s, log):
self.s = s self.s = s
self.log = log
self.buf = b"" self.buf = b""
def recv(self, nbytes): def recv(self, nbytes):
if self.buf: if self.buf:
ret = self.buf[:nbytes] ret = self.buf[:nbytes]
self.buf = self.buf[nbytes:] self.buf = self.buf[nbytes:]
m = "\033[0;7mur:pop:\033[0;1;32m {}\n\033[0;7mur:rem:\033[0;1;35m {}\033[0m\n" m = "\033[0;7mur:pop:\033[0;1;32m {}\n\033[0;7mur:rem:\033[0;1;35m {}\033[0m"
print(m.format(ret, self.buf), end="") self.log(m.format(ret, self.buf))
return ret return ret
try: ret = self.s.recv(nbytes)
ret = self.s.recv(nbytes) m = "\033[0;7mur:recv\033[0;1;33m {}\033[0m"
m = "\033[0;7mur:recv\033[0;1;33m {}\033[0m\n" self.log(m.format(ret))
print(m.format(ret), end="") if not ret:
return ret raise UnrecvEOF("client stopped sending data")
except:
return b""
def recv_ex(self, nbytes): return ret
def recv_ex(self, nbytes, raise_on_trunc=True):
"""read an exact number of bytes""" """read an exact number of bytes"""
ret = self.recv(nbytes) try:
while ret and len(ret) < nbytes: ret = self.recv(nbytes)
buf = self.recv(nbytes - len(ret)) err = False
if not buf: except:
break ret = b""
err = True
ret += buf while not err and len(ret) < nbytes:
try:
ret += self.recv(nbytes - len(ret))
except OSError:
err = True
if err:
m = "client only sent {} of {} expected bytes".format(len(ret), nbytes)
if raise_on_trunc:
raise UnrecvEOF(m)
elif self.log:
self.log(m, 3)
return ret return ret
def unrecv(self, buf): def unrecv(self, buf):
self.buf = buf + self.buf self.buf = buf + self.buf
m = "\033[0;7mur:push\033[0;1;31m {}\n\033[0;7mur:rem:\033[0;1;35m {}\033[0m\n" m = "\033[0;7mur:push\033[0;1;31m {}\n\033[0;7mur:rem:\033[0;1;35m {}\033[0m"
print(m.format(buf, self.buf), end="") self.log(m.format(buf, self.buf))
Unrecv = _Unrecv Unrecv = _Unrecv
@ -577,7 +603,7 @@ def ren_open(fname, *args, **kwargs):
class MultipartParser(object): class MultipartParser(object):
def __init__(self, log_func, sr, http_headers): def __init__(self, log_func, sr, http_headers):
self.sr = sr self.sr = sr # type: Unrecv
self.log = log_func self.log = log_func
self.headers = http_headers self.headers = http_headers
@ -670,8 +696,9 @@ class MultipartParser(object):
blen = len(self.boundary) blen = len(self.boundary)
bufsz = 32 * 1024 bufsz = 32 * 1024
while True: while True:
buf = self.sr.recv(bufsz) try:
if not buf: buf = self.sr.recv(bufsz)
except:
# abort: client disconnected # abort: client disconnected
raise Pebkac(400, "client d/c during multipart post") raise Pebkac(400, "client d/c during multipart post")
@ -704,13 +731,12 @@ class MultipartParser(object):
yield buf[:-n] yield buf[:-n]
return return
buf2 = self.sr.recv(bufsz) try:
if not buf2: buf += self.sr.recv(bufsz)
except:
# abort: client disconnected # abort: client disconnected
raise Pebkac(400, "client d/c during multipart post") raise Pebkac(400, "client d/c during multipart post")
buf += buf2
yield buf yield buf
def _run_gen(self): def _run_gen(self):
@ -723,11 +749,11 @@ class MultipartParser(object):
fieldname, filename = self._read_header() fieldname, filename = self._read_header()
yield [fieldname, filename, self._read_data()] yield [fieldname, filename, self._read_data()]
tail = self.sr.recv_ex(2) tail = self.sr.recv_ex(2, False)
if tail == b"--": if tail == b"--":
# EOF indicated by this immediately after final boundary # EOF indicated by this immediately after final boundary
tail = self.sr.recv_ex(2) tail = self.sr.recv_ex(2, False)
run = False run = False
if tail != b"\r\n": if tail != b"\r\n":
@ -793,8 +819,9 @@ def get_boundary(headers):
def read_header(sr): def read_header(sr):
ret = b"" ret = b""
while True: while True:
buf = sr.recv(1024) try:
if not buf: ret += sr.recv(1024)
except:
if not ret: if not ret:
return None return None
@ -804,7 +831,6 @@ def read_header(sr):
+ ret.decode("utf-8", "replace"), + ret.decode("utf-8", "replace"),
) )
ret += buf
ofs = ret.find(b"\r\n\r\n") ofs = ret.find(b"\r\n\r\n")
if ofs < 0: if ofs < 0:
if len(ret) > 1024 * 64: if len(ret) > 1024 * 64:
@ -1111,8 +1137,9 @@ def read_socket(sr, total_size):
if bufsz > remains: if bufsz > remains:
bufsz = remains bufsz = remains
buf = sr.recv(bufsz) try:
if not buf: buf = sr.recv(bufsz)
except OSError:
m = "client d/c during binary post after {} bytes, {} bytes remaining" m = "client d/c during binary post after {} bytes, {} bytes remaining"
raise Pebkac(400, m.format(total_size - remains, remains)) raise Pebkac(400, m.format(total_size - remains, remains))
@ -1121,26 +1148,26 @@ def read_socket(sr, total_size):
def read_socket_unbounded(sr): def read_socket_unbounded(sr):
while True: try:
buf = sr.recv(32 * 1024) while True:
if not buf: yield sr.recv(32 * 1024)
return except:
return
yield buf
def read_socket_chunked(sr, log=None): def read_socket_chunked(sr, log=None):
err = "expected chunk length, got [{}] |{}| instead" err = "upload aborted: expected chunk length, got [{}] |{}| instead"
while True: while True:
buf = b"" buf = b""
while b"\r" not in buf: while b"\r" not in buf:
rbuf = sr.recv(2) try:
if not rbuf or len(buf) > 16: buf += sr.recv(2)
if len(buf) > 16:
raise Exception()
except:
err = err.format(buf.decode("utf-8", "replace"), len(buf)) err = err.format(buf.decode("utf-8", "replace"), len(buf))
raise Pebkac(400, err) raise Pebkac(400, err)
buf += rbuf
if not buf.endswith(b"\n"): if not buf.endswith(b"\n"):
sr.recv(1) sr.recv(1)
@ -1151,7 +1178,7 @@ def read_socket_chunked(sr, log=None):
raise Pebkac(400, err) raise Pebkac(400, err)
if chunklen == 0: if chunklen == 0:
x = sr.recv_ex(2) x = sr.recv_ex(2, False)
if x == b"\r\n": if x == b"\r\n":
return return
@ -1164,7 +1191,7 @@ def read_socket_chunked(sr, log=None):
for chunk in read_socket(sr, chunklen): for chunk in read_socket(sr, chunklen):
yield chunk yield chunk
x = sr.recv_ex(2) x = sr.recv_ex(2, False)
if x != b"\r\n": if x != b"\r\n":
m = "protocol error in chunk separator: want b'\\r\\n', got {!r}" m = "protocol error in chunk separator: want b'\\r\\n', got {!r}"
raise Pebkac(400, m.format(x)) raise Pebkac(400, m.format(x))

View file

@ -56,6 +56,7 @@ class Cfg(Namespace):
textfiles="", textfiles="",
doctitle="", doctitle="",
html_head="", html_head="",
lang="eng",
theme=0, theme=0,
themes=0, themes=0,
turbo=0, turbo=0,

View file

@ -36,6 +36,7 @@ class Cfg(Namespace):
"rsp_slp": 0, "rsp_slp": 0,
"s_wr_slp": 0, "s_wr_slp": 0,
"s_wr_sz": 512 * 1024, "s_wr_sz": 512 * 1024,
"lang": "eng",
"theme": 0, "theme": 0,
"themes": 0, "themes": 0,
"turbo": 0, "turbo": 0,

View file

@ -128,7 +128,7 @@ class VHttpSrv(object):
class VHttpConn(object): class VHttpConn(object):
def __init__(self, args, asrv, log, buf): def __init__(self, args, asrv, log, buf):
self.s = VSock(buf) self.s = VSock(buf)
self.sr = Unrecv(self.s) self.sr = Unrecv(self.s, None)
self.addr = ("127.0.0.1", "42069") self.addr = ("127.0.0.1", "42069")
self.args = args self.args = args
self.asrv = asrv self.asrv = asrv