diff --git a/copyparty/up2k.py b/copyparty/up2k.py index 561c0a78..251888c5 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -261,7 +261,7 @@ class Up2k(object): with self.mutex: db = self.db.get(cj["ptop"], None) reg = self.registry[cj["ptop"]] - if wark not in reg and db: + if db: cur = db.execute(r"select * from up where w = ?", (wark,)) for _, dtime, dsize, dp_rel in cur: dp_abs = os.path.join(cj["ptop"], dp_rel).replace("\\", "/") @@ -286,6 +286,9 @@ class Up2k(object): } break + if job and wark in reg: + del reg[wark] + if job or wark in reg: job = job or reg[wark] if job["prel"] != cj["prel"] or job["name"] != cj["name"]: @@ -546,43 +549,46 @@ class Up2k(object): prev = {} while True: time.sleep(persist_interval) - for k, reg in self.registry.items(): - now = time.time() - rm = [x for x in reg.values() if now - x["poke"] > discard_interval] - if rm: - m = "dropping {} abandoned uploads in {}".format(len(rm), k) - vis = [self._vis_job_progress(x) for x in rm] - self.log("up2k", "\n".join([m] + vis)) - for job in rm: - del reg[job["wark"]] - try: - # remove the placeholder zero-byte file (keep the PARTIAL) - path = os.path.join(job["ptop"], job["prel"], job["name"]) - if os.path.getsize(path) == 0: - os.unlink(path) - except: - pass + with self.mutex: + for k, reg in self.registry.items(): + self._snap_reg(prev, k, reg, discard_interval) - path = os.path.join(k, ".hist", "up2k.snap") - if not reg: - if k not in prev or prev[k] is not None: - prev[k] = None - if os.path.exists(path): - os.unlink(path) + def _snap_reg(self, prev, k, reg, discard_interval): + now = time.time() + rm = [x for x in reg.values() if now - x["poke"] > discard_interval] + if rm: + m = "dropping {} abandoned uploads in {}".format(len(rm), k) + vis = [self._vis_job_progress(x) for x in rm] + self.log("up2k", "\n".join([m] + vis)) + for job in rm: + del reg[job["wark"]] + try: + # remove the placeholder zero-byte file (keep the PARTIAL) + path = os.path.join(job["ptop"], job["prel"], job["name"]) + if os.path.getsize(path) == 0: + os.unlink(path) + except: + pass - continue + path = os.path.join(k, ".hist", "up2k.snap") + if not reg: + if k not in prev or prev[k] is not None: + prev[k] = None + if os.path.exists(path): + os.unlink(path) + return - newest = max(x["poke"] for _, x in reg.items()) if reg else 0 - etag = [len(reg), newest] - if etag == prev.get(k, None): - continue + newest = max(x["poke"] for _, x in reg.items()) if reg else 0 + etag = [len(reg), newest] + if etag == prev.get(k, None): + return - path2 = "{}.{}".format(path, os.getpid()) - j = json.dumps(reg, indent=2, sort_keys=True).encode("utf-8") - with gzip.GzipFile(path2, "wb") as f: - f.write(j) + path2 = "{}.{}".format(path, os.getpid()) + j = json.dumps(reg, indent=2, sort_keys=True).encode("utf-8") + with gzip.GzipFile(path2, "wb") as f: + f.write(j) - atomic_move(path2, path) + atomic_move(path2, path) - self.log("up2k", "snap: {} |{}|".format(path, len(reg.keys()))) - prev[k] = etag + self.log("up2k", "snap: {} |{}|".format(path, len(reg.keys()))) + prev[k] = etag