up2k: fully parallelize handshakes/uploads

This commit is contained in:
ed 2021-09-26 12:57:16 +02:00
parent 777a50063d
commit f9a66ffb0e
2 changed files with 61 additions and 10 deletions

View file

@ -66,6 +66,7 @@ class Up2k(object):
self.n_tagq = 0 self.n_tagq = 0
self.volstate = {} self.volstate = {}
self.need_rescan = {} self.need_rescan = {}
self.dupesched = {}
self.registry = {} self.registry = {}
self.entags = {} self.entags = {}
self.flags = {} self.flags = {}
@ -1121,8 +1122,14 @@ class Up2k(object):
continue continue
dp_abs = "/".join([cj["ptop"], dp_dir, dp_fn]) dp_abs = "/".join([cj["ptop"], dp_dir, dp_fn])
# relying on path.exists to return false on broken symlinks # relying on this to fail on broken symlinks
if bos.path.exists(dp_abs): try:
sz = bos.path.getsize(dp_abs)
except:
sz = 0
if sz:
# self.log("--- " + wark + " " + dp_abs + " found file", 4)
job = { job = {
"name": dp_fn, "name": dp_fn,
"prel": dp_dir, "prel": dp_dir,
@ -1137,6 +1144,7 @@ class Up2k(object):
} }
if job and wark in reg: if job and wark in reg:
# self.log("pop " + wark + " " + job["name"] + " handle_json db", 4)
del reg[wark] del reg[wark]
if job or wark in reg: if job or wark in reg:
@ -1165,7 +1173,15 @@ class Up2k(object):
self.log("unfinished:\n {0}\n {1}".format(src, dst)) self.log("unfinished:\n {0}\n {1}".format(src, dst))
err = "partial upload exists at a different location; please resume uploading here instead:\n" err = "partial upload exists at a different location; please resume uploading here instead:\n"
err += "/" + vsrc + " " err += "/" + vsrc + " "
dupe = [cj["prel"], cj["name"]]
try:
self.dupesched[src].append(dupe)
except:
self.dupesched[src] = [dupe]
raise Pebkac(400, err) raise Pebkac(400, err)
elif "nodupe" in self.flags[job["ptop"]]: elif "nodupe" in self.flags[job["ptop"]]:
self.log("dupe-reject:\n {0}\n {1}".format(src, dst)) self.log("dupe-reject:\n {0}\n {1}".format(src, dst))
err = "upload rejected, file already exists:\n/" + vsrc + " " err = "upload rejected, file already exists:\n/" + vsrc + " "
@ -1357,6 +1373,7 @@ class Up2k(object):
except Exception as ex: except Exception as ex:
return "finish_upload, wark, " + repr(ex) return "finish_upload, wark, " + repr(ex)
# self.log("--- " + wark + " " + dst + " finish_upload atomic " + dst, 4)
atomic_move(src, dst) atomic_move(src, dst)
if ANYWIN: if ANYWIN:
@ -1366,9 +1383,28 @@ class Up2k(object):
a = [job[x] for x in "ptop wark prel name lmod size addr".split()] a = [job[x] for x in "ptop wark prel name lmod size addr".split()]
a += [job.get("at") or time.time()] a += [job.get("at") or time.time()]
if self.idx_wark(*a): if self.idx_wark(*a):
# self.log("pop " + wark + " " + dst + " finish_upload idx_wark", 4)
del self.registry[ptop][wark] del self.registry[ptop][wark]
# in-memory registry is reserved for unfinished uploads # in-memory registry is reserved for unfinished uploads
dupes = self.dupesched.pop(dst, [])
if not dupes:
return
cur = self.cur.get(ptop)
for rd, fn in dupes:
d2 = os.path.join(ptop, rd, fn)
if os.path.exists(d2):
continue
self._symlink(dst, d2)
if cur:
self.db_rm(cur, rd, fn)
self.db_add(cur, wark, rd, fn, *a[-4:])
if cur:
cur.connection.commit()
def idx_wark(self, ptop, wark, rd, fn, lmod, sz, ip, at): def idx_wark(self, ptop, wark, rd, fn, lmod, sz, ip, at):
cur = self.cur.get(ptop) cur = self.cur.get(ptop)
if not cur: if not cur:

View file

@ -1081,11 +1081,6 @@ function up2k_init(subtle) {
st.busy.handshake.length) st.busy.handshake.length)
return false; return false;
if (st.busy.handshake.length)
for (var n = t.n - 1; n >= t.n - parallel_uploads && n >= 0; n--)
if (st.files[n].t_uploading)
return false;
if ((uc.multitask ? 1 : 0) < if ((uc.multitask ? 1 : 0) <
st.todo.upload.length + st.todo.upload.length +
st.busy.upload.length) st.busy.upload.length)
@ -1138,6 +1133,17 @@ function up2k_init(subtle) {
st.busy.handshake.length + st.busy.handshake.length +
st.busy.upload.length; st.busy.upload.length;
if (was_busy && !is_busy) {
for (var a = 0; a < st.files.length; a++) {
var t = st.files[a];
if (t.want_recheck) {
t.want_recheck = false;
push_t(st.todo.handshake, t);
}
}
is_busy = st.todo.handshake.length;
}
if (was_busy != is_busy) { if (was_busy != is_busy) {
was_busy = is_busy; was_busy = is_busy;
@ -1467,6 +1473,7 @@ function up2k_init(subtle) {
} }
t.done = true; t.done = true;
t.fobj = null;
st.bytes.hashed += t.size; st.bytes.hashed += t.size;
st.bytes.finished += t.size; st.bytes.finished += t.size;
pvis.move(t.n, 'bz'); pvis.move(t.n, 'bz');
@ -1551,6 +1558,7 @@ function up2k_init(subtle) {
apop(st.busy.handshake, t); apop(st.busy.handshake, t);
st.bytes.finished += t.size; st.bytes.finished += t.size;
t.done = true; t.done = true;
t.fobj = null;
tasker(); tasker();
return; return;
} }
@ -1617,6 +1625,7 @@ function up2k_init(subtle) {
if (done) { if (done) {
t.done = true; t.done = true;
t.fobj = null;
st.bytes.finished += t.size - t.bytes_uploaded; st.bytes.finished += t.size - t.bytes_uploaded;
var spd1 = (t.size / ((t.t_hashed - t.t_hashing) / 1000.)) / (1024 * 1024.), var spd1 = (t.size / ((t.t_hashed - t.t_hashing) / 1000.)) / (1024 * 1024.),
spd2 = (t.size / ((t.t_uploaded - t.t_uploading) / 1000.)) / (1024 * 1024.); spd2 = (t.size / ((t.t_uploaded - t.t_uploading) / 1000.)) / (1024 * 1024.);
@ -1651,13 +1660,19 @@ function up2k_init(subtle) {
} }
st.bytes.finished += t.size; st.bytes.finished += t.size;
if (rsp.indexOf('partial upload exists') !== -1 || var err_pend = rsp.indexOf('partial upload exists') + 1,
rsp.indexOf('file already exists') !== -1) { err_dupe = rsp.indexOf('file already exists') + 1;
if (err_pend || err_dupe) {
err = rsp; err = rsp;
ofs = err.indexOf('\n/'); ofs = err.indexOf('\n/');
if (ofs !== -1) { if (ofs !== -1) {
err = err.slice(0, ofs + 1) + linksplit(err.slice(ofs + 2).trimEnd()).join(' '); err = err.slice(0, ofs + 1) + linksplit(err.slice(ofs + 2).trimEnd()).join(' ');
} }
if (!t.rechecks && err_pend) {
t.rechecks = 0;
t.want_recheck = true;
}
} }
if (err != "") { if (err != "") {
pvis.seth(t.n, 1, "ERROR"); pvis.seth(t.n, 1, "ERROR");
@ -1755,7 +1770,7 @@ function up2k_init(subtle) {
if (crashed) if (crashed)
return; return;
toast.err(9, "failed to upload a chunk,\n" + tries + " retries so far -- retrying in 10sec\n\n" + t.name); toast.err(9.98, "failed to upload a chunk,\n" + tries + " retries so far -- retrying in 10sec\n\n" + t.name);
console.log('chunkpit onerror,', ++tries, t); console.log('chunkpit onerror,', ++tries, t);
setTimeout(do_send, 10 * 1000); setTimeout(do_send, 10 * 1000);
}; };