From 3794aa7ac7d92cadfd446a3ff8b00de838885ace Mon Sep 17 00:00:00 2001 From: ed Date: Tue, 2 Jul 2019 22:58:31 +0000 Subject: [PATCH] support repetitive files --- copyparty/httpcli.py | 42 +++++++++++++++++++++++++++++++----------- copyparty/up2k.py | 27 ++++++++++++++++++--------- 2 files changed, 49 insertions(+), 20 deletions(-) diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index bc4447aa..9abd8a33 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -282,7 +282,7 @@ class HttpCli(object): x = self.conn.hsrv.broker.put(True, "up2k.handle_chunk", wark, chash) response = x.get() - chunksize, ofs, path = response + chunksize, cstart, path = response if self.args.nw: path = os.devnull @@ -290,21 +290,40 @@ class HttpCli(object): if remains > chunksize: raise Pebkac(400, "your chunk is too big to fit") - self.log("writing {} #{} @{} len {}".format(path, chash, ofs, remains)) + self.log("writing {} #{} @{} len {}".format(path, chash, cstart, remains)) reader = read_socket(self.sr, remains) - with open(path, "rb+") as f: - f.seek(ofs) + with open(path, "rb+", 512 * 1024) as f: + f.seek(cstart[0]) post_sz, _, sha_b64 = hashcopy(self.conn, reader, f) - if sha_b64 != chash: - raise Pebkac( - 400, - "your chunk got corrupted somehow:\n{} expected,\n{} received ({} bytes)".format( - chash, sha_b64, post_sz - ), - ) + if sha_b64 != chash: + raise Pebkac( + 400, + "your chunk got corrupted somehow:\n{} expected,\n{} received ({} bytes)".format( + chash, sha_b64, post_sz + ), + ) + + if len(cstart) > 1: + self.log( + "clone {} to {}".format( + cstart[0], " & ".join(str(x) for x in cstart[1:]) + ) + ) + ofs = 0 + while ofs < chunksize: + bufsz = min(chunksize - ofs, 4 * 1024 * 1024) + f.seek(cstart[0] + ofs) + buf = f.read(bufsz) + for wofs in cstart[1:]: + f.seek(wofs + ofs) + f.write(buf) + + ofs += len(buf) + + self.log("clone {} done".format(cstart[0])) x = self.conn.hsrv.broker.put(True, "up2k.confirm_chunk", wark, chash) response = x.get() @@ -652,3 +671,4 @@ class HttpCli(object): ) self.reply(html.encode("utf-8", "replace")) return True + diff --git a/copyparty/up2k.py b/copyparty/up2k.py index 6d744538..fe8d4b4a 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -56,15 +56,24 @@ class Up2k(object): "name": cj["name"], "size": cj["size"], "hash": deepcopy(cj["hash"]), - # upload state - "pend": deepcopy(cj["hash"]), } + + # one chunk may occur multiple times in a file; + # filter to unique values for the list of missing chunks + # (preserve order to reduce disk thrashing) + job["need"] = [] + lut = {} + for k in cj["hash"]: + if k not in lut: + job["need"].append(k) + lut[k] = 1 + self._new_upload(job) return { "name": job["name"], "size": job["size"], - "hash": job["pend"], + "hash": job["need"], "wark": wark, } @@ -74,16 +83,15 @@ class Up2k(object): if not job: raise Pebkac(404, "unknown wark") - if chash not in job["pend"]: + if chash not in job["need"]: raise Pebkac(200, "already got that but thanks??") - try: - nchunk = job["hash"].index(chash) - except ValueError: + nchunk = [n for n, v in enumerate(job["hash"]) if v == chash] + if not nchunk: raise Pebkac(404, "unknown chunk") chunksize = self._get_chunksize(job["size"]) - ofs = nchunk * chunksize + ofs = [chunksize * x for x in nchunk] path = os.path.join(job["vdir"], job["name"]) @@ -91,7 +99,7 @@ class Up2k(object): def confirm_chunk(self, wark, chash): with self.mutex: - self.registry[wark]["pend"].remove(chash) + self.registry[wark]["need"].remove(chash) def _get_chunksize(self, filesize): chunksize = 1024 * 1024 @@ -131,3 +139,4 @@ class Up2k(object): with open(path, "wb") as f: f.seek(job["size"] - 1) f.write(b"e") +