diff --git a/copyparty/__main__.py b/copyparty/__main__.py index 8fbaf4bf..6ff0439a 100644 --- a/copyparty/__main__.py +++ b/copyparty/__main__.py @@ -376,6 +376,7 @@ def run_argparse(argv, formatter): ap2.add_argument("--no-fpool", action="store_true", help="disable file-handle pooling -- instead, repeatedly close and reopen files during upload") ap2.add_argument("--use-fpool", action="store_true", help="force file-handle pooling, even if copyparty thinks you're better off without") ap2.add_argument("--no-symlink", action="store_true", help="duplicate file contents instead") + ap2.add_argument("--reg-cap", metavar="N", type=int, default=9000, help="max number of uploads to keep in memory when running without -e2d") ap2 = ap.add_argument_group('network options') ap2.add_argument("-i", metavar="IP", type=u, default="0.0.0.0", help="ip to bind (comma-sep.)") @@ -520,7 +521,7 @@ def main(argv=None): if HAVE_SSL: ensure_cert() - for k, v in zip(argv, argv[1:]): + for k, v in zip(argv[1:], argv[2:]): if k == "-c": supp = args_from_cfg(v) argv.extend(supp) diff --git a/copyparty/up2k.py b/copyparty/up2k.py index caeae0eb..58dea1bf 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -73,6 +73,7 @@ class Up2k(object): self.need_rescan = {} self.dupesched = {} self.registry = {} + self.droppable = {} self.entags = {} self.flags = {} self.cur = {} @@ -125,11 +126,11 @@ class Up2k(object): all_vols = self.asrv.vfs.all_vols have_e2d = self.init_indexes(all_vols) - if have_e2d: - thr = threading.Thread(target=self._snapshot, name="up2k-snapshot") - thr.daemon = True - thr.start() + thr = threading.Thread(target=self._snapshot, name="up2k-snapshot") + thr.daemon = True + thr.start() + if have_e2d: thr = threading.Thread(target=self._hasher, name="up2k-hasher") thr.daemon = True thr.start() @@ -295,7 +296,8 @@ class Up2k(object): def _vis_reg_progress(self, reg): ret = [] for _, job in reg.items(): - ret.append(self._vis_job_progress(job)) + if job["need"]: + ret.append(self._vis_job_progress(job)) return ret @@ -483,12 +485,19 @@ class Up2k(object): self.log("/{} {}".format(vpath, " ".join(sorted(a))), "35") reg = {} + drp = None path = os.path.join(histpath, "up2k.snap") - if "e2d" in flags and bos.path.exists(path): + if bos.path.exists(path): with gzip.GzipFile(path, "rb") as f: j = f.read().decode("utf-8") reg2 = json.loads(j) + try: + drp = reg2["droppable"] + reg2 = reg2["registry"] + except: + pass + for k, job in reg2.items(): path = os.path.join(job["ptop"], job["prel"], job["name"]) if bos.path.exists(path): @@ -498,12 +507,19 @@ class Up2k(object): else: self.log("ign deleted file in snap: [{}]".format(path)) - m = "loaded snap {} |{}|".format(path, len(reg.keys())) + if drp is None: + drp = [k for k, v in reg.items() if not v.get("need", [])] + else: + drp = [x for x in drp if x in reg] + + m = "loaded snap {} |{}| ({})".format(path, len(reg.keys()), len(drp or [])) m = [m] + self._vis_reg_progress(reg) self.log("\n".join(m)) self.flags[ptop] = flags self.registry[ptop] = reg + self.droppable[ptop] = drp or [] + self.regdrop(ptop, None) if not HAVE_SQLITE3 or "e2d" not in flags or "d2d" in flags: return None @@ -1494,7 +1510,7 @@ class Up2k(object): return ret, src if self.args.nw: - # del self.registry[ptop][wark] + self.regdrop(ptop, wark) return ret, dst # windows cant rename open files @@ -1526,9 +1542,9 @@ class Up2k(object): a = [job[x] for x in "ptop wark prel name lmod size addr".split()] a += [job.get("at") or time.time()] if self.idx_wark(*a): - # self.log("pop " + wark + " " + dst + " finish_upload idx_wark", 4) del self.registry[ptop][wark] - # in-memory registry is reserved for unfinished uploads + else: + self.regdrop(ptop, wark) dupes = self.dupesched.pop(dst, []) if not dupes: @@ -1548,6 +1564,21 @@ class Up2k(object): if cur: cur.connection.commit() + def regdrop(self, ptop, wark): + t = self.droppable[ptop] + if wark: + t.append(wark) + + if len(t) <= self.args.reg_cap: + return + + n = len(t) - int(self.args.reg_cap / 2) + m = "up2k-registry [{}] has {} droppables; discarding {}" + self.log(m.format(ptop, len(t), n)) + for k in t[:n]: + self.registry[ptop].pop(k, None) + self.droppable[ptop] = t[n:] + def idx_wark(self, ptop, wark, rd, fn, lmod, sz, ip, at): cur = self.cur.get(ptop) if not cur: @@ -2063,7 +2094,8 @@ class Up2k(object): bos.makedirs(histpath) path2 = "{}.{}".format(path, os.getpid()) - j = json.dumps(reg, indent=2, sort_keys=True).encode("utf-8") + body = {"droppable": self.droppable[ptop], "registry": reg} + j = json.dumps(body, indent=2, sort_keys=True).encode("utf-8") with gzip.GzipFile(path2, "wb") as f: f.write(j)