#!/usr/bin/env python3 # coding: utf-8 from __future__ import print_function, unicode_literals import json import os import shutil import tempfile import unittest from itertools import product from copyparty.authsrv import AuthSrv from copyparty.httpcli import HttpCli from tests import util as tu from tests.util import Cfg class TestDedup(unittest.TestCase): def setUp(self): self.td = tu.get_ramdisk() def tearDown(self): os.chdir(tempfile.gettempdir()) shutil.rmtree(self.td) def reset(self): td = os.path.join(self.td, "vfs") if os.path.exists(td): shutil.rmtree(td) os.mkdir(td) os.chdir(td) return td def cinit(self): if self.conn: self.fstab = self.conn.hsrv.hub.up2k.fstab self.conn.hsrv.hub.up2k.shutdown() self.asrv = AuthSrv(self.args, self.log) self.conn = tu.VHttpConn(self.args, self.asrv, self.log, b"", True) if self.fstab: self.conn.hsrv.hub.up2k.fstab = self.fstab def test(self): quick = True # sufficient for regular smoketests # quick = False dirnames = ["d1", "d2"] filenames = ["f1", "f2"] files = [ ( "one", "BfcDQQeKz2oG1CPSFyD5ZD1flTYm2IoCY23DqeeVgq6w", "XMbpLRqVdtGmgggqjUI6uSoNMTqZVX4K6zr74XA1BRKc", ), ( "two", "ko1Q0eJNq3zKYs_oT83Pn8aVFgonj5G1wK8itwnYL4qj", "fxvihWlnQIbVbUPr--TxyV41913kPLhXPD1ngXYxDfou", ), ] # (data, chash, wark) self.ctr = 336 if quick else 2016 # estimated total num uploads self.conn = None self.fstab = None for e2d in [True, False]: self.args = Cfg(v=[".::A"], a=[], e2d=e2d) for cm1 in product(dirnames, filenames, files): for cm2 in product(dirnames, filenames, files): if cm1 == cm2: continue for cm3 in product(dirnames, filenames, files): if cm3 in (cm1, cm2): continue f1 = cm1[2] f2 = cm2[2] f3 = cm3[2] if not e2d: rms = [-1] elif f1 == f2: if f1 == f3: rms = [0, 1, 2] else: rms = [0, 1] elif f1 == f3: rms = [0, 2] else: rms = [1, 2] for rm in rms: self.do_tc(cm1, cm2, cm3, rm) if quick: break def do_tc(self, cm1, cm2, cm3, irm): dn1, fn1, f1 = cm1 dn2, fn2, f2 = cm2 dn3, fn3, f3 = cm3 self.reset() self.cinit() fn1 = self.do_post(dn1, fn1, f1, True) fn2 = self.do_post(dn2, fn2, f2, False) fn3 = self.do_post(dn3, fn3, f3, False) if irm < 0: return cms = [(dn1, fn1, f1), (dn2, fn2, f2), (dn3, fn3, f3)] rm = cms[irm] dn, fn, _ = rm h, b = self.curl("%s/%s?delete" % (dn, fn), meth="POST") self.assertIn(" 200 OK", h) self.assertIn("deleted 1 files", b) h, b = self.curl("%s/%s" % (dn, fn)) self.assertIn(" 404 Not Fo", h) for cm in cms: if cm == rm: continue dn, fn, f = cm h, b = self.curl("%s/%s" % (dn, fn)) self.assertEqual(b, f[0]) def do_post(self, dn, fn, fi, first): print("\n\n# do_post", self.ctr, repr((dn, fn, fi, first))) self.ctr -= 1 data, chash, wark = fi hs = self.handshake(dn, fn, fi) self.assertEqual(hs["wark"], wark) sfn = hs["name"] if sfn == fn: print("using original name " + fn) else: print(fn + " got renamed to " + sfn) if first: raise Exception("wait what") if hs["hash"]: self.assertEqual(hs["hash"][0], chash) self.put_chunk(dn, wark, chash, data) elif first: raise Exception("found first; %r, %r" % ((dn, fn, fi), hs)) h, b = self.curl("%s/%s" % (dn, sfn)) self.assertEqual(b, data) return sfn def handshake(self, dn, fn, fi): hdr = "POST /%s/ HTTP/1.1\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: %d\r\n\r\n" msg = {"name": fn, "size": 3, "lmod": 1234567890, "life": 0, "hash": [fi[1]]} buf = json.dumps(msg).encode("utf-8") buf = (hdr % (dn, len(buf))).encode("utf-8") + buf print("HS -->", buf) HttpCli(self.conn.setbuf(buf)).run() ret = self.conn.s._reply.decode("utf-8").split("\r\n\r\n", 1) print("HS <--", ret) return json.loads(ret[1]) def put_chunk(self, dn, wark, chash, data): msg = [ "POST /%s/ HTTP/1.1" % (dn,), "Connection: close", "Content-Type: application/octet-stream", "Content-Length: 3", "X-Up2k-Hash: " + chash, "X-Up2k-Wark: " + wark, "", data, ] buf = "\r\n".join(msg).encode("utf-8") print("PUT -->", buf) HttpCli(self.conn.setbuf(buf)).run() ret = self.conn.s._reply.decode("utf-8").split("\r\n\r\n", 1) self.assertEqual(ret[1], "thank") def curl(self, url, binary=False, meth=None): h = "%s /%s HTTP/1.1\r\nConnection: close\r\n\r\n" h = h % (meth or "GET", url) HttpCli(self.conn.setbuf(h.encode("utf-8"))).run() if binary: h, b = self.conn.s._reply.split(b"\r\n\r\n", 1) return [h.decode("utf-8"), b] return self.conn.s._reply.decode("utf-8").split("\r\n\r\n", 1) def log(self, src, msg, c=0): print(msg)