mirror of
https://github.com/9001/copyparty.git
synced 2025-08-17 09:02:15 -06:00
persist/timeout incomplete uploads too
This commit is contained in:
parent
98cc9a6755
commit
cc076c1be1
|
@ -461,7 +461,7 @@ class HttpCli(object):
|
||||||
self.log("clone {} done".format(cstart[0]))
|
self.log("clone {} done".format(cstart[0]))
|
||||||
|
|
||||||
x = self.conn.hsrv.broker.put(True, "up2k.confirm_chunk", ptop, wark, chash)
|
x = self.conn.hsrv.broker.put(True, "up2k.confirm_chunk", ptop, wark, chash)
|
||||||
num_left = x.get()
|
num_left, path = x.get()
|
||||||
|
|
||||||
if not WINDOWS and num_left == 0:
|
if not WINDOWS and num_left == 0:
|
||||||
times = (int(time.time()), int(lastmod))
|
times = (int(time.time()), int(lastmod))
|
||||||
|
@ -929,7 +929,7 @@ class HttpCli(object):
|
||||||
remains = sendfile_kern(lower, upper, f, self.s)
|
remains = sendfile_kern(lower, upper, f, self.s)
|
||||||
else:
|
else:
|
||||||
remains = sendfile_py(lower, upper, f, self.s)
|
remains = sendfile_py(lower, upper, f, self.s)
|
||||||
|
|
||||||
if remains > 0:
|
if remains > 0:
|
||||||
logmsg += " \033[31m" + str(upper - remains) + "\033[0m"
|
logmsg += " \033[31m" + str(upper - remains) + "\033[0m"
|
||||||
|
|
||||||
|
@ -1027,7 +1027,8 @@ class HttpCli(object):
|
||||||
if abspath.endswith(".md") and "raw" not in self.uparam:
|
if abspath.endswith(".md") and "raw" not in self.uparam:
|
||||||
return self.tx_md(abspath)
|
return self.tx_md(abspath)
|
||||||
|
|
||||||
if abspath.endswith("{0}.hist{0}up2k.db".format(os.sep)):
|
bad = "{0}.hist{0}up2k.".format(os.sep)
|
||||||
|
if abspath.endswith(bad + "db") or abspath.endswith(bad + "snap"):
|
||||||
raise Pebkac(403)
|
raise Pebkac(403)
|
||||||
|
|
||||||
return self.tx_file(abspath)
|
return self.tx_file(abspath)
|
||||||
|
|
|
@ -6,6 +6,8 @@ import os
|
||||||
import re
|
import re
|
||||||
import time
|
import time
|
||||||
import math
|
import math
|
||||||
|
import json
|
||||||
|
import gzip
|
||||||
import stat
|
import stat
|
||||||
import shutil
|
import shutil
|
||||||
import base64
|
import base64
|
||||||
|
@ -13,7 +15,7 @@ import hashlib
|
||||||
import threading
|
import threading
|
||||||
from copy import deepcopy
|
from copy import deepcopy
|
||||||
|
|
||||||
from .__init__ import WINDOWS
|
from .__init__ import WINDOWS, PY2
|
||||||
from .util import Pebkac, Queue, fsdec, fsenc, sanitize_fn, ren_open
|
from .util import Pebkac, Queue, fsdec, fsenc, sanitize_fn, ren_open
|
||||||
|
|
||||||
HAVE_SQLITE3 = False
|
HAVE_SQLITE3 = False
|
||||||
|
@ -54,6 +56,11 @@ class Up2k(object):
|
||||||
thr.daemon = True
|
thr.daemon = True
|
||||||
thr.start()
|
thr.start()
|
||||||
|
|
||||||
|
if self.persist:
|
||||||
|
thr = threading.Thread(target=self._snapshot)
|
||||||
|
thr.daemon = True
|
||||||
|
thr.start()
|
||||||
|
|
||||||
# static
|
# static
|
||||||
self.r_hash = re.compile("^[0-9a-zA-Z_-]{43}$")
|
self.r_hash = re.compile("^[0-9a-zA-Z_-]{43}$")
|
||||||
|
|
||||||
|
@ -61,12 +68,38 @@ class Up2k(object):
|
||||||
m = "could not initialize sqlite3, will use in-memory registry only"
|
m = "could not initialize sqlite3, will use in-memory registry only"
|
||||||
self.log("up2k", m)
|
self.log("up2k", m)
|
||||||
|
|
||||||
|
def _vis_job_progress(self, job):
|
||||||
|
perc = 100 - (len(job["need"]) * 100.0 / len(job["hash"]))
|
||||||
|
path = os.path.join(job["ptop"], job["prel"], job["name"])
|
||||||
|
return "{:5.1f}% {}".format(perc, path)
|
||||||
|
|
||||||
|
def _vis_reg_progress(self, reg):
|
||||||
|
ret = []
|
||||||
|
for _, job in reg.items():
|
||||||
|
ret.append(self._vis_job_progress(job))
|
||||||
|
|
||||||
|
return ret
|
||||||
|
|
||||||
def register_vpath(self, ptop):
|
def register_vpath(self, ptop):
|
||||||
with self.mutex:
|
with self.mutex:
|
||||||
if ptop in self.registry:
|
if ptop in self.registry:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
self.registry[ptop] = {}
|
reg = {}
|
||||||
|
path = os.path.join(ptop, ".hist", "up2k.snap")
|
||||||
|
if self.persist and os.path.exists(path):
|
||||||
|
with gzip.GzipFile(path, "rb") as f:
|
||||||
|
j = f.read().decode("utf-8")
|
||||||
|
|
||||||
|
reg = json.loads(j)
|
||||||
|
for _, job in reg.items():
|
||||||
|
job["poke"] = time.time()
|
||||||
|
|
||||||
|
m = "loaded snap {} |{}|".format(path, len(reg.keys()))
|
||||||
|
m = [m] + self._vis_reg_progress(reg)
|
||||||
|
self.log("up2k", "\n".join(m))
|
||||||
|
|
||||||
|
self.registry[ptop] = reg
|
||||||
if not self.persist or not HAVE_SQLITE3:
|
if not self.persist or not HAVE_SQLITE3:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
@ -107,7 +140,7 @@ class Up2k(object):
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
self.log("up2k", "listdir: " + repr(ex))
|
self.log("up2k", "listdir: " + repr(ex))
|
||||||
return
|
return
|
||||||
|
|
||||||
histdir = os.path.join(top, ".hist")
|
histdir = os.path.join(top, ".hist")
|
||||||
for inode in inodes:
|
for inode in inodes:
|
||||||
abspath = os.path.join(cdir, inode)
|
abspath = os.path.join(cdir, inode)
|
||||||
|
@ -116,11 +149,11 @@ class Up2k(object):
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
self.log("up2k", "stat: " + repr(ex))
|
self.log("up2k", "stat: " + repr(ex))
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if stat.S_ISDIR(inf.st_mode):
|
if stat.S_ISDIR(inf.st_mode):
|
||||||
if abspath in excl or abspath == histdir:
|
if abspath in excl or abspath == histdir:
|
||||||
continue
|
continue
|
||||||
self.log("up2k", " dir: {}".format(abspath))
|
# self.log("up2k", " dir: {}".format(abspath))
|
||||||
self._build_dir(dbw, top, excl, abspath)
|
self._build_dir(dbw, top, excl, abspath)
|
||||||
else:
|
else:
|
||||||
# self.log("up2k", "file: {}".format(abspath))
|
# self.log("up2k", "file: {}".format(abspath))
|
||||||
|
@ -151,7 +184,7 @@ class Up2k(object):
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
self.log("up2k", "hash: " + repr(ex))
|
self.log("up2k", "hash: " + repr(ex))
|
||||||
continue
|
continue
|
||||||
|
|
||||||
wark = self._wark_from_hashlist(inf.st_size, hashes)
|
wark = self._wark_from_hashlist(inf.st_size, hashes)
|
||||||
self.db_add(dbw[0], wark, rp, inf.st_mtime, inf.st_size)
|
self.db_add(dbw[0], wark, rp, inf.st_mtime, inf.st_size)
|
||||||
dbw[1] += 1
|
dbw[1] += 1
|
||||||
|
@ -221,6 +254,7 @@ class Up2k(object):
|
||||||
def handle_json(self, cj):
|
def handle_json(self, cj):
|
||||||
self.register_vpath(cj["ptop"])
|
self.register_vpath(cj["ptop"])
|
||||||
cj["name"] = sanitize_fn(cj["name"])
|
cj["name"] = sanitize_fn(cj["name"])
|
||||||
|
cj["poke"] = time.time()
|
||||||
wark = self._get_wark(cj)
|
wark = self._get_wark(cj)
|
||||||
now = time.time()
|
now = time.time()
|
||||||
job = None
|
job = None
|
||||||
|
@ -368,24 +402,35 @@ class Up2k(object):
|
||||||
if not nchunk:
|
if not nchunk:
|
||||||
raise Pebkac(400, "unknown chunk")
|
raise Pebkac(400, "unknown chunk")
|
||||||
|
|
||||||
|
job["poke"] = time.time()
|
||||||
|
|
||||||
chunksize = self._get_chunksize(job["size"])
|
chunksize = self._get_chunksize(job["size"])
|
||||||
ofs = [chunksize * x for x in nchunk]
|
ofs = [chunksize * x for x in nchunk]
|
||||||
|
|
||||||
path = os.path.join(job["ptop"], job["prel"], job["name"])
|
path = os.path.join(job["ptop"], job["prel"], job["tnam"])
|
||||||
|
|
||||||
return [chunksize, ofs, path, job["lmod"]]
|
return [chunksize, ofs, path, job["lmod"]]
|
||||||
|
|
||||||
def confirm_chunk(self, ptop, wark, chash):
|
def confirm_chunk(self, ptop, wark, chash):
|
||||||
with self.mutex:
|
with self.mutex:
|
||||||
job = self.registry[ptop][wark]
|
job = self.registry[ptop][wark]
|
||||||
|
pdir = os.path.join(job["ptop"], job["prel"])
|
||||||
|
src = os.path.join(pdir, job["tnam"])
|
||||||
|
dst = os.path.join(pdir, job["name"])
|
||||||
|
|
||||||
job["need"].remove(chash)
|
job["need"].remove(chash)
|
||||||
ret = len(job["need"])
|
ret = len(job["need"])
|
||||||
if ret > 0:
|
if ret > 0:
|
||||||
return ret
|
return ret, src
|
||||||
|
|
||||||
|
if not PY2:
|
||||||
|
os.replace(src, dst)
|
||||||
|
else:
|
||||||
|
os.unlink(dst)
|
||||||
|
os.rename(src, dst)
|
||||||
|
|
||||||
if WINDOWS:
|
if WINDOWS:
|
||||||
path = os.path.join(job["ptop"], job["prel"], job["name"])
|
self.lastmod_q.put([dst, (int(time.time()), int(job["lmod"]))])
|
||||||
self.lastmod_q.put([path, (int(time.time()), int(job["lmod"]))])
|
|
||||||
|
|
||||||
db = self.db.get(job["ptop"], None)
|
db = self.db.get(job["ptop"], None)
|
||||||
if db:
|
if db:
|
||||||
|
@ -396,7 +441,7 @@ class Up2k(object):
|
||||||
del self.registry[ptop][wark]
|
del self.registry[ptop][wark]
|
||||||
# in-memory registry is reserved for unfinished uploads
|
# in-memory registry is reserved for unfinished uploads
|
||||||
|
|
||||||
return ret
|
return ret, dst
|
||||||
|
|
||||||
def _get_chunksize(self, filesize):
|
def _get_chunksize(self, filesize):
|
||||||
chunksize = 1024 * 1024
|
chunksize = 1024 * 1024
|
||||||
|
@ -475,10 +520,13 @@ class Up2k(object):
|
||||||
|
|
||||||
def _new_upload(self, job):
|
def _new_upload(self, job):
|
||||||
self.registry[job["ptop"]][job["wark"]] = job
|
self.registry[job["ptop"]][job["wark"]] = job
|
||||||
suffix = ".{:.6f}-{}".format(job["t0"], job["addr"])
|
|
||||||
pdir = os.path.join(job["ptop"], job["prel"])
|
pdir = os.path.join(job["ptop"], job["prel"])
|
||||||
with ren_open(job["name"], "wb", fdir=pdir, suffix=suffix) as f:
|
job["name"] = self._untaken(pdir, job["name"], job["t0"], job["addr"])
|
||||||
f, job["name"] = f["orz"]
|
|
||||||
|
tnam = job["name"] + ".PARTIAL"
|
||||||
|
suffix = ".{:.6f}-{}".format(job["t0"], job["addr"])
|
||||||
|
with ren_open(tnam, "wb", fdir=pdir, suffix=suffix) as f:
|
||||||
|
f, job["tnam"] = f["orz"]
|
||||||
f.seek(job["size"] - 1)
|
f.seek(job["size"] - 1)
|
||||||
f.write(b"e")
|
f.write(b"e")
|
||||||
|
|
||||||
|
@ -495,3 +543,39 @@ class Up2k(object):
|
||||||
os.utime(fsenc(path), times)
|
os.utime(fsenc(path), times)
|
||||||
except:
|
except:
|
||||||
self.log("lmod", "failed to utime ({}, {})".format(path, times))
|
self.log("lmod", "failed to utime ({}, {})".format(path, times))
|
||||||
|
|
||||||
|
def _snapshot(self):
|
||||||
|
prev = {}
|
||||||
|
while True:
|
||||||
|
# persist pending uploads every 60sec
|
||||||
|
time.sleep(30)
|
||||||
|
for k, reg in self.registry.items():
|
||||||
|
now = time.time()
|
||||||
|
# discard abandoned uploads (idle >= 1h)
|
||||||
|
rm = [x for x in reg.values() if now - x["poke"] > 60]
|
||||||
|
if rm:
|
||||||
|
m = "dropping {} abandoned uploads in {}".format(len(rm), k)
|
||||||
|
vis = [self._vis_job_progress(x) for x in rm]
|
||||||
|
self.log("up2k", "\n".join([m] + vis))
|
||||||
|
for job in rm:
|
||||||
|
del reg[job["wark"]]
|
||||||
|
try:
|
||||||
|
# try to remove the placeholder zero-byte file too
|
||||||
|
path = os.path.join(job["ptop"], job["prel"], job["name"])
|
||||||
|
if os.path.getsize(path) == 0:
|
||||||
|
os.unlink(path)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
newest = max(x["poke"] for _, x in reg.items()) if reg else 0
|
||||||
|
etag = [len(reg), newest]
|
||||||
|
if etag == prev.get(k, None) or not reg:
|
||||||
|
continue
|
||||||
|
|
||||||
|
j = json.dumps(reg, indent=2, sort_keys=True).encode("utf-8")
|
||||||
|
path = os.path.join(k, ".hist", "up2k.snap")
|
||||||
|
with gzip.GzipFile(path, "wb") as f:
|
||||||
|
f.write(j)
|
||||||
|
|
||||||
|
self.log("up2k", "snap: {} |{}|".format(path, len(reg.keys())))
|
||||||
|
prev[k] = etag
|
||||||
|
|
Loading…
Reference in a new issue