mirror of
https://github.com/9001/copyparty.git
synced 2025-08-17 09:02:15 -06:00
async tagging of incoming files
This commit is contained in:
parent
43a23bf733
commit
914e22c53e
|
@ -345,6 +345,10 @@ class HttpCli(object):
|
|||
with open(path, "wb", 512 * 1024) as f:
|
||||
post_sz, _, sha_b64 = hashcopy(self.conn, reader, f)
|
||||
|
||||
self.conn.hsrv.broker.put(
|
||||
False, "up2k.hash_file", vfs.realpath, vfs.flags, rem, fn
|
||||
)
|
||||
|
||||
return post_sz, sha_b64, remains, path
|
||||
|
||||
def handle_stash(self):
|
||||
|
@ -675,6 +679,9 @@ class HttpCli(object):
|
|||
raise Pebkac(400, "empty files in post")
|
||||
|
||||
files.append([sz, sha512_hex])
|
||||
self.conn.hsrv.broker.put(
|
||||
False, "up2k.hash_file", vfs.realpath, vfs.flags, rem, fname
|
||||
)
|
||||
self.conn.nbyte += sz
|
||||
|
||||
except Pebkac:
|
||||
|
@ -1251,7 +1258,7 @@ class HttpCli(object):
|
|||
"sz": sz,
|
||||
"ext": ext,
|
||||
"dt": dt,
|
||||
"ts": inf.st_mtime,
|
||||
"ts": int(inf.st_mtime),
|
||||
}
|
||||
if is_dir:
|
||||
dirs.append(item)
|
||||
|
|
|
@ -51,13 +51,14 @@ class Up2k(object):
|
|||
self.broker = broker
|
||||
self.args = broker.args
|
||||
self.log_func = broker.log
|
||||
self.persist = self.args.e2d
|
||||
|
||||
# config
|
||||
self.salt = broker.args.salt
|
||||
|
||||
# state
|
||||
self.mutex = threading.Lock()
|
||||
self.hashq = Queue()
|
||||
self.tagq = Queue()
|
||||
self.registry = {}
|
||||
self.entags = {}
|
||||
self.flags = {}
|
||||
|
@ -83,18 +84,26 @@ class Up2k(object):
|
|||
# static
|
||||
self.r_hash = re.compile("^[0-9a-zA-Z_-]{43}$")
|
||||
|
||||
if self.persist and not HAVE_SQLITE3:
|
||||
if not HAVE_SQLITE3:
|
||||
self.log("could not initialize sqlite3, will use in-memory registry only")
|
||||
|
||||
# this is kinda jank
|
||||
auth = AuthSrv(self.args, self.log, False)
|
||||
self.init_indexes(auth)
|
||||
have_e2d = self.init_indexes(auth)
|
||||
|
||||
if self.persist:
|
||||
if have_e2d:
|
||||
thr = threading.Thread(target=self._snapshot)
|
||||
thr.daemon = True
|
||||
thr.start()
|
||||
|
||||
thr = threading.Thread(target=self._tagger)
|
||||
thr.daemon = True
|
||||
thr.start()
|
||||
|
||||
thr = threading.Thread(target=self._hasher)
|
||||
thr.daemon = True
|
||||
thr.start()
|
||||
|
||||
def log(self, msg):
|
||||
self.log_func("up2k", msg + "\033[K")
|
||||
|
||||
|
@ -137,6 +146,7 @@ class Up2k(object):
|
|||
self.pp = ProgressPrinter()
|
||||
vols = auth.vfs.all_vols.values()
|
||||
t0 = time.time()
|
||||
have_e2d = False
|
||||
|
||||
live_vols = []
|
||||
for vol in vols:
|
||||
|
@ -157,6 +167,9 @@ class Up2k(object):
|
|||
|
||||
self.entags[vol.realpath] = en
|
||||
|
||||
if "e2d" in vol.flags:
|
||||
have_e2d = True
|
||||
|
||||
if "e2ds" in vol.flags:
|
||||
r = self._build_file_index(vol, vols)
|
||||
if not r:
|
||||
|
@ -185,6 +198,8 @@ class Up2k(object):
|
|||
msg = "\033[31mcould not read tags because no backends are available (mutagen or ffprobe)\033[0m"
|
||||
self.log(msg)
|
||||
|
||||
return have_e2d
|
||||
|
||||
def register_vpath(self, ptop, flags):
|
||||
with self.mutex:
|
||||
if ptop in self.registry:
|
||||
|
@ -192,7 +207,7 @@ class Up2k(object):
|
|||
|
||||
reg = {}
|
||||
path = os.path.join(ptop, ".hist", "up2k.snap")
|
||||
if self.persist and os.path.exists(path):
|
||||
if "e2d" in flags and os.path.exists(path):
|
||||
with gzip.GzipFile(path, "rb") as f:
|
||||
j = f.read().decode("utf-8")
|
||||
|
||||
|
@ -206,7 +221,7 @@ class Up2k(object):
|
|||
|
||||
self.flags[ptop] = flags
|
||||
self.registry[ptop] = reg
|
||||
if not self.persist or not HAVE_SQLITE3 or "d2d" in flags:
|
||||
if not HAVE_SQLITE3 or "e2d" not in flags or "d2d" in flags:
|
||||
return None
|
||||
|
||||
try:
|
||||
|
@ -286,6 +301,7 @@ class Up2k(object):
|
|||
self.log("stat: {} @ [{}]".format(repr(ex), abspath))
|
||||
continue
|
||||
|
||||
lmod = int(inf.st_mtime)
|
||||
if stat.S_ISDIR(inf.st_mode):
|
||||
if abspath in excl or abspath == histdir:
|
||||
continue
|
||||
|
@ -311,11 +327,11 @@ class Up2k(object):
|
|||
self.log(m.format(top, rp, len(in_db), rep_db))
|
||||
dts = -1
|
||||
|
||||
if dts == inf.st_mtime and dsz == inf.st_size:
|
||||
if dts == lmod and dsz == inf.st_size:
|
||||
continue
|
||||
|
||||
m = "reindex [{}] => [{}] ({}/{}) ({}/{})".format(
|
||||
top, rp, dts, inf.st_mtime, dsz, inf.st_size
|
||||
top, rp, dts, lmod, dsz, inf.st_size
|
||||
)
|
||||
self.log(m)
|
||||
self.db_rm(dbw[0], rd, fn)
|
||||
|
@ -334,7 +350,7 @@ class Up2k(object):
|
|||
continue
|
||||
|
||||
wark = up2k_wark_from_hashlist(self.salt, inf.st_size, hashes)
|
||||
self.db_add(dbw[0], wark, rd, fn, inf.st_mtime, inf.st_size)
|
||||
self.db_add(dbw[0], wark, rd, fn, lmod, inf.st_size)
|
||||
dbw[1] += 1
|
||||
ret += 1
|
||||
td = time.time() - dbw[2]
|
||||
|
@ -425,17 +441,9 @@ class Up2k(object):
|
|||
|
||||
abspath = os.path.join(ptop, rd, fn)
|
||||
self.pp.msg = "c{} {}".format(n_left, abspath)
|
||||
tags = self.mtag.get(abspath)
|
||||
tags = {k: v for k, v in tags.items() if k in entags}
|
||||
if not tags:
|
||||
# indicate scanned without tags
|
||||
tags = {"x": 0}
|
||||
|
||||
for k, v in tags.items():
|
||||
q = "insert into mt values (?,?,?)"
|
||||
c2.execute(q, (w[:16], k, v))
|
||||
n_add += 1
|
||||
n_buf += 1
|
||||
n_tags = self._tag_file(c2, entags, w, abspath)
|
||||
n_add += n_tags
|
||||
n_buf += n_tags
|
||||
|
||||
td = time.time() - last_write
|
||||
if n_buf >= 4096 or td >= 60:
|
||||
|
@ -448,6 +456,21 @@ class Up2k(object):
|
|||
|
||||
return n_add, n_rm, True
|
||||
|
||||
def _tag_file(self, write_cur, entags, wark, abspath):
|
||||
tags = self.mtag.get(abspath)
|
||||
tags = {k: v for k, v in tags.items() if k in entags}
|
||||
if not tags:
|
||||
# indicate scanned without tags
|
||||
tags = {"x": 0}
|
||||
|
||||
ret = 0
|
||||
for k, v in tags.items():
|
||||
q = "insert into mt values (?,?,?)"
|
||||
write_cur.execute(q, (wark[:16], k, v))
|
||||
ret += 1
|
||||
|
||||
return ret
|
||||
|
||||
def _orz(self, db_path):
|
||||
return sqlite3.connect(db_path, check_same_thread=False).cursor()
|
||||
|
||||
|
@ -779,17 +802,33 @@ class Up2k(object):
|
|||
if WINDOWS:
|
||||
self.lastmod_q.put([dst, (int(time.time()), int(job["lmod"]))])
|
||||
|
||||
cur = self.cur.get(job["ptop"], None)
|
||||
if cur:
|
||||
j = job
|
||||
self.db_rm(cur, j["prel"], j["name"])
|
||||
self.db_add(cur, j["wark"], j["prel"], j["name"], j["lmod"], j["size"])
|
||||
cur.connection.commit()
|
||||
|
||||
# legit api sware 2 me mum
|
||||
if self.idx_wark(
|
||||
job["ptop"],
|
||||
job["wark"],
|
||||
job["prel"],
|
||||
job["name"],
|
||||
job["lmod"],
|
||||
job["size"],
|
||||
):
|
||||
del self.registry[ptop][wark]
|
||||
# in-memory registry is reserved for unfinished uploads
|
||||
|
||||
return ret, dst
|
||||
return ret, dst
|
||||
|
||||
def idx_wark(self, ptop, wark, rd, fn, lmod, sz):
|
||||
cur = self.cur.get(ptop, None)
|
||||
if not cur:
|
||||
return False
|
||||
|
||||
self.db_rm(cur, rd, fn)
|
||||
self.db_add(cur, wark, rd, fn, int(lmod), sz)
|
||||
cur.connection.commit()
|
||||
|
||||
if "e2t" in self.flags[ptop]:
|
||||
self.tagq.put([ptop, wark, rd, fn])
|
||||
|
||||
return True
|
||||
|
||||
def db_rm(self, db, rd, fn):
|
||||
sql = "delete from up where rd = ? and fn = ?"
|
||||
|
@ -940,6 +979,45 @@ class Up2k(object):
|
|||
self.log("snap: {} |{}|".format(path, len(reg.keys())))
|
||||
prev[k] = etag
|
||||
|
||||
def _tagger(self):
|
||||
while True:
|
||||
ptop, wark, rd, fn = self.tagq.get()
|
||||
abspath = os.path.join(ptop, rd, fn)
|
||||
self.log("tagging " + abspath)
|
||||
with self.mutex:
|
||||
cur = self.cur[ptop]
|
||||
if not cur:
|
||||
self.log("\033[31mno cursor to write tags with??")
|
||||
continue
|
||||
|
||||
entags = self.entags[ptop]
|
||||
if not entags:
|
||||
self.log("\033[33mno entags okay.jpg")
|
||||
continue
|
||||
|
||||
if "e2t" in self.flags[ptop]:
|
||||
self._tag_file(cur, entags, wark, abspath)
|
||||
|
||||
cur.connection.commit()
|
||||
|
||||
def _hasher(self):
|
||||
while True:
|
||||
ptop, rd, fn = self.hashq.get()
|
||||
if "e2d" not in self.flags[ptop]:
|
||||
continue
|
||||
|
||||
abspath = os.path.join(ptop, rd, fn)
|
||||
self.log("hashing " + abspath)
|
||||
inf = os.stat(fsenc(abspath))
|
||||
hashes = self._hashlist_from_file(abspath)
|
||||
wark = up2k_wark_from_hashlist(self.salt, inf.st_size, hashes)
|
||||
with self.mutex:
|
||||
self.idx_wark(ptop, wark, rd, fn, inf.st_mtime, inf.st_size)
|
||||
|
||||
def hash_file(self, ptop, flags, rd, fn):
|
||||
self.register_vpath(ptop, flags)
|
||||
self.hashq.put([ptop, rd, fn])
|
||||
|
||||
|
||||
def up2k_chunksize(filesize):
|
||||
chunksize = 1024 * 1024
|
||||
|
|
Loading…
Reference in a new issue