store upload ip and time

This commit is contained in:
ed 2021-07-29 00:30:10 +02:00
parent bf3941cf7a
commit 0c625a4e62
4 changed files with 66 additions and 32 deletions

View file

@ -498,7 +498,14 @@ class HttpCli(object):
if not self.args.nw:
vfs, vrem = vfs.get_dbv(rem)
self.conn.hsrv.broker.put(
False, "up2k.hash_file", vfs.realpath, vfs.flags, vrem, fn
False,
"up2k.hash_file",
vfs.realpath,
vfs.flags,
vrem,
fn,
self.ip,
time.time(),
)
return post_sz, sha_b64, remains, path
@ -905,6 +912,8 @@ class HttpCli(object):
dbv.flags,
vrem,
fname,
self.ip,
time.time(),
)
self.conn.nbyte += sz

View file

@ -244,7 +244,7 @@ class U2idx(object):
sret = []
c = cur.execute(q, v)
for hit in c:
w, ts, sz, rd, fn = hit
w, ts, sz, rd, fn, ip, at = hit
lim -= 1
if lim <= 0:
break

View file

@ -45,7 +45,7 @@ try:
except:
HAVE_SQLITE3 = False
DB_VER = 4
DB_VER = 5
class Up2k(object):
@ -522,7 +522,7 @@ class Up2k(object):
wark = up2k_wark_from_hashlist(self.salt, sz, hashes)
self.db_add(dbw[0], wark, rd, fn, lmod, sz)
self.db_add(dbw[0], wark, rd, fn, lmod, sz, "", 0)
dbw[1] += 1
ret += 1
td = time.time() - dbw[2]
@ -537,8 +537,8 @@ class Up2k(object):
rm = []
nchecked = 0
nfiles = next(cur.execute("select count(w) from up"))[0]
c = cur.execute("select * from up")
for dwark, dts, dsz, drd, dfn in c:
c = cur.execute("select rd, fn from up")
for drd, dfn in c:
nchecked += 1
if drd.startswith("//") or dfn.startswith("//"):
drd, dfn = s3dec(drd, dfn)
@ -941,6 +941,15 @@ class Up2k(object):
if not existed and ver is None:
return self._create_db(db_path, cur)
if ver == 4:
try:
m = "creating backup before upgrade: "
cur = self._backup_db(db_path, cur, ver, m)
self._upgrade_v4(cur)
ver = 5
except:
self.log("WARN: failed to upgrade from v4", 3)
if ver == DB_VER:
try:
nfiles = next(cur.execute("select count(w) from up"))[0]
@ -1011,9 +1020,10 @@ class Up2k(object):
idx = r"create index up_w on up(w)"
for cmd in [
r"create table up (w text, mt int, sz int, rd text, fn text)",
r"create table up (w text, mt int, sz int, rd text, fn text, ip text, at int)",
r"create index up_rd on up(rd)",
r"create index up_fn on up(fn)",
r"create index up_ip on up(ip)",
idx,
r"create table mt (w text, k text, v int)",
r"create index mt_w on mt(w)",
@ -1028,6 +1038,17 @@ class Up2k(object):
self.log("created DB at {}".format(db_path))
return cur
def _upgrade_v4(self, cur):
for cmd in [
r"alter table up add column ip text",
r"alter table up add column at int",
r"create index up_ip on up(ip)",
r"update kv set v=5 where k='sver'",
]:
cur.execute(cmd)
cur.connection.commit()
def handle_json(self, cj):
with self.mutex:
if not self.register_vpath(cj["ptop"], cj["vcfg"]):
@ -1051,7 +1072,7 @@ class Up2k(object):
argv = (wark[:16], wark)
cur = cur.execute(q, argv)
for _, dtime, dsize, dp_dir, dp_fn in cur:
for _, dtime, dsize, dp_dir, dp_fn, ip, at in cur:
if dp_dir.startswith("//") or dp_fn.startswith("//"):
dp_dir, dp_fn = s3dec(dp_dir, dp_fn)
@ -1065,6 +1086,8 @@ class Up2k(object):
"ptop": cj["ptop"],
"size": dsize,
"lmod": dtime,
"addr": ip,
"at": at,
"hash": [],
"need": [],
}
@ -1119,7 +1142,8 @@ class Up2k(object):
self._symlink(src, dst)
if cur:
a = [cj[x] for x in "prel name lmod size".split()]
a = [cj[x] for x in "prel name lmod size addr".split()]
a += [cj.get("at") or time.time()]
self.db_add(cur, wark, *a)
cur.connection.commit()
@ -1266,20 +1290,21 @@ class Up2k(object):
a = [dst, job["size"], (int(time.time()), int(job["lmod"]))]
self.lastmod_q.put(a)
a = [job[x] for x in "ptop wark prel name lmod size".split()]
a = [job[x] for x in "ptop wark prel name lmod size addr".split()]
a += [job.get("at") or time.time()]
if self.idx_wark(*a):
del self.registry[ptop][wark]
# in-memory registry is reserved for unfinished uploads
return ret, dst
def idx_wark(self, ptop, wark, rd, fn, lmod, sz):
def idx_wark(self, ptop, wark, rd, fn, lmod, sz, ip, at):
cur = self.cur.get(ptop)
if not cur:
return False
self.db_rm(cur, rd, fn)
self.db_add(cur, wark, rd, fn, lmod, sz)
self.db_add(cur, wark, rd, fn, lmod, sz, ip, at)
cur.connection.commit()
if "e2t" in self.flags[ptop]:
@ -1295,14 +1320,14 @@ class Up2k(object):
except:
db.execute(sql, s3enc(self.mem_cur, rd, fn))
def db_add(self, db, wark, rd, fn, ts, sz):
sql = "insert into up values (?,?,?,?,?)"
v = (wark, int(ts), sz, rd, fn)
def db_add(self, db, wark, rd, fn, ts, sz, ip, at):
sql = "insert into up values (?,?,?,?,?,?,?)"
v = (wark, int(ts), sz, rd, fn, ip or "", int(at or 0))
try:
db.execute(sql, v)
except:
rd, fn = s3enc(self.mem_cur, rd, fn)
v = (wark, int(ts), sz, rd, fn)
v = (wark, int(ts), sz, rd, fn, ip or "", int(at or 0))
db.execute(sql, v)
def handle_rm(self, uname, vpath):
@ -1331,7 +1356,7 @@ class Up2k(object):
with self.mutex:
try:
ptop = dbv.realpath
cur, wark, _, _ = self._find_from_vpath(ptop, vrem)
cur, wark, _, _, _, _ = self._find_from_vpath(ptop, vrem)
self._forget_file(ptop, vpath, cur, wark)
finally:
cur.connection.commit()
@ -1411,7 +1436,7 @@ class Up2k(object):
self.need_rescan[dvn.vpath] = 1
return "k"
c1, w, ftime, fsize = self._find_from_vpath(svn.realpath, srem)
c1, w, ftime, fsize, ip, at = self._find_from_vpath(svn.realpath, srem)
c2 = self.cur.get(dvn.realpath)
if ftime is None:
@ -1428,7 +1453,7 @@ class Up2k(object):
c1.connection.commit()
if c2:
self.db_add(c2, w, drd, dfn, ftime, fsize)
self.db_add(c2, w, drd, dfn, ftime, fsize, ip, at)
c2.connection.commit()
else:
self.log("not found in src db: [{}]".format(svp))
@ -1452,7 +1477,7 @@ class Up2k(object):
return None, None
rd, fn = vsplit(vrem)
q = "select w, mt, sz from up where rd=? and fn=? limit 1"
q = "select w, mt, sz, ip, at from up where rd=? and fn=? limit 1"
try:
c = cur.execute(q, (rd, fn))
except:
@ -1460,9 +1485,9 @@ class Up2k(object):
hit = c.fetchone()
if hit:
wark, ftime, fsize = hit
return cur, wark, ftime, fsize
return cur, None, None, None
wark, ftime, fsize, ip, at = hit
return cur, wark, ftime, fsize, ip, at
return cur, None, None, None, None, None
def _forget_file(self, ptop, vrem, cur, wark):
"""forgets file in db, fixes symlinks, does not delete"""
@ -1753,7 +1778,7 @@ class Up2k(object):
self.n_hashq -= 1
# self.log("hashq {}".format(self.n_hashq))
ptop, rd, fn = self.hashq.get()
ptop, rd, fn, ip, at = self.hashq.get()
# self.log("hashq {} pop {}/{}/{}".format(self.n_hashq, ptop, rd, fn))
if "e2d" not in self.flags[ptop]:
continue
@ -1764,12 +1789,12 @@ class Up2k(object):
hashes = self._hashlist_from_file(abspath)
wark = up2k_wark_from_hashlist(self.salt, inf.st_size, hashes)
with self.mutex:
self.idx_wark(ptop, wark, rd, fn, inf.st_mtime, inf.st_size)
self.idx_wark(ptop, wark, rd, fn, inf.st_mtime, inf.st_size, ip, at)
def hash_file(self, ptop, flags, rd, fn):
def hash_file(self, ptop, flags, rd, fn, ip, at):
with self.mutex:
self.register_vpath(ptop, flags)
self.hashq.put([ptop, rd, fn])
self.hashq.put([ptop, rd, fn, ip, at])
self.n_hashq += 1
# self.log("hashq {} push {}/{}/{}".format(self.n_hashq, ptop, rd, fn))

View file

@ -285,15 +285,15 @@ function Modpoll() {
console.log("modpoll diff |" + server_ref.length + "|, |" + server_now.length + "|");
this.modpoll.disabled = true;
var msg = [
"The document has changed on the server.<br />" +
"The document has changed on the server.",
"The changes will NOT be loaded into your editor automatically.",
"Press F5 or CTRL-R to refresh the page,<br />" +
"",
"Press F5 or CTRL-R to refresh the page,",
"replacing your document with the server copy.",
"",
"You can close this message to ignore and contnue."
];
return toast.warn(0, "<p>" + msg.join('</p>\n<p>') + '</p>');
return toast.warn(0, msg.join('\n'));
}
console.log('modpoll eq');