#!/usr/bin/env python3 # coding: utf-8 from __future__ import print_function, unicode_literals import json import os import shutil import tempfile import unittest from itertools import product from copyparty.authsrv import AuthSrv from copyparty.httpcli import HttpCli from tests import util as tu from tests.util import Cfg class TestDedup(unittest.TestCase): def setUp(self): self.td = tu.get_ramdisk() def tearDown(self): os.chdir(tempfile.gettempdir()) shutil.rmtree(self.td) def reset(self): td = os.path.join(self.td, "vfs") if os.path.exists(td): shutil.rmtree(td) os.mkdir(td) os.chdir(td) return td def test(self): quick = True # sufficient for regular smoketests # quick = False dirnames = ["d1", "d2"] filenames = ["f1", "f2"] files = [ ( "one", "BfcDQQeKz2oG1CPSFyD5ZD1flTYm2IoCY23DqeeVgq6w", "XMbpLRqVdtGmgggqjUI6uSoNMTqZVX4K6zr74XA1BRKc", ), ( "two", "ko1Q0eJNq3zKYs_oT83Pn8aVFgonj5G1wK8itwnYL4qj", "fxvihWlnQIbVbUPr--TxyV41913kPLhXPD1ngXYxDfou", ), ] # (data, chash, wark) self.ctr = 336 if quick else 2016 # estimated total num uploads self.conn = None fstab = None for e2d in [True, False]: self.args = Cfg(v=[".::A"], a=[], e2d=e2d) for dn1, fn1, f1 in product(dirnames, filenames, files): cm1 = (dn1, fn1, f1) for dn2, fn2, f2 in product(dirnames, filenames, files): cm2 = (dn2, fn2, f2) if cm1 == cm2: continue for dn3, fn3, f3 in product(dirnames, filenames, files): cm3 = (dn3, fn3, f3) if cm3 in (cm1, cm2): continue self.reset() if self.conn: fstab = self.conn.hsrv.hub.up2k.fstab self.conn.hsrv.hub.up2k.shutdown() self.asrv = AuthSrv(self.args, self.log) self.conn = tu.VHttpConn( self.args, self.asrv, self.log, b"", True ) if fstab: self.conn.hsrv.hub.up2k.fstab = fstab self.do_post(dn1, fn1, f1, True) self.do_post(dn2, fn2, f2, False) self.do_post(dn3, fn3, f3, False) if quick: break def do_post(self, dn, fn, fi, first): print("\n\n# do_post", self.ctr, repr((dn, fn, fi, first))) self.ctr -= 1 data, chash, wark = fi hs = self.handshake(dn, fn, fi) self.assertEqual(hs["wark"], wark) sfn = hs["name"] if sfn == fn: print("using original name " + fn) else: print(fn + " got renamed to " + sfn) if first: raise Exception("wait what") if hs["hash"]: self.assertEqual(hs["hash"][0], chash) self.put_chunk(dn, wark, chash, data) elif first: raise Exception("found first; %r, %r" % ((dn, fn, fi), hs)) h, b = self.curl("%s/%s" % (dn, sfn)) self.assertEqual(b, data) def handshake(self, dn, fn, fi): hdr = "POST /%s/ HTTP/1.1\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: %d\r\n\r\n" msg = {"name": fn, "size": 3, "lmod": 1234567890, "life": 0, "hash": [fi[1]]} buf = json.dumps(msg).encode("utf-8") buf = (hdr % (dn, len(buf))).encode("utf-8") + buf print("HS -->", buf) HttpCli(self.conn.setbuf(buf)).run() ret = self.conn.s._reply.decode("utf-8").split("\r\n\r\n", 1) print("HS <--", ret) return json.loads(ret[1]) def put_chunk(self, dn, wark, chash, data): msg = [ "POST /%s/ HTTP/1.1" % (dn,), "Connection: close", "Content-Type: application/octet-stream", "Content-Length: 3", "X-Up2k-Hash: " + chash, "X-Up2k-Wark: " + wark, "", data, ] buf = "\r\n".join(msg).encode("utf-8") print("PUT -->", buf) HttpCli(self.conn.setbuf(buf)).run() ret = self.conn.s._reply.decode("utf-8").split("\r\n\r\n", 1) self.assertEqual(ret[1], "thank") def curl(self, url, binary=False): h = "GET /%s HTTP/1.1\r\nConnection: close\r\n\r\n" HttpCli(self.conn.setbuf((h % (url,)).encode("utf-8"))).run() if binary: h, b = self.conn.s._reply.split(b"\r\n\r\n", 1) return [h.decode("utf-8"), b] return self.conn.s._reply.decode("utf-8").split("\r\n\r\n", 1) def log(self, src, msg, c=0): print(msg)