diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index bc1f607f..68f1a64a 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -855,63 +855,67 @@ class HttpCli(object): response = x.get() chunksize, cstart, path, lastmod = response - if self.args.nw: - path = os.devnull - - if remains > chunksize: - raise Pebkac(400, "your chunk is too big to fit") - - self.log("writing {} #{} @{} len {}".format(path, chash, cstart, remains)) - - reader = read_socket(self.sr, remains) - - f = None - fpool = not self.args.no_fpool - if fpool: - with self.mutex: - try: - f = self.u2fh.pop(path) - except: - pass - - f = f or open(fsenc(path), "rb+", 512 * 1024) - try: - f.seek(cstart[0]) - post_sz, _, sha_b64 = hashcopy(reader, f) + if self.args.nw: + path = os.devnull - if sha_b64 != chash: - raise Pebkac( - 400, - "your chunk got corrupted somehow (received {} bytes); expected vs received hash:\n{}\n{}".format( - post_sz, chash, sha_b64 - ), - ) + if remains > chunksize: + raise Pebkac(400, "your chunk is too big to fit") - if len(cstart) > 1 and path != os.devnull: - self.log( - "clone {} to {}".format( - cstart[0], " & ".join(unicode(x) for x in cstart[1:]) - ) - ) - ofs = 0 - while ofs < chunksize: - bufsz = min(chunksize - ofs, 4 * 1024 * 1024) - f.seek(cstart[0] + ofs) - buf = f.read(bufsz) - for wofs in cstart[1:]: - f.seek(wofs + ofs) - f.write(buf) + self.log("writing {} #{} @{} len {}".format(path, chash, cstart, remains)) - ofs += len(buf) + reader = read_socket(self.sr, remains) - self.log("clone {} done".format(cstart[0])) - finally: - if not fpool: - f.close() - else: + f = None + fpool = not self.args.no_fpool + if fpool: with self.mutex: - self.u2fh.put(path, f) + try: + f = self.u2fh.pop(path) + except: + pass + + f = f or open(fsenc(path), "rb+", 512 * 1024) + + try: + f.seek(cstart[0]) + post_sz, _, sha_b64 = hashcopy(reader, f) + + if sha_b64 != chash: + raise Pebkac( + 400, + "your chunk got corrupted somehow (received {} bytes); expected vs received hash:\n{}\n{}".format( + post_sz, chash, sha_b64 + ), + ) + + if len(cstart) > 1 and path != os.devnull: + self.log( + "clone {} to {}".format( + cstart[0], " & ".join(unicode(x) for x in cstart[1:]) + ) + ) + ofs = 0 + while ofs < chunksize: + bufsz = min(chunksize - ofs, 4 * 1024 * 1024) + f.seek(cstart[0] + ofs) + buf = f.read(bufsz) + for wofs in cstart[1:]: + f.seek(wofs + ofs) + f.write(buf) + + ofs += len(buf) + + self.log("clone {} done".format(cstart[0])) + finally: + if not fpool: + f.close() + else: + with self.mutex: + self.u2fh.put(path, f) + finally: + x = self.conn.hsrv.broker.put(True, "up2k.release_chunk", ptop, wark, chash) + x.get() # block client until released x = self.conn.hsrv.broker.put(True, "up2k.confirm_chunk", ptop, wark, chash) x = x.get() diff --git a/copyparty/up2k.py b/copyparty/up2k.py index b70129c8..a80168fb 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -494,6 +494,7 @@ class Up2k(object): if bos.path.exists(path): reg[k] = job job["poke"] = time.time() + job["busy"] = {} else: self.log("ign deleted file in snap: [{}]".format(path)) @@ -1256,6 +1257,7 @@ class Up2k(object): "at": at, "hash": [], "need": [], + "busy": {}, } if job and wark in reg: @@ -1338,6 +1340,7 @@ class Up2k(object): "t0": now, "hash": deepcopy(cj["hash"]), "need": [], + "busy": {}, } # client-provided, sanitized by _get_wark: name, size, lmod for k in [ @@ -1444,6 +1447,11 @@ class Up2k(object): if not nchunk: raise Pebkac(400, "unknown chunk") + if wark in job["busy"]: + raise Pebkac(400, "that chunk is already being written to") + + job["busy"][wark] = 1 + job["poke"] = time.time() chunksize = up2k_chunksize(job["size"]) @@ -1453,6 +1461,14 @@ class Up2k(object): return [chunksize, ofs, path, job["lmod"]] + def release_chunk(self, ptop, wark, chash): + with self.mutex: + job = self.registry[ptop].get(wark) + if job: + job["busy"].pop(wark, None) + + return [True] + def confirm_chunk(self, ptop, wark, chash): with self.mutex: try: @@ -1463,6 +1479,8 @@ class Up2k(object): except Exception as ex: return "confirm_chunk, wark, " + repr(ex) + job["busy"].pop(wark, None) + try: job["need"].remove(chash) except Exception as ex: diff --git a/copyparty/web/up2k.js b/copyparty/web/up2k.js index 5375c4ab..e8b50876 100644 --- a/copyparty/web/up2k.js +++ b/copyparty/web/up2k.js @@ -1843,7 +1843,8 @@ function up2k_init(subtle) { st.bytes.uploaded += cdr - car; t.bytes_uploaded += cdr - car; } - else if (txt.indexOf('already got that') !== -1) { + else if (txt.indexOf('already got that') + 1 || + txt.indexOf('already being written') + 1) { console.log("ignoring dupe-segment error", t); } else { @@ -1851,6 +1852,9 @@ function up2k_init(subtle) { xhr.status, t.name) + (txt || "no further information")); return; } + orz2(xhr); + } + function orz2(xhr) { apop(st.busy.upload, upt); apop(t.postlist, npart); if (!t.postlist.length) { @@ -1872,9 +1876,11 @@ function up2k_init(subtle) { if (crashed) return; - toast.err(9.98, "failed to upload a chunk,\n" + tries + " retries so far -- retrying in 10sec\n\n" + t.name); + if (!toast.visible) + toast.warn(9.98, "failed to upload a chunk;\nprobably harmless, continuing\n\n" + t.name); + console.log('chunkpit onerror,', ++tries, t); - setTimeout(do_send, 10 * 1000); + orz2(xhr); }; xhr.open('POST', t.purl, true); xhr.setRequestHeader("X-Up2k-Hash", t.hash[npart]);