fix dedup bug as of v1.13.8:
* v1.13.8 broke collision resolving for non-identical files; the correct filename was reserved, but never symlinked to the original file, leaving a zero-byte file instead. See the v1.14.3 github release notes for remediation info.
* add sanity-checks for early detection of index/fs desync; avoids wasted work and gives less confusing logs.

parent 01233991f3
commit 3da62ec234
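The sanity-check half of the fix boils down to one idea: before trusting an indexed copy as a dedup source, stat it and compare the on-disk state against what the database recorded. A minimal standalone sketch of that idea (the function name and signature are illustrative, not copyparty's actual API):

import os

def dedup_candidate_ok(path, db_size):
    # refuse to dedup against an indexed copy whose on-disk state no longer
    # matches the database row; a missing file or broken symlink raises
    # OSError, and a size mismatch means the file changed behind the index
    try:
        st = os.stat(path)
    except OSError:
        return False  # index/fs desync: file is gone
    return st.st_size == db_size

When such a check fails, the hunks below log the mismatch, drop the stale registry entry, and fall back to handling the upload normally instead of deduping against it.
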
@@ -106,6 +106,7 @@ class SvcHub(object):
         self.no_ansi = args.no_ansi
         self.logf: Optional[typing.TextIO] = None
         self.logf_base_fn = ""
+        self.is_dut = False  # running in unittest; always False
         self.stop_req = False
         self.stopping = False
         self.stopped = False

@@ -236,6 +236,9 @@ class Up2k(object):
         if not self.pp and self.args.exit == "idx":
             return self.hub.sigterm()

+        if self.hub.is_dut:
+            return
+
         Daemon(self._snapshot, "up2k-snapshot")
         if have_e2d:
             Daemon(self._hasher, "up2k-hasher")

@@ -1405,7 +1408,7 @@ class Up2k(object):
                 if dts == lmod and dsz == sz and (nohash or dw[0] != "#" or not sz):
                     continue

-                t = "reindex [{}] => [{}] ({}/{}) ({}/{})".format(
+                t = "reindex [{}] => [{}] mtime({}/{}) size({}/{})".format(
                     top, rp, dts, lmod, dsz, sz
                 )
                 self.log(t)

@@ -2664,11 +2667,19 @@ class Up2k(object):
                     if stat.S_ISLNK(st.st_mode):
                         # broken symlink
                         raise Exception()
-                except:
+                    if st.st_size != dsize:
+                        t = "candidate ignored (db/fs desync): {}, size fs={} db={}, mtime fs={} db={}, file: {}"
+                        t = t.format(
+                            wark, st.st_size, dsize, st.st_mtime, dtime, dp_abs
+                        )
+                        self.log(t)
+                        raise Exception("desync")
+                except Exception as ex:
                     if n4g:
                         st = os.stat_result((0, -1, -1, 0, 0, 0, 0, 0, 0, 0))
                     else:
-                        lost.append((cur, dp_dir, dp_fn))
+                        if str(ex) != "desync":
+                            lost.append((cur, dp_dir, dp_fn))
                         continue

                 j = {

@@ -2726,13 +2737,16 @@ class Up2k(object):
         ptop = None  # use cj or job as appropriate

         if not job and wark in reg:
-            # ensure the files haven't been deleted manually
+            # ensure the files haven't been edited or deleted
+            path = ""
+            st = None
             rj = reg[wark]
             names = [rj[x] for x in ["name", "tnam"] if x in rj]
             for fn in names:
                 path = djoin(rj["ptop"], rj["prel"], fn)
                 try:
-                    if bos.path.getsize(path) > 0 or not rj["need"]:
+                    st = bos.stat(path)
+                    if st.st_size > 0 or not rj["need"]:
                         # upload completed or both present
                         break
                 except:

@@ -2743,6 +2757,14 @@ class Up2k(object):
                     del reg[wark]
                     break

+            if st and not self.args.nw and not n4g and st.st_size != rj["size"]:
+                t = "will not dedup (fs index desync): {}, size fs={} db={}, mtime fs={} db={}, file: {}"
+                t = t.format(
+                    wark, st.st_size, rj["size"], st.st_mtime, rj["lmod"], path
+                )
+                self.log(t)
+                del reg[wark]
+
         if job or wark in reg:
             job = job or reg[wark]
             if (

|
@ -2850,6 +2872,7 @@ class Up2k(object):
|
|||
return self._handle_json(job, depth + 1)
|
||||
|
||||
job["name"] = self._untaken(pdir, job, now)
|
||||
dst = djoin(job["ptop"], job["prel"], job["name"])
|
||||
|
||||
if not self.args.nw:
|
||||
dvf: dict[str, Any] = vfs.flags
|
||||
|
tests/test_dedup.py (new file, 138 lines)
@@ -0,0 +1,138 @@
#!/usr/bin/env python3
# coding: utf-8
from __future__ import print_function, unicode_literals

import json
import os
import shutil
import tempfile
import unittest
from itertools import product

from copyparty.authsrv import AuthSrv
from copyparty.httpcli import HttpCli
from tests import util as tu
from tests.util import Cfg


class TestDedup(unittest.TestCase):
    def setUp(self):
        self.td = tu.get_ramdisk()

    def tearDown(self):
        os.chdir(tempfile.gettempdir())
        shutil.rmtree(self.td)

    def reset(self):
        td = os.path.join(self.td, "vfs")
        if os.path.exists(td):
            shutil.rmtree(td)
        os.mkdir(td)
        os.chdir(td)
        return td

    def test(self):
        quick = True  # sufficient for regular smoketests
        # quick = False

        dirnames = ["d1", "d2"]
        filenames = ["f1", "f2"]
        files = [
            (
                "one",
                "BfcDQQeKz2oG1CPSFyD5ZD1flTYm2IoCY23DqeeVgq6w",
                "XMbpLRqVdtGmgggqjUI6uSoNMTqZVX4K6zr74XA1BRKc",
            ),
            (
                "two",
                "ko1Q0eJNq3zKYs_oT83Pn8aVFgonj5G1wK8itwnYL4qj",
                "fxvihWlnQIbVbUPr--TxyV41913kPLhXPD1ngXYxDfou",
            ),
        ]
        # (data, chash, wark)

        # 3072 uploads in total
        self.ctr = 3072
        self.conn = None
        for e2d in [True, False]:
            for dn1, fn1, f1 in product(dirnames, filenames, files):
                for dn2, fn2, f2 in product(dirnames, filenames, files):
                    for dn3, fn3, f3 in product(dirnames, filenames, files):
                        self.reset()
                        if self.conn:
                            self.conn.hsrv.hub.up2k.shutdown()
                        self.args = Cfg(v=[".::A"], a=[], e2d=e2d)
                        self.asrv = AuthSrv(self.args, self.log)
                        self.conn = tu.VHttpConn(
                            self.args, self.asrv, self.log, b"", True
                        )
                        self.do_post(dn1, fn1, f1, True)
                        self.do_post(dn2, fn2, f2, False)
                        self.do_post(dn3, fn3, f3, False)
                        if quick:
                            break

    def do_post(self, dn, fn, fi, first):
        print("\n\n# do_post", self.ctr, repr((dn, fn, fi, first)))
        self.ctr -= 1

        data, chash, wark = fi
        hs = self.handshake(dn, fn, fi)
        self.assertEqual(hs["wark"], wark)

        sfn = hs["name"]
        if sfn == fn:
            print("using original name " + fn)
        else:
            print(fn + " got renamed to " + sfn)
            if first:
                raise Exception("wait what")

        if hs["hash"]:
            self.assertEqual(hs["hash"][0], chash)
            self.put_chunk(dn, wark, chash, data)
        elif first:
            raise Exception("found first; %r, %r" % ((dn, fn, fi), hs))

        h, b = self.curl("%s/%s" % (dn, sfn))
        self.assertEqual(b, data)

    def handshake(self, dn, fn, fi):
        hdr = "POST /%s/ HTTP/1.1\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: %d\r\n\r\n"
        msg = {"name": fn, "size": 3, "lmod": 1234567890, "life": 0, "hash": [fi[1]]}
        buf = json.dumps(msg).encode("utf-8")
        buf = (hdr % (dn, len(buf))).encode("utf-8") + buf
        print("HS -->", buf)
        HttpCli(self.conn.setbuf(buf)).run()
        ret = self.conn.s._reply.decode("utf-8").split("\r\n\r\n", 1)
        print("HS <--", ret)
        return json.loads(ret[1])

    def put_chunk(self, dn, wark, chash, data):
        msg = [
            "POST /%s/ HTTP/1.1" % (dn,),
            "Connection: close",
            "Content-Type: application/octet-stream",
            "Content-Length: 3",
            "X-Up2k-Hash: " + chash,
            "X-Up2k-Wark: " + wark,
            "",
            data,
        ]
        buf = "\r\n".join(msg).encode("utf-8")
        print("PUT -->", buf)
        HttpCli(self.conn.setbuf(buf)).run()
        ret = self.conn.s._reply.decode("utf-8").split("\r\n\r\n", 1)
        self.assertEqual(ret[1], "thank")

    def curl(self, url, binary=False):
        h = "GET /%s HTTP/1.1\r\nConnection: close\r\n\r\n"
        HttpCli(self.conn.setbuf((h % (url,)).encode("utf-8"))).run()
        if binary:
            h, b = self.conn.s._reply.split(b"\r\n\r\n", 1)
            return [h.decode("utf-8"), b]

        return self.conn.s._reply.decode("utf-8").split("\r\n\r\n", 1)

    def log(self, src, msg, c=0):
        print(msg)

@@ -24,6 +24,10 @@ def hdr(query, uname):


 class TestDots(unittest.TestCase):
+    def __init__(self, *a, **ka):
+        super(TestDots, self).__init__(*a, **ka)
+        self.is_dut = True
+
     def setUp(self):
         self.td = tu.get_ramdisk()

@@ -3,7 +3,6 @@
 from __future__ import print_function, unicode_literals

 import os
-import platform
 import re
 import shutil
 import socket

@@ -16,9 +15,7 @@ from argparse import Namespace

 import jinja2

-WINDOWS = platform.system() == "Windows"
-ANYWIN = WINDOWS or sys.platform in ["msys"]
-MACOS = platform.system() == "Darwin"
+from copyparty.__init__ import MACOS, WINDOWS, E

 J2_ENV = jinja2.Environment(loader=jinja2.BaseLoader)  # type: ignore
 J2_FILES = J2_ENV.from_string("{{ files|join('\n') }}\nJ2EOT")

@@ -42,10 +39,11 @@ if MACOS:
     # 25% faster; until any tests do symlink stuff


-from copyparty.__init__ import E
 from copyparty.__main__ import init_E
+from copyparty.broker_thr import BrokerThr
 from copyparty.ico import Ico
 from copyparty.u2idx import U2idx
+from copyparty.up2k import Up2k
 from copyparty.util import FHC, CachedDict, Garda, Unrecv

 init_E(E)

@@ -119,10 +117,10 @@ class Cfg(Namespace):
     def __init__(self, a=None, v=None, c=None, **ka0):
         ka = {}

-        ex = "chpw daw dav_auth dav_inf dav_mac dav_rt e2d e2ds e2dsa e2t e2ts e2tsr e2v e2vu e2vp early_ban ed emp exp force_js getmod grid gsel hardlink ih ihead magic never_symlink nid nih no_acode no_athumb no_dav no_dedup no_del no_dupe no_lifetime no_logues no_mv no_pipe no_poll no_readme no_robots no_sb_md no_sb_lg no_scandir no_tarcmp no_thumb no_vthumb no_zip nrand nw og og_no_head og_s_title q rand smb srch_dbg stats uqe vague_403 vc ver write_uplog xdev xlink xvol"
+        ex = "chpw daw dav_auth dav_inf dav_mac dav_rt e2d e2ds e2dsa e2t e2ts e2tsr e2v e2vu e2vp early_ban ed emp exp force_js getmod grid gsel hardlink ih ihead magic never_symlink nid nih no_acode no_athumb no_dav no_db_ip no_dedup no_del no_dupe no_lifetime no_logues no_mv no_pipe no_poll no_readme no_robots no_sb_md no_sb_lg no_scandir no_tarcmp no_thumb no_vthumb no_zip nrand nw og og_no_head og_s_title q rand smb srch_dbg stats uqe vague_403 vc ver write_uplog xdev xlink xvol zs"
         ka.update(**{k: False for k in ex.split()})

-        ex = "dotpart dotsrch hook_v no_dhash no_fastboot no_rescan no_sendfile no_snap no_voldump re_dhash plain_ip"
+        ex = "dotpart dotsrch hook_v no_dhash no_fastboot no_fpool no_htp no_rescan no_sendfile no_snap no_voldump re_dhash plain_ip"
         ka.update(**{k: True for k in ex.split()})

         ex = "ah_cli ah_gen css_browser hist js_browser js_other mime mimes no_forget no_hash no_idx nonsus_urls og_tpl og_ua"

@@ -137,9 +135,12 @@ class Cfg(Namespace):
         ex = "db_act k304 loris re_maxage rproxy rsp_jtr rsp_slp s_wr_slp snap_wri theme themes turbo"
         ka.update(**{k: 0 for k in ex.split()})

-        ex = "ah_alg bname chpw_db doctitle df exit favico idp_h_usr html_head lg_sbf log_fk md_sbf name og_desc og_site og_th og_title og_title_a og_title_v og_title_i shr tcolor textfiles unlist vname R RS SR"
+        ex = "ah_alg bname chpw_db doctitle df exit favico idp_h_usr ipa html_head lg_sbf log_fk md_sbf name og_desc og_site og_th og_title og_title_a og_title_v og_title_i shr tcolor textfiles unlist vname xff_src R RS SR"
         ka.update(**{k: "" for k in ex.split()})

+        ex = "ban_403 ban_404 ban_422 ban_pw ban_url"
+        ka.update(**{k: "no" for k in ex.split()})
+
         ex = "grp on403 on404 xad xar xau xban xbd xbr xbu xiu xm"
         ka.update(**{k: [] for k in ex.split()})

@@ -221,11 +222,29 @@ class VSock(object):
         pass


+class VHub(object):
+    def __init__(self, args, asrv, log):
+        self.args = args
+        self.asrv = asrv
+        self.log = log
+        self.is_dut = True
+        self.up2k = Up2k(self)
+
+
+class VBrokerThr(BrokerThr):
+    def __init__(self, hub):
+        self.hub = hub
+        self.log = hub.log
+        self.args = hub.args
+        self.asrv = hub.asrv
+
+
 class VHttpSrv(object):
     def __init__(self, args, asrv, log):
         self.args = args
         self.asrv = asrv
         self.log = log
+        self.hub = None

         self.broker = NullBroker(args, asrv)
         self.prism = None

@@ -252,18 +271,25 @@ class VHttpSrv(object):
         return self.u2idx


+class VHttpSrvUp2k(VHttpSrv):
+    def __init__(self, args, asrv, log):
+        super(VHttpSrvUp2k, self).__init__(args, asrv, log)
+        self.hub = VHub(args, asrv, log)
+        self.broker = VBrokerThr(self.hub)
+
+
 class VHttpConn(object):
-    def __init__(self, args, asrv, log, buf):
+    def __init__(self, args, asrv, log, buf, use_up2k=False):
         self.t0 = time.time()
-        self.s = VSock(buf)
-        self.sr = Unrecv(self.s, None)  # type: ignore
         self.aclose = {}
         self.addr = ("127.0.0.1", "42069")
         self.args = args
         self.asrv = asrv
         self.bans = {}
         self.freshen_pwd = 0.0
-        self.hsrv = VHttpSrv(args, asrv, log)
+
+        Ctor = VHttpSrvUp2k if use_up2k else VHttpSrv
+        self.hsrv = Ctor(args, asrv, log)
         self.ico = Ico(args)
         self.ipa_nm = None
         self.lf_url = None

@@ -279,6 +305,12 @@ class VHttpConn(object):
         self.u2fh = FHC()

         self.get_u2idx = self.hsrv.get_u2idx
+        self.setbuf(buf)
+
+    def setbuf(self, buf):
+        self.s = VSock(buf)
+        self.sr = Unrecv(self.s, None)  # type: ignore
+        return self


 if WINDOWS: