up2k: persist unfinished uploads across restarts

What the hunks below do:
  * _new_upload writes incoming data to a temporary name derived from
    job["name"] + ".PARTIAL" (stored as job["tnam"]); confirm_chunk moves
    it onto job["name"] once no chunks remain in job["need"].
  * confirm_chunk now returns a 2-tuple (chunks left, path) instead of a
    bare count; the httpcli caller is updated to unpack it.
  * when self.persist is set, a daemon thread runs _snapshot, which wakes
    every 30 sec, drops jobs whose "poke" timestamp is older than one hour,
    and writes each registry as gzipped JSON to <ptop>/.hist/up2k.snap.
  * register_vpath loads that snapshot back (if present) and logs the
    progress of every restored job.
  * httpcli answers 403 for .hist/up2k.snap, same as for .hist/up2k.db.

Review fixes folded into this revision (both inside _snapshot):
  * the idle threshold was 60 (seconds) although the comment next to it
    says "idle >= 1h"; with a 30 sec wake-up interval this discarded any
    upload that paused for a minute.  Now 3600.
  * the comment said "every 60sec" above time.sleep(30); now says 30sec.

Open review notes (not changed here):
  * NOTE(review): _snapshot iterates self.registry and deletes from the
    per-volume dicts without taking self.mutex, while register_vpath and
    confirm_chunk modify the same dicts under that lock -- confirm this
    cannot raise "dictionary changed size during iteration".
  * NOTE(review): when a registry becomes empty the loop hits "continue"
    before rewriting up2k.snap, so an older snapshot presumably stays on
    disk and would be reloaded by register_vpath -- verify.
  * NOTE(review): _new_upload calls self._untaken, which none of the hunks
    below add -- confirm it already exists in up2k.py.
  * NOTE(review): dropping an abandoned job removes job["name"] when it is
    zero bytes, but leaves job["tnam"] in place -- confirm that is intended.
  * NOTE(review): the copy of this patch under review had its whitespace
    collapsed.  Leading indentation and the width of the four
    whitespace-only "-" lines were reconstructed from Python syntax;
    re-diff against the tree before applying.

diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py
index 2fda3376..a973282f 100644
--- a/copyparty/httpcli.py
+++ b/copyparty/httpcli.py
@@ -461,7 +461,7 @@ class HttpCli(object):
                 self.log("clone {} done".format(cstart[0]))
 
         x = self.conn.hsrv.broker.put(True, "up2k.confirm_chunk", ptop, wark, chash)
-        num_left = x.get()
+        num_left, path = x.get()
 
         if not WINDOWS and num_left == 0:
             times = (int(time.time()), int(lastmod))
@@ -929,7 +929,7 @@ class HttpCli(object):
                 remains = sendfile_kern(lower, upper, f, self.s)
             else:
                 remains = sendfile_py(lower, upper, f, self.s)
-            
+
         if remains > 0:
             logmsg += " \033[31m" + str(upper - remains) + "\033[0m"
 
@@ -1027,7 +1027,8 @@ class HttpCli(object):
         if abspath.endswith(".md") and "raw" not in self.uparam:
             return self.tx_md(abspath)
 
-        if abspath.endswith("{0}.hist{0}up2k.db".format(os.sep)):
+        bad = "{0}.hist{0}up2k.".format(os.sep)
+        if abspath.endswith(bad + "db") or abspath.endswith(bad + "snap"):
             raise Pebkac(403)
 
         return self.tx_file(abspath)
diff --git a/copyparty/up2k.py b/copyparty/up2k.py
index d9e13eb6..e71936c4 100644
--- a/copyparty/up2k.py
+++ b/copyparty/up2k.py
@@ -6,6 +6,8 @@ import os
 import re
 import time
 import math
+import json
+import gzip
 import stat
 import shutil
 import base64
@@ -13,7 +15,7 @@ import hashlib
 import threading
 from copy import deepcopy
 
-from .__init__ import WINDOWS
+from .__init__ import WINDOWS, PY2
 from .util import Pebkac, Queue, fsdec, fsenc, sanitize_fn, ren_open
 
 HAVE_SQLITE3 = False
@@ -54,6 +56,11 @@ class Up2k(object):
             thr.daemon = True
             thr.start()
 
+        if self.persist:
+            thr = threading.Thread(target=self._snapshot)
+            thr.daemon = True
+            thr.start()
+
         # static
         self.r_hash = re.compile("^[0-9a-zA-Z_-]{43}$")
 
@@ -61,12 +68,38 @@ class Up2k(object):
             m = "could not initialize sqlite3, will use in-memory registry only"
             self.log("up2k", m)
 
+    def _vis_job_progress(self, job):
+        perc = 100 - (len(job["need"]) * 100.0 / len(job["hash"]))
+        path = os.path.join(job["ptop"], job["prel"], job["name"])
+        return "{:5.1f}% {}".format(perc, path)
+
+    def _vis_reg_progress(self, reg):
+        ret = []
+        for _, job in reg.items():
+            ret.append(self._vis_job_progress(job))
+
+        return ret
+
     def register_vpath(self, ptop):
         with self.mutex:
             if ptop in self.registry:
                 return None
 
-            self.registry[ptop] = {}
+            reg = {}
+            path = os.path.join(ptop, ".hist", "up2k.snap")
+            if self.persist and os.path.exists(path):
+                with gzip.GzipFile(path, "rb") as f:
+                    j = f.read().decode("utf-8")
+
+                reg = json.loads(j)
+                for _, job in reg.items():
+                    job["poke"] = time.time()
+
+                m = "loaded snap {} |{}|".format(path, len(reg.keys()))
+                m = [m] + self._vis_reg_progress(reg)
+                self.log("up2k", "\n".join(m))
+
+            self.registry[ptop] = reg
             if not self.persist or not HAVE_SQLITE3:
                 return None
 
@@ -107,7 +140,7 @@ class Up2k(object):
         except Exception as ex:
             self.log("up2k", "listdir: " + repr(ex))
             return
-        
+
         histdir = os.path.join(top, ".hist")
         for inode in inodes:
             abspath = os.path.join(cdir, inode)
@@ -116,11 +149,11 @@ class Up2k(object):
             except Exception as ex:
                 self.log("up2k", "stat: " + repr(ex))
                 continue
-            
+
             if stat.S_ISDIR(inf.st_mode):
                 if abspath in excl or abspath == histdir:
                     continue
-                self.log("up2k", " dir: {}".format(abspath))
+                # self.log("up2k", " dir: {}".format(abspath))
                 self._build_dir(dbw, top, excl, abspath)
             else:
                 # self.log("up2k", "file: {}".format(abspath))
@@ -151,7 +184,7 @@ class Up2k(object):
                 except Exception as ex:
                     self.log("up2k", "hash: " + repr(ex))
                     continue
-                
+
                 wark = self._wark_from_hashlist(inf.st_size, hashes)
                 self.db_add(dbw[0], wark, rp, inf.st_mtime, inf.st_size)
                 dbw[1] += 1
@@ -221,6 +254,7 @@ class Up2k(object):
     def handle_json(self, cj):
         self.register_vpath(cj["ptop"])
         cj["name"] = sanitize_fn(cj["name"])
+        cj["poke"] = time.time()
         wark = self._get_wark(cj)
         now = time.time()
         job = None
@@ -368,24 +402,35 @@ class Up2k(object):
             if not nchunk:
                 raise Pebkac(400, "unknown chunk")
 
+        job["poke"] = time.time()
+
         chunksize = self._get_chunksize(job["size"])
         ofs = [chunksize * x for x in nchunk]
 
-        path = os.path.join(job["ptop"], job["prel"], job["name"])
+        path = os.path.join(job["ptop"], job["prel"], job["tnam"])
 
         return [chunksize, ofs, path, job["lmod"]]
 
     def confirm_chunk(self, ptop, wark, chash):
         with self.mutex:
             job = self.registry[ptop][wark]
+            pdir = os.path.join(job["ptop"], job["prel"])
+            src = os.path.join(pdir, job["tnam"])
+            dst = os.path.join(pdir, job["name"])
+
             job["need"].remove(chash)
             ret = len(job["need"])
             if ret > 0:
-                return ret
+                return ret, src
+
+            if not PY2:
+                os.replace(src, dst)
+            else:
+                os.unlink(dst)
+                os.rename(src, dst)
 
             if WINDOWS:
-                path = os.path.join(job["ptop"], job["prel"], job["name"])
-                self.lastmod_q.put([path, (int(time.time()), int(job["lmod"]))])
+                self.lastmod_q.put([dst, (int(time.time()), int(job["lmod"]))])
 
             db = self.db.get(job["ptop"], None)
             if db:
@@ -396,7 +441,7 @@ class Up2k(object):
                 del self.registry[ptop][wark]
                 # in-memory registry is reserved for unfinished uploads
 
-            return ret
+            return ret, dst
 
     def _get_chunksize(self, filesize):
         chunksize = 1024 * 1024
@@ -475,10 +520,13 @@ class Up2k(object):
 
     def _new_upload(self, job):
         self.registry[job["ptop"]][job["wark"]] = job
-        suffix = ".{:.6f}-{}".format(job["t0"], job["addr"])
         pdir = os.path.join(job["ptop"], job["prel"])
-        with ren_open(job["name"], "wb", fdir=pdir, suffix=suffix) as f:
-            f, job["name"] = f["orz"]
+        job["name"] = self._untaken(pdir, job["name"], job["t0"], job["addr"])
+
+        tnam = job["name"] + ".PARTIAL"
+        suffix = ".{:.6f}-{}".format(job["t0"], job["addr"])
+        with ren_open(tnam, "wb", fdir=pdir, suffix=suffix) as f:
+            f, job["tnam"] = f["orz"]
             f.seek(job["size"] - 1)
             f.write(b"e")
 
@@ -495,3 +543,39 @@ class Up2k(object):
                 os.utime(fsenc(path), times)
             except:
                 self.log("lmod", "failed to utime ({}, {})".format(path, times))
+
+    def _snapshot(self):
+        prev = {}
+        while True:
+            # persist pending uploads every 30sec
+            time.sleep(30)
+            for k, reg in self.registry.items():
+                now = time.time()
+                # discard abandoned uploads (idle >= 1h)
+                rm = [x for x in reg.values() if now - x["poke"] > 3600]
+                if rm:
+                    m = "dropping {} abandoned uploads in {}".format(len(rm), k)
+                    vis = [self._vis_job_progress(x) for x in rm]
+                    self.log("up2k", "\n".join([m] + vis))
+                    for job in rm:
+                        del reg[job["wark"]]
+                        try:
+                            # try to remove the placeholder zero-byte file too
+                            path = os.path.join(job["ptop"], job["prel"], job["name"])
+                            if os.path.getsize(path) == 0:
+                                os.unlink(path)
+                        except:
+                            pass
+
+                newest = max(x["poke"] for _, x in reg.items()) if reg else 0
+                etag = [len(reg), newest]
+                if etag == prev.get(k, None) or not reg:
+                    continue
+
+                j = json.dumps(reg, indent=2, sort_keys=True).encode("utf-8")
+                path = os.path.join(k, ".hist", "up2k.snap")
+                with gzip.GzipFile(path, "wb") as f:
+                    f.write(j)
+
+                self.log("up2k", "snap: {} |{}|".format(path, len(reg.keys())))
+                prev[k] = etag