handle multiple simultaneous uploads of the same file

This commit is contained in:
ed 2021-11-14 15:03:11 +01:00
parent 62c3272351
commit b206c5d64e
3 changed files with 82 additions and 54 deletions

View file

@@ -855,63 +855,67 @@ class HttpCli(object):
response = x.get()
chunksize, cstart, path, lastmod = response
if self.args.nw:
path = os.devnull
if remains > chunksize:
raise Pebkac(400, "your chunk is too big to fit")
self.log("writing {} #{} @{} len {}".format(path, chash, cstart, remains))
reader = read_socket(self.sr, remains)
f = None
fpool = not self.args.no_fpool
if fpool:
with self.mutex:
try:
f = self.u2fh.pop(path)
except:
pass
f = f or open(fsenc(path), "rb+", 512 * 1024)
try:
f.seek(cstart[0])
post_sz, _, sha_b64 = hashcopy(reader, f)
if self.args.nw:
path = os.devnull
if sha_b64 != chash:
raise Pebkac(
400,
"your chunk got corrupted somehow (received {} bytes); expected vs received hash:\n{}\n{}".format(
post_sz, chash, sha_b64
),
)
if remains > chunksize:
raise Pebkac(400, "your chunk is too big to fit")
if len(cstart) > 1 and path != os.devnull:
self.log(
"clone {} to {}".format(
cstart[0], " & ".join(unicode(x) for x in cstart[1:])
)
)
ofs = 0
while ofs < chunksize:
bufsz = min(chunksize - ofs, 4 * 1024 * 1024)
f.seek(cstart[0] + ofs)
buf = f.read(bufsz)
for wofs in cstart[1:]:
f.seek(wofs + ofs)
f.write(buf)
self.log("writing {} #{} @{} len {}".format(path, chash, cstart, remains))
ofs += len(buf)
reader = read_socket(self.sr, remains)
self.log("clone {} done".format(cstart[0]))
finally:
if not fpool:
f.close()
else:
f = None
fpool = not self.args.no_fpool
if fpool:
with self.mutex:
self.u2fh.put(path, f)
try:
f = self.u2fh.pop(path)
except:
pass
f = f or open(fsenc(path), "rb+", 512 * 1024)
try:
f.seek(cstart[0])
post_sz, _, sha_b64 = hashcopy(reader, f)
if sha_b64 != chash:
raise Pebkac(
400,
"your chunk got corrupted somehow (received {} bytes); expected vs received hash:\n{}\n{}".format(
post_sz, chash, sha_b64
),
)
if len(cstart) > 1 and path != os.devnull:
self.log(
"clone {} to {}".format(
cstart[0], " & ".join(unicode(x) for x in cstart[1:])
)
)
ofs = 0
while ofs < chunksize:
bufsz = min(chunksize - ofs, 4 * 1024 * 1024)
f.seek(cstart[0] + ofs)
buf = f.read(bufsz)
for wofs in cstart[1:]:
f.seek(wofs + ofs)
f.write(buf)
ofs += len(buf)
self.log("clone {} done".format(cstart[0]))
finally:
if not fpool:
f.close()
else:
with self.mutex:
self.u2fh.put(path, f)
finally:
x = self.conn.hsrv.broker.put(True, "up2k.release_chunk", ptop, wark, chash)
x.get() # block client until released
x = self.conn.hsrv.broker.put(True, "up2k.confirm_chunk", ptop, wark, chash)
x = x.get()

View file

@@ -494,6 +494,7 @@ class Up2k(object):
if bos.path.exists(path):
reg[k] = job
job["poke"] = time.time()
job["busy"] = {}
else:
self.log("ign deleted file in snap: [{}]".format(path))
@@ -1256,6 +1257,7 @@ class Up2k(object):
"at": at,
"hash": [],
"need": [],
"busy": {},
}
if job and wark in reg:
@@ -1338,6 +1340,7 @@ class Up2k(object):
"t0": now,
"hash": deepcopy(cj["hash"]),
"need": [],
"busy": {},
}
# client-provided, sanitized by _get_wark: name, size, lmod
for k in [
@@ -1444,6 +1447,11 @@ class Up2k(object):
if not nchunk:
raise Pebkac(400, "unknown chunk")
if wark in job["busy"]:
raise Pebkac(400, "that chunk is already being written to")
job["busy"][wark] = 1
job["poke"] = time.time()
chunksize = up2k_chunksize(job["size"])
@@ -1453,6 +1461,14 @@ class Up2k(object):
return [chunksize, ofs, path, job["lmod"]]
def release_chunk(self, ptop, wark, chash):
with self.mutex:
job = self.registry[ptop].get(wark)
if job:
job["busy"].pop(wark, None)
return [True]
def confirm_chunk(self, ptop, wark, chash):
with self.mutex:
try:
@@ -1463,6 +1479,8 @@ class Up2k(object):
except Exception as ex:
return "confirm_chunk, wark, " + repr(ex)
job["busy"].pop(wark, None)
try:
job["need"].remove(chash)
except Exception as ex:

View file

@@ -1843,7 +1843,8 @@ function up2k_init(subtle) {
st.bytes.uploaded += cdr - car;
t.bytes_uploaded += cdr - car;
}
else if (txt.indexOf('already got that') !== -1) {
else if (txt.indexOf('already got that') + 1 ||
txt.indexOf('already being written') + 1) {
console.log("ignoring dupe-segment error", t);
}
else {
@@ -1851,6 +1852,9 @@ function up2k_init(subtle) {
xhr.status, t.name) + (txt || "no further information"));
return;
}
orz2(xhr);
}
function orz2(xhr) {
apop(st.busy.upload, upt);
apop(t.postlist, npart);
if (!t.postlist.length) {
@@ -1872,9 +1876,11 @@ function up2k_init(subtle) {
if (crashed)
return;
toast.err(9.98, "failed to upload a chunk,\n" + tries + " retries so far -- retrying in 10sec\n\n" + t.name);
if (!toast.visible)
toast.warn(9.98, "failed to upload a chunk;\nprobably harmless, continuing\n\n" + t.name);
console.log('chunkpit onerror,', ++tries, t);
setTimeout(do_send, 10 * 1000);
orz2(xhr);
};
xhr.open('POST', t.purl, true);
xhr.setRequestHeader("X-Up2k-Hash", t.hash[npart]);