diff --git a/copyparty/up2k.py b/copyparty/up2k.py index 1ba6dbf1..b53e9e1d 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -2846,7 +2846,7 @@ class Up2k(object): self.log(t) del reg[wark] - elif inc_ap != orig_ap and not data_ok: + elif inc_ap != orig_ap and not data_ok and "done" in reg[wark]: self.log("asserting contents of %s" % (orig_ap,)) dhashes, _ = self._hashlist_from_file(orig_ap) dwark = up2k_wark_from_hashlist(self.salt, st.st_size, dhashes) @@ -3107,7 +3107,22 @@ class Up2k(object): fp = djoin(fdir, fname) if job.get("replace") and bos.path.exists(fp): self.log("replacing existing file at {}".format(fp)) - wunlink(self.log, fp, self.flags.get(job["ptop"]) or {}) + cur = None + ptop = job["ptop"] + vf = self.flags.get(ptop) or {} + st = bos.stat(fp) + try: + vrel = vjoin(job["prel"], fname) + xlink = bool(vf.get("xlink")) + cur, wark, _, _, _, _ = self._find_from_vpath(ptop, vrel) + self._forget_file(ptop, vrel, cur, wark, True, st.st_size, xlink) + except Exception as ex: + self.log("skipping replace-relink: %r" % (ex,)) + finally: + if cur: + cur.connection.commit() + + wunlink(self.log, fp, vf) if self.args.plain_ip: dip = ip.replace(":", ".") diff --git a/tests/test_dedup.py b/tests/test_dedup.py index b543ff84..11c75c44 100644 --- a/tests/test_dedup.py +++ b/tests/test_dedup.py @@ -19,6 +19,20 @@ class TestDedup(unittest.TestCase): def setUp(self): self.td = tu.get_ramdisk() + # (data, chash, wark) + self.files = [ + ( + "one", + "BfcDQQeKz2oG1CPSFyD5ZD1flTYm2IoCY23DqeeVgq6w", + "XMbpLRqVdtGmgggqjUI6uSoNMTqZVX4K6zr74XA1BRKc", + ), + ( + "two", + "ko1Q0eJNq3zKYs_oT83Pn8aVFgonj5G1wK8itwnYL4qj", + "fxvihWlnQIbVbUPr--TxyV41913kPLhXPD1ngXYxDfou", + ), + ] + def tearDown(self): os.chdir(tempfile.gettempdir()) shutil.rmtree(self.td) @@ -40,25 +54,54 @@ class TestDedup(unittest.TestCase): if self.fstab: self.conn.hsrv.hub.up2k.fstab = self.fstab + def test_a(self): + file404 = "\nJ2EOT" + f1, f2 = self.files + fns = ("f1", "f2", "f3") + dn = "d" + + self.conn = None + self.fstab = None + for e2d in [True, False]: + self.args = Cfg(v=[".::A"], a=[], e2d=e2d) + self.reset() + self.cinit() + + # dupes in parallel + sfn, hs = self.do_post_hs(dn, fns[0], f1, True) + for fn in fns[1:]: + h, b = self.handshake(dn, fn, f1) + self.assertIn(" 422 Unpro", h) + self.assertIn("a different location;", b) + self.do_post_data(dn, fns[0], f1, True, sfn, hs) + if not e2d: + # dupesched is e2d only; hs into existence + for fn, data in zip(fns, (f1[0], file404, file404)): + h, b = self.curl("%s/%s" % ("d", fn)) + self.assertEqual(b, data) + for fn in fns[1:]: + h, b = self.do_post_hs(dn, fn, f1, False) + for fn in fns: + h, b = self.curl("%s/%s" % ("d", fn)) + self.assertEqual(b, f1[0]) + + if not e2d: + continue + + # overwrite file + sfn, hs = self.do_post_hs(dn, fns[0], f2, True, replace=True) + self.do_post_data(dn, fns[0], f2, True, sfn, hs) + for fn, f in zip(fns, (f2, f1, f1)): + h, b = self.curl("%s/%s" % ("d", fn)) + self.assertEqual(b, f[0]) + def test(self): quick = True # sufficient for regular smoketests # quick = False dirnames = ["d1", "d2"] filenames = ["f1", "f2"] - files = [ - ( - "one", - "BfcDQQeKz2oG1CPSFyD5ZD1flTYm2IoCY23DqeeVgq6w", - "XMbpLRqVdtGmgggqjUI6uSoNMTqZVX4K6zr74XA1BRKc", - ), - ( - "two", - "ko1Q0eJNq3zKYs_oT83Pn8aVFgonj5G1wK8itwnYL4qj", - "fxvihWlnQIbVbUPr--TxyV41913kPLhXPD1ngXYxDfou", - ), - ] - # (data, chash, wark) + files = self.files self.ctr = 336 if quick else 2016 # estimated total num uploads self.conn = None @@ -127,10 +170,13 @@ class TestDedup(unittest.TestCase): def do_post(self, dn, fn, fi, first): print("\n\n# do_post", self.ctr, repr((dn, fn, fi, first))) self.ctr -= 1 + sfn, hs = self.do_post_hs(dn, fn, fi, first) + return self.do_post_data(dn, fn, fi, first, sfn, hs) - data, chash, wark = fi - hs = self.handshake(dn, fn, fi) - self.assertEqual(hs["wark"], wark) + def do_post_hs(self, dn, fn, fi, first, replace=False): + h, b = self.handshake(dn, fn, fi, replace=replace) + hs = json.loads(b) + self.assertEqual(hs["wark"], fi[2]) sfn = hs["name"] if sfn == fn: @@ -140,6 +186,10 @@ class TestDedup(unittest.TestCase): if first: raise Exception("wait what") + return sfn, hs + + def do_post_data(self, dn, fn, fi, first, sfn, hs): + data, chash, wark = fi if hs["hash"]: self.assertEqual(hs["hash"][0], chash) self.put_chunk(dn, wark, chash, data) @@ -150,16 +200,18 @@ class TestDedup(unittest.TestCase): self.assertEqual(b, data) return sfn - def handshake(self, dn, fn, fi): + def handshake(self, dn, fn, fi, replace=False): hdr = "POST /%s/ HTTP/1.1\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: %d\r\n\r\n" msg = {"name": fn, "size": 3, "lmod": 1234567890, "life": 0, "hash": [fi[1]]} + if replace: + msg["replace"] = True buf = json.dumps(msg).encode("utf-8") buf = (hdr % (dn, len(buf))).encode("utf-8") + buf - print("HS -->", buf) + # print("HS -->", buf) HttpCli(self.conn.setbuf(buf)).run() ret = self.conn.s._reply.decode("utf-8").split("\r\n\r\n", 1) - print("HS <--", ret) - return json.loads(ret[1]) + # print("HS <--", ret) + return ret def put_chunk(self, dn, wark, chash, data): msg = [ @@ -173,7 +225,7 @@ class TestDedup(unittest.TestCase): data, ] buf = "\r\n".join(msg).encode("utf-8") - print("PUT -->", buf) + # print("PUT -->", buf) HttpCli(self.conn.setbuf(buf)).run() ret = self.conn.s._reply.decode("utf-8").split("\r\n\r\n", 1) self.assertEqual(ret[1], "thank") @@ -181,6 +233,7 @@ class TestDedup(unittest.TestCase): def curl(self, url, binary=False, meth=None): h = "%s /%s HTTP/1.1\r\nConnection: close\r\n\r\n" h = h % (meth or "GET", url) + # print("CURL -->", url) HttpCli(self.conn.setbuf(h.encode("utf-8"))).run() if binary: h, b = self.conn.s._reply.split(b"\r\n\r\n", 1)