sizelimit registry + persist without e2d

This commit is contained in:
ed 2021-11-16 21:31:24 +01:00
parent aea3843cf2
commit c9b1c48c72
2 changed files with 45 additions and 12 deletions

View file

@ -376,6 +376,7 @@ def run_argparse(argv, formatter):
ap2.add_argument("--no-fpool", action="store_true", help="disable file-handle pooling -- instead, repeatedly close and reopen files during upload")
ap2.add_argument("--use-fpool", action="store_true", help="force file-handle pooling, even if copyparty thinks you're better off without")
ap2.add_argument("--no-symlink", action="store_true", help="duplicate file contents instead")
ap2.add_argument("--reg-cap", metavar="N", type=int, default=9000, help="max number of uploads to keep in memory when running without -e2d")
ap2 = ap.add_argument_group('network options')
ap2.add_argument("-i", metavar="IP", type=u, default="0.0.0.0", help="ip to bind (comma-sep.)")
@ -520,7 +521,7 @@ def main(argv=None):
if HAVE_SSL:
ensure_cert()
for k, v in zip(argv, argv[1:]):
for k, v in zip(argv[1:], argv[2:]):
if k == "-c":
supp = args_from_cfg(v)
argv.extend(supp)

View file

@ -73,6 +73,7 @@ class Up2k(object):
self.need_rescan = {}
self.dupesched = {}
self.registry = {}
self.droppable = {}
self.entags = {}
self.flags = {}
self.cur = {}
@ -125,11 +126,11 @@ class Up2k(object):
all_vols = self.asrv.vfs.all_vols
have_e2d = self.init_indexes(all_vols)
if have_e2d:
thr = threading.Thread(target=self._snapshot, name="up2k-snapshot")
thr.daemon = True
thr.start()
thr = threading.Thread(target=self._snapshot, name="up2k-snapshot")
thr.daemon = True
thr.start()
if have_e2d:
thr = threading.Thread(target=self._hasher, name="up2k-hasher")
thr.daemon = True
thr.start()
@ -295,7 +296,8 @@ class Up2k(object):
def _vis_reg_progress(self, reg):
ret = []
for _, job in reg.items():
ret.append(self._vis_job_progress(job))
if job["need"]:
ret.append(self._vis_job_progress(job))
return ret
@ -483,12 +485,19 @@ class Up2k(object):
self.log("/{} {}".format(vpath, " ".join(sorted(a))), "35")
reg = {}
drp = None
path = os.path.join(histpath, "up2k.snap")
if "e2d" in flags and bos.path.exists(path):
if bos.path.exists(path):
with gzip.GzipFile(path, "rb") as f:
j = f.read().decode("utf-8")
reg2 = json.loads(j)
try:
drp = reg2["droppable"]
reg2 = reg2["registry"]
except:
pass
for k, job in reg2.items():
path = os.path.join(job["ptop"], job["prel"], job["name"])
if bos.path.exists(path):
@ -498,12 +507,19 @@ class Up2k(object):
else:
self.log("ign deleted file in snap: [{}]".format(path))
m = "loaded snap {} |{}|".format(path, len(reg.keys()))
if drp is None:
drp = [k for k, v in reg.items() if not v.get("need", [])]
else:
drp = [x for x in drp if x in reg]
m = "loaded snap {} |{}| ({})".format(path, len(reg.keys()), len(drp or []))
m = [m] + self._vis_reg_progress(reg)
self.log("\n".join(m))
self.flags[ptop] = flags
self.registry[ptop] = reg
self.droppable[ptop] = drp or []
self.regdrop(ptop, None)
if not HAVE_SQLITE3 or "e2d" not in flags or "d2d" in flags:
return None
@ -1494,7 +1510,7 @@ class Up2k(object):
return ret, src
if self.args.nw:
# del self.registry[ptop][wark]
self.regdrop(ptop, wark)
return ret, dst
# windows cant rename open files
@ -1526,9 +1542,9 @@ class Up2k(object):
a = [job[x] for x in "ptop wark prel name lmod size addr".split()]
a += [job.get("at") or time.time()]
if self.idx_wark(*a):
# self.log("pop " + wark + " " + dst + " finish_upload idx_wark", 4)
del self.registry[ptop][wark]
# in-memory registry is reserved for unfinished uploads
else:
self.regdrop(ptop, wark)
dupes = self.dupesched.pop(dst, [])
if not dupes:
@ -1548,6 +1564,21 @@ class Up2k(object):
if cur:
cur.connection.commit()
def regdrop(self, ptop, wark):
    """Queue an upload for eviction and trim the registry when over the cap.

    ptop selects the volume whose droppable list and registry are used.
    wark, when truthy, is appended to that volume's droppable list; pass
    None to only enforce the cap.

    Once the droppable list grows beyond args.reg_cap, the oldest entries
    are evicted from the registry until half the cap remains, and the
    droppable list is replaced with the surviving tail.
    """
    pending = self.droppable[ptop]
    if wark:
        pending.append(wark)

    count = len(pending)
    if count > self.args.reg_cap:
        # shrink to half the cap so the trim does not run on every call
        excess = count - int(self.args.reg_cap / 2)
        msg = "up2k-registry [{}] has {} droppables; discarding {}"
        self.log(msg.format(ptop, count, excess))

        # oldest entries sit at the front of the list
        for stale in pending[:excess]:
            self.registry[ptop].pop(stale, None)

        self.droppable[ptop] = pending[excess:]
def idx_wark(self, ptop, wark, rd, fn, lmod, sz, ip, at):
cur = self.cur.get(ptop)
if not cur:
@ -2063,7 +2094,8 @@ class Up2k(object):
bos.makedirs(histpath)
path2 = "{}.{}".format(path, os.getpid())
j = json.dumps(reg, indent=2, sort_keys=True).encode("utf-8")
body = {"droppable": self.droppable[ptop], "registry": reg}
j = json.dumps(body, indent=2, sort_keys=True).encode("utf-8")
with gzip.GzipFile(path2, "wb") as f:
f.write(j)