prefer sqlite over registry snaps

This commit is contained in:
ed 2021-01-10 17:47:27 +01:00
parent fb853edbe3
commit d64e9b85a7

View file

@ -261,7 +261,7 @@ class Up2k(object):
with self.mutex: with self.mutex:
db = self.db.get(cj["ptop"], None) db = self.db.get(cj["ptop"], None)
reg = self.registry[cj["ptop"]] reg = self.registry[cj["ptop"]]
if wark not in reg and db: if db:
cur = db.execute(r"select * from up where w = ?", (wark,)) cur = db.execute(r"select * from up where w = ?", (wark,))
for _, dtime, dsize, dp_rel in cur: for _, dtime, dsize, dp_rel in cur:
dp_abs = os.path.join(cj["ptop"], dp_rel).replace("\\", "/") dp_abs = os.path.join(cj["ptop"], dp_rel).replace("\\", "/")
@ -286,6 +286,9 @@ class Up2k(object):
} }
break break
if job and wark in reg:
del reg[wark]
if job or wark in reg: if job or wark in reg:
job = job or reg[wark] job = job or reg[wark]
if job["prel"] != cj["prel"] or job["name"] != cj["name"]: if job["prel"] != cj["prel"] or job["name"] != cj["name"]:
@ -546,43 +549,46 @@ class Up2k(object):
prev = {} prev = {}
while True: while True:
time.sleep(persist_interval) time.sleep(persist_interval)
for k, reg in self.registry.items(): with self.mutex:
now = time.time() for k, reg in self.registry.items():
rm = [x for x in reg.values() if now - x["poke"] > discard_interval] self._snap_reg(prev, k, reg, discard_interval)
if rm:
m = "dropping {} abandoned uploads in {}".format(len(rm), k)
vis = [self._vis_job_progress(x) for x in rm]
self.log("up2k", "\n".join([m] + vis))
for job in rm:
del reg[job["wark"]]
try:
# remove the placeholder zero-byte file (keep the PARTIAL)
path = os.path.join(job["ptop"], job["prel"], job["name"])
if os.path.getsize(path) == 0:
os.unlink(path)
except:
pass
path = os.path.join(k, ".hist", "up2k.snap") def _snap_reg(self, prev, k, reg, discard_interval):
if not reg: now = time.time()
if k not in prev or prev[k] is not None: rm = [x for x in reg.values() if now - x["poke"] > discard_interval]
prev[k] = None if rm:
if os.path.exists(path): m = "dropping {} abandoned uploads in {}".format(len(rm), k)
os.unlink(path) vis = [self._vis_job_progress(x) for x in rm]
self.log("up2k", "\n".join([m] + vis))
for job in rm:
del reg[job["wark"]]
try:
# remove the placeholder zero-byte file (keep the PARTIAL)
path = os.path.join(job["ptop"], job["prel"], job["name"])
if os.path.getsize(path) == 0:
os.unlink(path)
except:
pass
continue path = os.path.join(k, ".hist", "up2k.snap")
if not reg:
if k not in prev or prev[k] is not None:
prev[k] = None
if os.path.exists(path):
os.unlink(path)
return
newest = max(x["poke"] for _, x in reg.items()) if reg else 0 newest = max(x["poke"] for _, x in reg.items()) if reg else 0
etag = [len(reg), newest] etag = [len(reg), newest]
if etag == prev.get(k, None): if etag == prev.get(k, None):
continue return
path2 = "{}.{}".format(path, os.getpid()) path2 = "{}.{}".format(path, os.getpid())
j = json.dumps(reg, indent=2, sort_keys=True).encode("utf-8") j = json.dumps(reg, indent=2, sort_keys=True).encode("utf-8")
with gzip.GzipFile(path2, "wb") as f: with gzip.GzipFile(path2, "wb") as f:
f.write(j) f.write(j)
atomic_move(path2, path) atomic_move(path2, path)
self.log("up2k", "snap: {} |{}|".format(path, len(reg.keys()))) self.log("up2k", "snap: {} |{}|".format(path, len(reg.keys())))
prev[k] = etag prev[k] = etag