diff --git a/copyparty/__main__.py b/copyparty/__main__.py index ac78f973..35862370 100644 --- a/copyparty/__main__.py +++ b/copyparty/__main__.py @@ -344,6 +344,8 @@ def run_argparse(argv, formatter): ap2.add_argument("--dotpart", action="store_true", help="dotfile incomplete uploads") ap2.add_argument("--sparse", metavar="MiB", type=int, default=4, help="up2k min.size threshold (mswin-only)") ap2.add_argument("--unpost", metavar="SEC", type=int, default=3600*12, help="grace period where uploads can be deleted by the uploader, even without delete permissions; 0=disabled") + ap2.add_argument("--no-fpool", action="store_true", help="disable file-handle pooling -- instead, repeatedly close and reopen files during upload") + ap2.add_argument("--use-fpool", action="store_true", help="force file-handle pooling, even if copyparty thinks you're better off without") ap2 = ap.add_argument_group('network options') ap2.add_argument("-i", metavar="IP", type=u, default="0.0.0.0", help="ip to bind (comma-sep.)") diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index 5160f43d..462aa194 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -39,6 +39,7 @@ class HttpCli(object): def __init__(self, conn): self.t0 = time.time() self.conn = conn + self.mutex = conn.mutex self.s = conn.s # type: socket self.sr = conn.sr # type: Unrecv self.ip = conn.addr[0] @@ -47,6 +48,7 @@ class HttpCli(object): self.asrv = conn.asrv # type: AuthSrv self.ico = conn.ico self.thumbcli = conn.thumbcli + self.u2fh = conn.u2fh self.log_func = conn.log_func self.log_src = conn.log_src self.tls = hasattr(self.s, "cipher") @@ -835,7 +837,18 @@ class HttpCli(object): reader = read_socket(self.sr, remains) - with open(fsenc(path), "rb+", 512 * 1024) as f: + f = None + fpool = not self.args.no_fpool + if fpool: + with self.mutex: + try: + f = self.u2fh.pop(path) + except: + pass + + f = f or open(fsenc(path), "rb+", 512 * 1024) + + try: f.seek(cstart[0]) post_sz, _, sha_b64 = hashcopy(reader, f) @@ -865,22 +878,36 @@ class HttpCli(object): ofs += len(buf) self.log("clone {} done".format(cstart[0])) + finally: + if not fpool: + f.close() + else: + with self.mutex: + self.u2fh.put(path, f) x = self.conn.hsrv.broker.put(True, "up2k.confirm_chunk", ptop, wark, chash) x = x.get() try: - num_left, path = x + num_left, fin_path = x except: self.loud_reply(x, status=500) return False - if not ANYWIN and num_left == 0: + if not num_left and fpool: + with self.mutex: + self.u2fh.close(path) + + # windows cant rename open files + if ANYWIN and path != fin_path and not self.args.nw: + self.conn.hsrv.broker.put(True, "up2k.finish_upload", ptop, wark).get() + + if not ANYWIN and not num_left: times = (int(time.time()), int(lastmod)) self.log("no more chunks, setting times {}".format(times)) try: - bos.utime(path, times) + bos.utime(fin_path, times) except: - self.log("failed to utime ({}, {})".format(path, times)) + self.log("failed to utime ({}, {})".format(fin_path, times)) spd = self._spd(post_sz) self.log("{} thank".format(spd)) diff --git a/copyparty/httpconn.py b/copyparty/httpconn.py index 32681640..7967e481 100644 --- a/copyparty/httpconn.py +++ b/copyparty/httpconn.py @@ -32,9 +32,11 @@ class HttpConn(object): self.addr = addr self.hsrv = hsrv + self.mutex = hsrv.mutex self.args = hsrv.args self.asrv = hsrv.asrv self.cert_path = hsrv.cert_path + self.u2fh = hsrv.u2fh enth = HAVE_PIL and not self.args.no_thumb self.thumbcli = ThumbCli(hsrv.broker) if enth else None diff --git a/copyparty/httpsrv.py b/copyparty/httpsrv.py index fe0d5534..b227ab78 100644 --- a/copyparty/httpsrv.py +++ b/copyparty/httpsrv.py @@ -27,7 +27,7 @@ except ImportError: sys.exit(1) from .__init__ import E, PY2, MACOS -from .util import spack, min_ex, start_stackmon, start_log_thrs +from .util import FHC, spack, min_ex, start_stackmon, start_log_thrs from .bos import bos from .httpconn import HttpConn @@ -50,7 +50,10 @@ class HttpSrv(object): self.log = broker.log self.asrv = broker.asrv - self.name = "httpsrv" + ("-n{}-i{:x}".format(nid, os.getpid()) if nid else "") + nsuf = "-{}".format(nid) if nid else "" + nsuf2 = "-n{}-i{:x}".format(nid, os.getpid()) if nid else "" + + self.name = "hsrv" + nsuf2 self.mutex = threading.Lock() self.stopping = False @@ -59,6 +62,7 @@ class HttpSrv(object): self.tp_time = None # latest worker collect self.tp_q = None if self.args.no_htp else queue.LifoQueue() + self.u2fh = FHC() self.srvs = [] self.ncli = 0 # exact self.clients = {} # laggy @@ -82,11 +86,6 @@ class HttpSrv(object): if self.tp_q: self.start_threads(4) - name = "httpsrv-scaler" + ("-{}".format(nid) if nid else "") - t = threading.Thread(target=self.thr_scaler, name=name) - t.daemon = True - t.start() - if nid: if self.args.stackmon: start_stackmon(self.args.stackmon, nid) @@ -94,6 +93,10 @@ class HttpSrv(object): if self.args.log_thrs: start_log_thrs(self.log, self.args.log_thrs, nid) + t = threading.Thread(target=self.periodic, name="hsrv-pt" + nsuf) + t.daemon = True + t.start() + def start_threads(self, n): self.tp_nthr += n if self.args.log_htp: @@ -115,13 +118,15 @@ class HttpSrv(object): for _ in range(n): self.tp_q.put(None) - def thr_scaler(self): + def periodic(self): while True: - time.sleep(2 if self.tp_ncli else 30) + time.sleep(2 if self.tp_ncli else 10) with self.mutex: - self.tp_ncli = max(self.ncli, self.tp_ncli - 2) - if self.tp_nthr > self.tp_ncli + 8: - self.stop_threads(4) + self.u2fh.clean() + if self.tp_q: + self.tp_ncli = max(self.ncli, self.tp_ncli - 2) + if self.tp_nthr > self.tp_ncli + 8: + self.stop_threads(4) def listen(self, sck, nlisteners): ip, port = sck.getsockname() diff --git a/copyparty/svchub.py b/copyparty/svchub.py index 232f0c12..9b3b21f9 100644 --- a/copyparty/svchub.py +++ b/copyparty/svchub.py @@ -53,6 +53,17 @@ class SvcHub(object): if args.log_thrs: start_log_thrs(self.log, args.log_thrs, 0) + if not ANYWIN and not args.use_fpool: + args.no_fpool = True + + if not args.no_fpool and args.j != 1: + m = "WARNING: --use-fpool combined with multithreading is untested and can probably cause undefined behavior" + if ANYWIN: + m = "windows cannot do multithreading without --no-fpool, so enabling that -- note that upload performance will suffer if you have microsoft defender \"real-time protection\" enabled, so you probably want to use -j 1 instead" + args.no_fpool = True + + self.log("root", m, c=3) + # initiate all services to manage self.asrv = AuthSrv(self.args, self.log) if args.ls: diff --git a/copyparty/up2k.py b/copyparty/up2k.py index 21c7fe11..ba3c8767 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -1336,6 +1336,22 @@ class Up2k(object): # del self.registry[ptop][wark] return ret, dst + # windows cant rename open files + if not ANYWIN or src == dst: + self.finish_upload(ptop, wark) + + return ret, dst + + def finish_upload(self, ptop, wark): + with self.mutex: + try: + job = self.registry[ptop][wark] + pdir = os.path.join(job["ptop"], job["prel"]) + src = os.path.join(pdir, job["tnam"]) + dst = os.path.join(pdir, job["name"]) + except Exception as ex: + return "finish_upload, wark, " + repr(ex) + atomic_move(src, dst) if ANYWIN: @@ -1348,8 +1364,6 @@ class Up2k(object): del self.registry[ptop][wark] # in-memory registry is reserved for unfinished uploads - return ret, dst - def idx_wark(self, ptop, wark, rd, fn, lmod, sz, ip, at): cur = self.cur.get(ptop) if not cur: diff --git a/copyparty/util.py b/copyparty/util.py index ffcc8a7c..abf63994 100644 --- a/copyparty/util.py +++ b/copyparty/util.py @@ -251,6 +251,55 @@ class _LUnrecv(object): Unrecv = _Unrecv +class FHC(object): + class CE(object): + def __init__(self, fh): + self.ts = 0 + self.fhs = [fh] + + def __init__(self): + self.cache = {} + + def close(self, path): + try: + ce = self.cache[path] + except: + return + + for fh in ce.fhs: + fh.close() + + del self.cache[path] + + def clean(self): + if not self.cache: + return + + keep = {} + now = time.time() + for path, ce in self.cache.items(): + if now < ce.ts + 5: + keep[path] = ce + else: + for fh in ce.fhs: + fh.close() + + self.cache = keep + + def pop(self, path): + return self.cache[path].fhs.pop() + + def put(self, path, fh): + try: + ce = self.cache[path] + ce.fhs.append(fh) + except: + ce = self.CE(fh) + self.cache[path] = ce + + ce.ts = time.time() + + class ProgressPrinter(threading.Thread): """ periodically print progress info without linefeeds