From 9fdc5ee7486fb604703c7a88bf4747384d3ae2e6 Mon Sep 17 00:00:00 2001 From: ed Date: Thu, 25 Feb 2021 22:30:40 +0100 Subject: [PATCH] use one sqlite3 cursor, closes #1 --- copyparty/u2idx.py | 16 +++---- copyparty/up2k.py | 106 ++++++++++++++++++++++----------------------- 2 files changed, 60 insertions(+), 62 deletions(-) diff --git a/copyparty/u2idx.py b/copyparty/u2idx.py index fdbb1e52..76e046cc 100644 --- a/copyparty/u2idx.py +++ b/copyparty/u2idx.py @@ -24,7 +24,7 @@ class U2idx(object): self.log("could not load sqlite3; searchign wqill be disabled") return - self.dbs = {} + self.cur = {} def log(self, msg): self.log_func("u2idx", msg) @@ -73,16 +73,16 @@ class U2idx(object): ret = [] lim = 100 for (vtop, ptop, flags) in vols: - db = self.dbs.get(ptop) - if not db: - db = _open(ptop) - if not db: + cur = self.cur.get(ptop) + if not cur: + cur = _open(ptop) + if not cur: continue - self.dbs[ptop] = db + self.cur[ptop] = cur # self.log("idx /{} @ {} {}".format(vtop, ptop, flags)) - c = db.execute(qstr, qv) + c = cur.execute(qstr, qv) for _, ts, sz, rd, fn in c: lim -= 1 if lim <= 0: @@ -97,7 +97,7 @@ class U2idx(object): def _open(ptop): db_path = os.path.join(ptop, ".hist", "up2k.db") if os.path.exists(db_path): - return sqlite3.connect(db_path) + return sqlite3.connect(db_path).cursor() def _conv_sz(q, body, k, sql): diff --git a/copyparty/up2k.py b/copyparty/up2k.py index c2ce4225..90eea6e9 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -55,14 +55,13 @@ class Up2k(object): # state self.mutex = threading.Lock() self.registry = {} - self.db = {} + self.cur = {} - self.mem_db = None + self.mem_cur = None if HAVE_SQLITE3: # mojibake detector - self.mem_db = sqlite3.connect(":memory:", check_same_thread=False) - self.mem_db.execute(r"create table a (b text)") - self.mem_db.commit() + self.mem_cur = sqlite3.connect(":memory:", check_same_thread=False).cursor() + self.mem_cur.execute(r"create table a (b text)") if WINDOWS: # usually fails to set lastmod too quickly @@ -87,9 +86,9 @@ class Up2k(object): def w8enc(self, rd, fn): ret = [] - for k, v in [["d", rd], ["f", fn]]: + for v in [rd, fn]: try: - self.mem_db.execute("select * from a where b = ?", (v,)) + self.mem_cur.execute("select * from a where b = ?", (v,)) ret.append(v) except: ret.append("//" + w8b64enc(v)) @@ -149,14 +148,13 @@ class Up2k(object): pass db_path = os.path.join(ptop, ".hist", "up2k.db") - if ptop in self.db: - # self.db[ptop].close() + if ptop in self.cur: return None try: - db = self._open_db(db_path) - self.db[ptop] = db - return db + cur = self._open_db(db_path) + self.cur[ptop] = cur + return cur except Exception as ex: self.log("cannot use database at [{}]: {}".format(ptop, repr(ex))) @@ -167,28 +165,26 @@ class Up2k(object): self.pp = ProgressPrinter() t0 = time.time() for top in tops: - db = self.register_vpath(top) - if not db: + dbw = [self.register_vpath(top), 0, time.time()] + if not dbw[0]: continue - self.pp.n = next(db.execute("select count(w) from up"))[0] + self.pp.n = next(dbw[0].execute("select count(w) from up"))[0] db_path = os.path.join(top, ".hist", "up2k.db") sz0 = os.path.getsize(db_path) // 1024 # can be symlink so don't `and d.startswith(top)`` excl = set([d for d in tops if d != top]) - dbw = [db, 0, time.time()] - n_add = self._build_dir(dbw, top, excl, top) - n_rm = self._drop_lost(db, top) + n_rm = self._drop_lost(dbw[0], top) if dbw[1]: self.log("commit {} new files".format(dbw[1])) - db.commit() + dbw[0].connection.commit() if n_add or n_rm: db_path = os.path.join(top, ".hist", "up2k.db") sz1 = os.path.getsize(db_path) // 1024 - db.execute("vacuum") + dbw[0].execute("vacuum") sz2 = os.path.getsize(db_path) // 1024 msg = "{} new, {} del, {} kB vacced, {} kB gain, {} kB now".format( n_add, n_rm, sz1 - sz2, sz2 - sz0, sz2 @@ -270,16 +266,16 @@ class Up2k(object): td = time.time() - dbw[2] if dbw[1] >= 4096 or td >= 60: self.log("commit {} new files".format(dbw[1])) - dbw[0].commit() + dbw[0].connection.commit() dbw[1] = 0 dbw[2] = time.time() return ret - def _drop_lost(self, db, top): + def _drop_lost(self, cur, top): rm = [] nchecked = 0 - nfiles = next(db.execute("select count(w) from up"))[0] - c = db.execute("select * from up") + nfiles = next(cur.execute("select count(w) from up"))[0] + c = cur.execute("select * from up") for dwark, dts, dsz, drd, dfn in c: nchecked += 1 if drd.startswith("//") or dfn.startswith("//"): @@ -298,25 +294,25 @@ class Up2k(object): self.log("forgetting {} deleted files".format(len(rm))) for rd, fn in rm: # self.log("{} / {}".format(rd, fn)) - self.db_rm(db, rd, fn) + self.db_rm(cur, rd, fn) return len(rm) def _open_db(self, db_path): existed = os.path.exists(db_path) - conn = sqlite3.connect(db_path, check_same_thread=False) + cur = sqlite3.connect(db_path, check_same_thread=False).cursor() try: - ver = self._read_ver(conn) + ver = self._read_ver(cur) if ver == 1: - conn = self._upgrade_v1(conn, db_path) - ver = self._read_ver(conn) + cur = self._upgrade_v1(cur, db_path) + ver = self._read_ver(cur) if ver == 2: try: - nfiles = next(conn.execute("select count(w) from up"))[0] + nfiles = next(cur.execute("select count(w) from up"))[0] self.log("found DB at {} |{}|".format(db_path, nfiles)) - return conn + return cur except Exception as ex: self.log("WARN: could not list files, DB corrupt?\n " + repr(ex)) @@ -325,22 +321,24 @@ class Up2k(object): elif not existed: raise Exception("whatever") + conn = cur.connection + cur.close() conn.close() os.unlink(db_path) - conn = sqlite3.connect(db_path, check_same_thread=False) + cur = sqlite3.connect(db_path, check_same_thread=False).cursor() except: pass # sqlite is variable-width only, no point in using char/nchar/varchar - self._create_v2(conn) - conn.commit() + self._create_v2(cur) + cur.connection.commit() self.log("created DB at {}".format(db_path)) - return conn + return cur - def _read_ver(self, conn): + def _read_ver(self, cur): for tab in ["ki", "kv"]: try: - c = conn.execute(r"select v from {} where k = 'sver'".format(tab)) + c = cur.execute(r"select v from {} where k = 'sver'".format(tab)) except: continue @@ -348,7 +346,7 @@ class Up2k(object): if rows: return int(rows[0][0]) - def _create_v2(self, conn): + def _create_v2(self, cur): for cmd in [ r"create table ks (k text, v text)", r"create table ki (k text, v int)", @@ -358,7 +356,7 @@ class Up2k(object): r"create index up_rd on up(rd)", r"create index up_fn on up(fn)", ]: - conn.execute(cmd) + cur.execute(cmd) def _upgrade_v1(self, odb, db_path): self.log("\033[33mupgrading v1 to v2:\033[0m {}".format(db_path)) @@ -367,7 +365,7 @@ class Up2k(object): if os.path.exists(npath): os.unlink(npath) - ndb = sqlite3.connect(npath, check_same_thread=False) + ndb = sqlite3.connect(npath, check_same_thread=False).cursor() self._create_v2(ndb) c = odb.execute("select * from up") @@ -376,14 +374,14 @@ class Up2k(object): v = (wark, ts, sz, rd, fn) ndb.execute("insert into up values (?,?,?,?,?)", v) - ndb.commit() - ndb.close() - odb.close() + ndb.connection.commit() + ndb.connection.close() + odb.connection.close() bpath = db_path + ".bak.v1" self.log("success; backup at: " + bpath) atomic_move(db_path, bpath) atomic_move(npath, db_path) - return sqlite3.connect(db_path, check_same_thread=False) + return sqlite3.connect(db_path, check_same_thread=False).cursor() def handle_json(self, cj): self.register_vpath(cj["ptop"]) @@ -393,10 +391,10 @@ class Up2k(object): now = time.time() job = None with self.mutex: - db = self.db.get(cj["ptop"], None) + cur = self.cur.get(cj["ptop"], None) reg = self.registry[cj["ptop"]] - if db: - cur = db.execute(r"select * from up where w = ?", (wark,)) + if cur: + cur = cur.execute(r"select * from up where w = ?", (wark,)) for _, dtime, dsize, dp_dir, dp_fn in cur: if dp_dir.startswith("//") or dp_fn.startswith("//"): dp_dir, dp_fn = self.w8dec(dp_dir, dp_fn) @@ -584,12 +582,13 @@ class Up2k(object): if WINDOWS: self.lastmod_q.put([dst, (int(time.time()), int(job["lmod"]))]) - db = self.db.get(job["ptop"], None) - if db: + cur = self.cur.get(job["ptop"], None) + if cur: j = job - self.db_rm(db, j["prel"], j["name"]) - self.db_add(db, j["wark"], j["prel"], j["name"], j["lmod"], j["size"]) - db.commit() + self.db_rm(cur, j["prel"], j["name"]) + self.db_add(cur, j["wark"], j["prel"], j["name"], j["lmod"], j["size"]) + cur.connection.commit() + del self.registry[ptop][wark] # in-memory registry is reserved for unfinished uploads @@ -635,10 +634,9 @@ class Up2k(object): fsz = os.path.getsize(path) csz = up2k_chunksize(fsz) ret = [] - last_print = time.time() with open(path, "rb", 512 * 1024) as f: while fsz > 0: - self.pp.msg = msg = "{} MB".format(int(fsz / 1024 / 1024)) + self.pp.msg = "{} MB".format(int(fsz / 1024 / 1024)) hashobj = hashlib.sha512() rem = min(csz, fsz) fsz -= rem