test dedup relinking

This commit is contained in:
ed 2024-09-08 12:55:27 +00:00
parent e5a836cb7d
commit 1111153f06

View file

@ -31,6 +31,15 @@ class TestDedup(unittest.TestCase):
os.chdir(td) os.chdir(td)
return td return td
def cinit(self):
if self.conn:
self.fstab = self.conn.hsrv.hub.up2k.fstab
self.conn.hsrv.hub.up2k.shutdown()
self.asrv = AuthSrv(self.args, self.log)
self.conn = tu.VHttpConn(self.args, self.asrv, self.log, b"", True)
if self.fstab:
self.conn.hsrv.hub.up2k.fstab = self.fstab
def test(self): def test(self):
quick = True # sufficient for regular smoketests quick = True # sufficient for regular smoketests
# quick = False # quick = False
@ -53,36 +62,68 @@ class TestDedup(unittest.TestCase):
self.ctr = 336 if quick else 2016 # estimated total num uploads self.ctr = 336 if quick else 2016 # estimated total num uploads
self.conn = None self.conn = None
fstab = None self.fstab = None
for e2d in [True, False]: for e2d in [True, False]:
self.args = Cfg(v=[".::A"], a=[], e2d=e2d) self.args = Cfg(v=[".::A"], a=[], e2d=e2d)
for dn1, fn1, f1 in product(dirnames, filenames, files): for cm1 in product(dirnames, filenames, files):
cm1 = (dn1, fn1, f1) for cm2 in product(dirnames, filenames, files):
for dn2, fn2, f2 in product(dirnames, filenames, files):
cm2 = (dn2, fn2, f2)
if cm1 == cm2: if cm1 == cm2:
continue continue
for dn3, fn3, f3 in product(dirnames, filenames, files): for cm3 in product(dirnames, filenames, files):
cm3 = (dn3, fn3, f3)
if cm3 in (cm1, cm2): if cm3 in (cm1, cm2):
continue continue
self.reset()
if self.conn:
fstab = self.conn.hsrv.hub.up2k.fstab
self.conn.hsrv.hub.up2k.shutdown()
self.asrv = AuthSrv(self.args, self.log)
self.conn = tu.VHttpConn(
self.args, self.asrv, self.log, b"", True
)
if fstab:
self.conn.hsrv.hub.up2k.fstab = fstab
self.do_post(dn1, fn1, f1, True) f1 = cm1[2]
self.do_post(dn2, fn2, f2, False) f2 = cm2[2]
self.do_post(dn3, fn3, f3, False) f3 = cm3[2]
if not e2d:
rms = [-1]
elif f1 == f2:
if f1 == f3:
rms = [0, 1, 2]
else:
rms = [0, 1]
elif f1 == f3:
rms = [0, 2]
else:
rms = [1, 2]
for rm in rms:
self.do_tc(cm1, cm2, cm3, rm)
if quick: if quick:
break break
def do_tc(self, cm1, cm2, cm3, irm):
dn1, fn1, f1 = cm1
dn2, fn2, f2 = cm2
dn3, fn3, f3 = cm3
self.reset()
self.cinit()
fn1 = self.do_post(dn1, fn1, f1, True)
fn2 = self.do_post(dn2, fn2, f2, False)
fn3 = self.do_post(dn3, fn3, f3, False)
if irm < 0:
return
cms = [(dn1, fn1, f1), (dn2, fn2, f2), (dn3, fn3, f3)]
rm = cms[irm]
dn, fn, _ = rm
h, b = self.curl("%s/%s?delete" % (dn, fn), meth="POST")
self.assertIn(" 200 OK", h)
self.assertIn("deleted 1 files", b)
h, b = self.curl("%s/%s" % (dn, fn))
self.assertIn(" 404 Not Fo", h)
for cm in cms:
if cm == rm:
continue
dn, fn, f = cm
h, b = self.curl("%s/%s" % (dn, fn))
self.assertEqual(b, f[0])
def do_post(self, dn, fn, fi, first): def do_post(self, dn, fn, fi, first):
print("\n\n# do_post", self.ctr, repr((dn, fn, fi, first))) print("\n\n# do_post", self.ctr, repr((dn, fn, fi, first)))
self.ctr -= 1 self.ctr -= 1
@ -107,6 +148,7 @@ class TestDedup(unittest.TestCase):
h, b = self.curl("%s/%s" % (dn, sfn)) h, b = self.curl("%s/%s" % (dn, sfn))
self.assertEqual(b, data) self.assertEqual(b, data)
return sfn
def handshake(self, dn, fn, fi): def handshake(self, dn, fn, fi):
hdr = "POST /%s/ HTTP/1.1\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: %d\r\n\r\n" hdr = "POST /%s/ HTTP/1.1\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: %d\r\n\r\n"
@ -136,9 +178,10 @@ class TestDedup(unittest.TestCase):
ret = self.conn.s._reply.decode("utf-8").split("\r\n\r\n", 1) ret = self.conn.s._reply.decode("utf-8").split("\r\n\r\n", 1)
self.assertEqual(ret[1], "thank") self.assertEqual(ret[1], "thank")
def curl(self, url, binary=False): def curl(self, url, binary=False, meth=None):
h = "GET /%s HTTP/1.1\r\nConnection: close\r\n\r\n" h = "%s /%s HTTP/1.1\r\nConnection: close\r\n\r\n"
HttpCli(self.conn.setbuf((h % (url,)).encode("utf-8"))).run() h = h % (meth or "GET", url)
HttpCli(self.conn.setbuf(h.encode("utf-8"))).run()
if binary: if binary:
h, b = self.conn.s._reply.split(b"\r\n\r\n", 1) h, b = self.conn.s._reply.split(b"\r\n\r\n", 1)
return [h.decode("utf-8"), b] return [h.decode("utf-8"), b]