diff --git a/copyparty/up2k.py b/copyparty/up2k.py index e71936c4..561c0a78 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -16,7 +16,7 @@ import threading from copy import deepcopy from .__init__ import WINDOWS, PY2 -from .util import Pebkac, Queue, fsdec, fsenc, sanitize_fn, ren_open +from .util import Pebkac, Queue, fsdec, fsenc, sanitize_fn, ren_open, atomic_move HAVE_SQLITE3 = False try: @@ -423,11 +423,7 @@ class Up2k(object): if ret > 0: return ret, src - if not PY2: - os.replace(src, dst) - else: - os.unlink(dst) - os.rename(src, dst) + atomic_move(src, dst) if WINDOWS: self.lastmod_q.put([dst, (int(time.time()), int(job["lmod"]))]) @@ -545,14 +541,14 @@ class Up2k(object): self.log("lmod", "failed to utime ({}, {})".format(path, times)) def _snapshot(self): + persist_interval = 30 # persist unfinished uploads index every 30 sec + discard_interval = 3600 # drop unfinished uploads after 1 hour inactivity prev = {} while True: - # persist pending uploads every 60sec - time.sleep(30) + time.sleep(persist_interval) for k, reg in self.registry.items(): now = time.time() - # discard abandoned uploads (idle >= 1h) - rm = [x for x in reg.values() if now - x["poke"] > 60] + rm = [x for x in reg.values() if now - x["poke"] > discard_interval] if rm: m = "dropping {} abandoned uploads in {}".format(len(rm), k) vis = [self._vis_job_progress(x) for x in rm] @@ -560,22 +556,33 @@ class Up2k(object): for job in rm: del reg[job["wark"]] try: - # try to remove the placeholder zero-byte file too + # remove the placeholder zero-byte file (keep the PARTIAL) path = os.path.join(job["ptop"], job["prel"], job["name"]) if os.path.getsize(path) == 0: os.unlink(path) except: pass - newest = max(x["poke"] for _, x in reg.items()) if reg else 0 - etag = [len(reg), newest] - if etag == prev.get(k, None) or not reg: + path = os.path.join(k, ".hist", "up2k.snap") + if not reg: + if k not in prev or prev[k] is not None: + prev[k] = None + if os.path.exists(path): + os.unlink(path) + continue + newest = max(x["poke"] for _, x in reg.items()) if reg else 0 + etag = [len(reg), newest] + if etag == prev.get(k, None): + continue + + path2 = "{}.{}".format(path, os.getpid()) j = json.dumps(reg, indent=2, sort_keys=True).encode("utf-8") - path = os.path.join(k, ".hist", "up2k.snap") - with gzip.GzipFile(path, "wb") as f: + with gzip.GzipFile(path2, "wb") as f: f.write(j) + atomic_move(path2, path) + self.log("up2k", "snap: {} |{}|".format(path, len(reg.keys()))) prev[k] = etag diff --git a/copyparty/util.py b/copyparty/util.py index a009a1c8..45e0c50f 100644 --- a/copyparty/util.py +++ b/copyparty/util.py @@ -549,6 +549,16 @@ else: fsdec = w8dec +def atomic_move(src, dst): + if not PY2: + os.replace(src, dst) + else: + if os.path.exists(dst): + os.unlink(dst) + + os.rename(src, dst) + + def read_socket(sr, total_size): remains = total_size while remains > 0: