mirror of
https://github.com/9001/copyparty.git
synced 2025-08-18 01:22:13 -06:00
prevent index loss on mid-write crash
This commit is contained in:
parent
cc076c1be1
commit
fb853edbe3
|
@ -16,7 +16,7 @@ import threading
|
||||||
from copy import deepcopy
|
from copy import deepcopy
|
||||||
|
|
||||||
from .__init__ import WINDOWS, PY2
|
from .__init__ import WINDOWS, PY2
|
||||||
from .util import Pebkac, Queue, fsdec, fsenc, sanitize_fn, ren_open
|
from .util import Pebkac, Queue, fsdec, fsenc, sanitize_fn, ren_open, atomic_move
|
||||||
|
|
||||||
HAVE_SQLITE3 = False
|
HAVE_SQLITE3 = False
|
||||||
try:
|
try:
|
||||||
|
@ -423,11 +423,7 @@ class Up2k(object):
|
||||||
if ret > 0:
|
if ret > 0:
|
||||||
return ret, src
|
return ret, src
|
||||||
|
|
||||||
if not PY2:
|
atomic_move(src, dst)
|
||||||
os.replace(src, dst)
|
|
||||||
else:
|
|
||||||
os.unlink(dst)
|
|
||||||
os.rename(src, dst)
|
|
||||||
|
|
||||||
if WINDOWS:
|
if WINDOWS:
|
||||||
self.lastmod_q.put([dst, (int(time.time()), int(job["lmod"]))])
|
self.lastmod_q.put([dst, (int(time.time()), int(job["lmod"]))])
|
||||||
|
@ -545,14 +541,14 @@ class Up2k(object):
|
||||||
self.log("lmod", "failed to utime ({}, {})".format(path, times))
|
self.log("lmod", "failed to utime ({}, {})".format(path, times))
|
||||||
|
|
||||||
def _snapshot(self):
|
def _snapshot(self):
|
||||||
|
persist_interval = 30 # persist unfinished uploads index every 30 sec
|
||||||
|
discard_interval = 3600 # drop unfinished uploads after 1 hour inactivity
|
||||||
prev = {}
|
prev = {}
|
||||||
while True:
|
while True:
|
||||||
# persist pending uploads every 60sec
|
time.sleep(persist_interval)
|
||||||
time.sleep(30)
|
|
||||||
for k, reg in self.registry.items():
|
for k, reg in self.registry.items():
|
||||||
now = time.time()
|
now = time.time()
|
||||||
# discard abandoned uploads (idle >= 1h)
|
rm = [x for x in reg.values() if now - x["poke"] > discard_interval]
|
||||||
rm = [x for x in reg.values() if now - x["poke"] > 60]
|
|
||||||
if rm:
|
if rm:
|
||||||
m = "dropping {} abandoned uploads in {}".format(len(rm), k)
|
m = "dropping {} abandoned uploads in {}".format(len(rm), k)
|
||||||
vis = [self._vis_job_progress(x) for x in rm]
|
vis = [self._vis_job_progress(x) for x in rm]
|
||||||
|
@ -560,22 +556,33 @@ class Up2k(object):
|
||||||
for job in rm:
|
for job in rm:
|
||||||
del reg[job["wark"]]
|
del reg[job["wark"]]
|
||||||
try:
|
try:
|
||||||
# try to remove the placeholder zero-byte file too
|
# remove the placeholder zero-byte file (keep the PARTIAL)
|
||||||
path = os.path.join(job["ptop"], job["prel"], job["name"])
|
path = os.path.join(job["ptop"], job["prel"], job["name"])
|
||||||
if os.path.getsize(path) == 0:
|
if os.path.getsize(path) == 0:
|
||||||
os.unlink(path)
|
os.unlink(path)
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
newest = max(x["poke"] for _, x in reg.items()) if reg else 0
|
path = os.path.join(k, ".hist", "up2k.snap")
|
||||||
etag = [len(reg), newest]
|
if not reg:
|
||||||
if etag == prev.get(k, None) or not reg:
|
if k not in prev or prev[k] is not None:
|
||||||
|
prev[k] = None
|
||||||
|
if os.path.exists(path):
|
||||||
|
os.unlink(path)
|
||||||
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
newest = max(x["poke"] for _, x in reg.items()) if reg else 0
|
||||||
|
etag = [len(reg), newest]
|
||||||
|
if etag == prev.get(k, None):
|
||||||
|
continue
|
||||||
|
|
||||||
|
path2 = "{}.{}".format(path, os.getpid())
|
||||||
j = json.dumps(reg, indent=2, sort_keys=True).encode("utf-8")
|
j = json.dumps(reg, indent=2, sort_keys=True).encode("utf-8")
|
||||||
path = os.path.join(k, ".hist", "up2k.snap")
|
with gzip.GzipFile(path2, "wb") as f:
|
||||||
with gzip.GzipFile(path, "wb") as f:
|
|
||||||
f.write(j)
|
f.write(j)
|
||||||
|
|
||||||
|
atomic_move(path2, path)
|
||||||
|
|
||||||
self.log("up2k", "snap: {} |{}|".format(path, len(reg.keys())))
|
self.log("up2k", "snap: {} |{}|".format(path, len(reg.keys())))
|
||||||
prev[k] = etag
|
prev[k] = etag
|
||||||
|
|
|
@ -549,6 +549,16 @@ else:
|
||||||
fsdec = w8dec
|
fsdec = w8dec
|
||||||
|
|
||||||
|
|
||||||
|
def atomic_move(src, dst):
|
||||||
|
if not PY2:
|
||||||
|
os.replace(src, dst)
|
||||||
|
else:
|
||||||
|
if os.path.exists(dst):
|
||||||
|
os.unlink(dst)
|
||||||
|
|
||||||
|
os.rename(src, dst)
|
||||||
|
|
||||||
|
|
||||||
def read_socket(sr, total_size):
|
def read_socket(sr, total_size):
|
||||||
remains = total_size
|
remains = total_size
|
||||||
while remains > 0:
|
while remains > 0:
|
||||||
|
|
Loading…
Reference in a new issue