diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index b9a2c164..13ea0f68 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -345,6 +345,10 @@ class HttpCli(object): with open(path, "wb", 512 * 1024) as f: post_sz, _, sha_b64 = hashcopy(self.conn, reader, f) + self.conn.hsrv.broker.put( + False, "up2k.hash_file", vfs.realpath, vfs.flags, rem, fn + ) + return post_sz, sha_b64, remains, path def handle_stash(self): @@ -675,6 +679,9 @@ class HttpCli(object): raise Pebkac(400, "empty files in post") files.append([sz, sha512_hex]) + self.conn.hsrv.broker.put( + False, "up2k.hash_file", vfs.realpath, vfs.flags, rem, fname + ) self.conn.nbyte += sz except Pebkac: @@ -1251,7 +1258,7 @@ class HttpCli(object): "sz": sz, "ext": ext, "dt": dt, - "ts": inf.st_mtime, + "ts": int(inf.st_mtime), } if is_dir: dirs.append(item) diff --git a/copyparty/up2k.py b/copyparty/up2k.py index 54024aad..7aad942b 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -51,13 +51,14 @@ class Up2k(object): self.broker = broker self.args = broker.args self.log_func = broker.log - self.persist = self.args.e2d # config self.salt = broker.args.salt # state self.mutex = threading.Lock() + self.hashq = Queue() + self.tagq = Queue() self.registry = {} self.entags = {} self.flags = {} @@ -83,18 +84,26 @@ class Up2k(object): # static self.r_hash = re.compile("^[0-9a-zA-Z_-]{43}$") - if self.persist and not HAVE_SQLITE3: + if not HAVE_SQLITE3: self.log("could not initialize sqlite3, will use in-memory registry only") # this is kinda jank auth = AuthSrv(self.args, self.log, False) - self.init_indexes(auth) + have_e2d = self.init_indexes(auth) - if self.persist: + if have_e2d: thr = threading.Thread(target=self._snapshot) thr.daemon = True thr.start() + thr = threading.Thread(target=self._tagger) + thr.daemon = True + thr.start() + + thr = threading.Thread(target=self._hasher) + thr.daemon = True + thr.start() + def log(self, msg): self.log_func("up2k", msg + "\033[K") @@ -137,6 +146,7 @@ class Up2k(object): self.pp = ProgressPrinter() vols = auth.vfs.all_vols.values() t0 = time.time() + have_e2d = False live_vols = [] for vol in vols: @@ -157,6 +167,9 @@ class Up2k(object): self.entags[vol.realpath] = en + if "e2d" in vol.flags: + have_e2d = True + if "e2ds" in vol.flags: r = self._build_file_index(vol, vols) if not r: @@ -185,6 +198,8 @@ class Up2k(object): msg = "\033[31mcould not read tags because no backends are available (mutagen or ffprobe)\033[0m" self.log(msg) + return have_e2d + def register_vpath(self, ptop, flags): with self.mutex: if ptop in self.registry: @@ -192,7 +207,7 @@ class Up2k(object): reg = {} path = os.path.join(ptop, ".hist", "up2k.snap") - if self.persist and os.path.exists(path): + if "e2d" in flags and os.path.exists(path): with gzip.GzipFile(path, "rb") as f: j = f.read().decode("utf-8") @@ -206,7 +221,7 @@ class Up2k(object): self.flags[ptop] = flags self.registry[ptop] = reg - if not self.persist or not HAVE_SQLITE3 or "d2d" in flags: + if not HAVE_SQLITE3 or "e2d" not in flags or "d2d" in flags: return None try: @@ -286,6 +301,7 @@ class Up2k(object): self.log("stat: {} @ [{}]".format(repr(ex), abspath)) continue + lmod = int(inf.st_mtime) if stat.S_ISDIR(inf.st_mode): if abspath in excl or abspath == histdir: continue @@ -311,11 +327,11 @@ class Up2k(object): self.log(m.format(top, rp, len(in_db), rep_db)) dts = -1 - if dts == inf.st_mtime and dsz == inf.st_size: + if dts == lmod and dsz == inf.st_size: continue m = "reindex [{}] => [{}] ({}/{}) ({}/{})".format( - top, rp, dts, inf.st_mtime, dsz, inf.st_size + top, rp, dts, lmod, dsz, inf.st_size ) self.log(m) self.db_rm(dbw[0], rd, fn) @@ -334,7 +350,7 @@ class Up2k(object): continue wark = up2k_wark_from_hashlist(self.salt, inf.st_size, hashes) - self.db_add(dbw[0], wark, rd, fn, inf.st_mtime, inf.st_size) + self.db_add(dbw[0], wark, rd, fn, lmod, inf.st_size) dbw[1] += 1 ret += 1 td = time.time() - dbw[2] @@ -425,17 +441,9 @@ class Up2k(object): abspath = os.path.join(ptop, rd, fn) self.pp.msg = "c{} {}".format(n_left, abspath) - tags = self.mtag.get(abspath) - tags = {k: v for k, v in tags.items() if k in entags} - if not tags: - # indicate scanned without tags - tags = {"x": 0} - - for k, v in tags.items(): - q = "insert into mt values (?,?,?)" - c2.execute(q, (w[:16], k, v)) - n_add += 1 - n_buf += 1 + n_tags = self._tag_file(c2, entags, w, abspath) + n_add += n_tags + n_buf += n_tags td = time.time() - last_write if n_buf >= 4096 or td >= 60: @@ -448,6 +456,21 @@ class Up2k(object): return n_add, n_rm, True + def _tag_file(self, write_cur, entags, wark, abspath): + tags = self.mtag.get(abspath) + tags = {k: v for k, v in tags.items() if k in entags} + if not tags: + # indicate scanned without tags + tags = {"x": 0} + + ret = 0 + for k, v in tags.items(): + q = "insert into mt values (?,?,?)" + write_cur.execute(q, (wark[:16], k, v)) + ret += 1 + + return ret + def _orz(self, db_path): return sqlite3.connect(db_path, check_same_thread=False).cursor() @@ -779,17 +802,33 @@ class Up2k(object): if WINDOWS: self.lastmod_q.put([dst, (int(time.time()), int(job["lmod"]))]) - cur = self.cur.get(job["ptop"], None) - if cur: - j = job - self.db_rm(cur, j["prel"], j["name"]) - self.db_add(cur, j["wark"], j["prel"], j["name"], j["lmod"], j["size"]) - cur.connection.commit() - + # legit api sware 2 me mum + if self.idx_wark( + job["ptop"], + job["wark"], + job["prel"], + job["name"], + job["lmod"], + job["size"], + ): del self.registry[ptop][wark] # in-memory registry is reserved for unfinished uploads - return ret, dst + return ret, dst + + def idx_wark(self, ptop, wark, rd, fn, lmod, sz): + cur = self.cur.get(ptop, None) + if not cur: + return False + + self.db_rm(cur, rd, fn) + self.db_add(cur, wark, rd, fn, int(lmod), sz) + cur.connection.commit() + + if "e2t" in self.flags[ptop]: + self.tagq.put([ptop, wark, rd, fn]) + + return True def db_rm(self, db, rd, fn): sql = "delete from up where rd = ? and fn = ?" @@ -940,6 +979,45 @@ class Up2k(object): self.log("snap: {} |{}|".format(path, len(reg.keys()))) prev[k] = etag + def _tagger(self): + while True: + ptop, wark, rd, fn = self.tagq.get() + abspath = os.path.join(ptop, rd, fn) + self.log("tagging " + abspath) + with self.mutex: + cur = self.cur[ptop] + if not cur: + self.log("\033[31mno cursor to write tags with??") + continue + + entags = self.entags[ptop] + if not entags: + self.log("\033[33mno entags okay.jpg") + continue + + if "e2t" in self.flags[ptop]: + self._tag_file(cur, entags, wark, abspath) + + cur.connection.commit() + + def _hasher(self): + while True: + ptop, rd, fn = self.hashq.get() + if "e2d" not in self.flags[ptop]: + continue + + abspath = os.path.join(ptop, rd, fn) + self.log("hashing " + abspath) + inf = os.stat(fsenc(abspath)) + hashes = self._hashlist_from_file(abspath) + wark = up2k_wark_from_hashlist(self.salt, inf.st_size, hashes) + with self.mutex: + self.idx_wark(ptop, wark, rd, fn, inf.st_mtime, inf.st_size) + + def hash_file(self, ptop, flags, rd, fn): + self.register_vpath(ptop, flags) + self.hashq.put([ptop, rd, fn]) + def up2k_chunksize(filesize): chunksize = 1024 * 1024