use one sqlite3 cursor, closes #1

This commit is contained in:
ed 2021-02-25 22:30:40 +01:00
parent 00ff133387
commit 9fdc5ee748
2 changed files with 60 additions and 62 deletions

View file

@ -24,7 +24,7 @@ class U2idx(object):
self.log("could not load sqlite3; searchign wqill be disabled")
return
self.dbs = {}
self.cur = {}
def log(self, msg):
self.log_func("u2idx", msg)
@ -73,16 +73,16 @@ class U2idx(object):
ret = []
lim = 100
for (vtop, ptop, flags) in vols:
db = self.dbs.get(ptop)
if not db:
db = _open(ptop)
if not db:
cur = self.cur.get(ptop)
if not cur:
cur = _open(ptop)
if not cur:
continue
self.dbs[ptop] = db
self.cur[ptop] = cur
# self.log("idx /{} @ {} {}".format(vtop, ptop, flags))
c = db.execute(qstr, qv)
c = cur.execute(qstr, qv)
for _, ts, sz, rd, fn in c:
lim -= 1
if lim <= 0:
@ -97,7 +97,7 @@ class U2idx(object):
def _open(ptop):
db_path = os.path.join(ptop, ".hist", "up2k.db")
if os.path.exists(db_path):
return sqlite3.connect(db_path)
return sqlite3.connect(db_path).cursor()
def _conv_sz(q, body, k, sql):

View file

@ -55,14 +55,13 @@ class Up2k(object):
# state
self.mutex = threading.Lock()
self.registry = {}
self.db = {}
self.cur = {}
self.mem_db = None
self.mem_cur = None
if HAVE_SQLITE3:
# mojibake detector
self.mem_db = sqlite3.connect(":memory:", check_same_thread=False)
self.mem_db.execute(r"create table a (b text)")
self.mem_db.commit()
self.mem_cur = sqlite3.connect(":memory:", check_same_thread=False).cursor()
self.mem_cur.execute(r"create table a (b text)")
if WINDOWS:
# usually fails to set lastmod too quickly
@ -87,9 +86,9 @@ class Up2k(object):
def w8enc(self, rd, fn):
ret = []
for k, v in [["d", rd], ["f", fn]]:
for v in [rd, fn]:
try:
self.mem_db.execute("select * from a where b = ?", (v,))
self.mem_cur.execute("select * from a where b = ?", (v,))
ret.append(v)
except:
ret.append("//" + w8b64enc(v))
@ -149,14 +148,13 @@ class Up2k(object):
pass
db_path = os.path.join(ptop, ".hist", "up2k.db")
if ptop in self.db:
# self.db[ptop].close()
if ptop in self.cur:
return None
try:
db = self._open_db(db_path)
self.db[ptop] = db
return db
cur = self._open_db(db_path)
self.cur[ptop] = cur
return cur
except Exception as ex:
self.log("cannot use database at [{}]: {}".format(ptop, repr(ex)))
@ -167,28 +165,26 @@ class Up2k(object):
self.pp = ProgressPrinter()
t0 = time.time()
for top in tops:
db = self.register_vpath(top)
if not db:
dbw = [self.register_vpath(top), 0, time.time()]
if not dbw[0]:
continue
self.pp.n = next(db.execute("select count(w) from up"))[0]
self.pp.n = next(dbw[0].execute("select count(w) from up"))[0]
db_path = os.path.join(top, ".hist", "up2k.db")
sz0 = os.path.getsize(db_path) // 1024
# can be symlink so don't `and d.startswith(top)``
excl = set([d for d in tops if d != top])
dbw = [db, 0, time.time()]
n_add = self._build_dir(dbw, top, excl, top)
n_rm = self._drop_lost(db, top)
n_rm = self._drop_lost(dbw[0], top)
if dbw[1]:
self.log("commit {} new files".format(dbw[1]))
db.commit()
dbw[0].connection.commit()
if n_add or n_rm:
db_path = os.path.join(top, ".hist", "up2k.db")
sz1 = os.path.getsize(db_path) // 1024
db.execute("vacuum")
dbw[0].execute("vacuum")
sz2 = os.path.getsize(db_path) // 1024
msg = "{} new, {} del, {} kB vacced, {} kB gain, {} kB now".format(
n_add, n_rm, sz1 - sz2, sz2 - sz0, sz2
@ -270,16 +266,16 @@ class Up2k(object):
td = time.time() - dbw[2]
if dbw[1] >= 4096 or td >= 60:
self.log("commit {} new files".format(dbw[1]))
dbw[0].commit()
dbw[0].connection.commit()
dbw[1] = 0
dbw[2] = time.time()
return ret
def _drop_lost(self, db, top):
def _drop_lost(self, cur, top):
rm = []
nchecked = 0
nfiles = next(db.execute("select count(w) from up"))[0]
c = db.execute("select * from up")
nfiles = next(cur.execute("select count(w) from up"))[0]
c = cur.execute("select * from up")
for dwark, dts, dsz, drd, dfn in c:
nchecked += 1
if drd.startswith("//") or dfn.startswith("//"):
@ -298,25 +294,25 @@ class Up2k(object):
self.log("forgetting {} deleted files".format(len(rm)))
for rd, fn in rm:
# self.log("{} / {}".format(rd, fn))
self.db_rm(db, rd, fn)
self.db_rm(cur, rd, fn)
return len(rm)
def _open_db(self, db_path):
existed = os.path.exists(db_path)
conn = sqlite3.connect(db_path, check_same_thread=False)
cur = sqlite3.connect(db_path, check_same_thread=False).cursor()
try:
ver = self._read_ver(conn)
ver = self._read_ver(cur)
if ver == 1:
conn = self._upgrade_v1(conn, db_path)
ver = self._read_ver(conn)
cur = self._upgrade_v1(cur, db_path)
ver = self._read_ver(cur)
if ver == 2:
try:
nfiles = next(conn.execute("select count(w) from up"))[0]
nfiles = next(cur.execute("select count(w) from up"))[0]
self.log("found DB at {} |{}|".format(db_path, nfiles))
return conn
return cur
except Exception as ex:
self.log("WARN: could not list files, DB corrupt?\n " + repr(ex))
@ -325,22 +321,24 @@ class Up2k(object):
elif not existed:
raise Exception("whatever")
conn = cur.connection
cur.close()
conn.close()
os.unlink(db_path)
conn = sqlite3.connect(db_path, check_same_thread=False)
cur = sqlite3.connect(db_path, check_same_thread=False).cursor()
except:
pass
# sqlite is variable-width only, no point in using char/nchar/varchar
self._create_v2(conn)
conn.commit()
self._create_v2(cur)
cur.connection.commit()
self.log("created DB at {}".format(db_path))
return conn
return cur
def _read_ver(self, conn):
def _read_ver(self, cur):
for tab in ["ki", "kv"]:
try:
c = conn.execute(r"select v from {} where k = 'sver'".format(tab))
c = cur.execute(r"select v from {} where k = 'sver'".format(tab))
except:
continue
@ -348,7 +346,7 @@ class Up2k(object):
if rows:
return int(rows[0][0])
def _create_v2(self, conn):
def _create_v2(self, cur):
for cmd in [
r"create table ks (k text, v text)",
r"create table ki (k text, v int)",
@ -358,7 +356,7 @@ class Up2k(object):
r"create index up_rd on up(rd)",
r"create index up_fn on up(fn)",
]:
conn.execute(cmd)
cur.execute(cmd)
def _upgrade_v1(self, odb, db_path):
self.log("\033[33mupgrading v1 to v2:\033[0m {}".format(db_path))
@ -367,7 +365,7 @@ class Up2k(object):
if os.path.exists(npath):
os.unlink(npath)
ndb = sqlite3.connect(npath, check_same_thread=False)
ndb = sqlite3.connect(npath, check_same_thread=False).cursor()
self._create_v2(ndb)
c = odb.execute("select * from up")
@ -376,14 +374,14 @@ class Up2k(object):
v = (wark, ts, sz, rd, fn)
ndb.execute("insert into up values (?,?,?,?,?)", v)
ndb.commit()
ndb.close()
odb.close()
ndb.connection.commit()
ndb.connection.close()
odb.connection.close()
bpath = db_path + ".bak.v1"
self.log("success; backup at: " + bpath)
atomic_move(db_path, bpath)
atomic_move(npath, db_path)
return sqlite3.connect(db_path, check_same_thread=False)
return sqlite3.connect(db_path, check_same_thread=False).cursor()
def handle_json(self, cj):
self.register_vpath(cj["ptop"])
@ -393,10 +391,10 @@ class Up2k(object):
now = time.time()
job = None
with self.mutex:
db = self.db.get(cj["ptop"], None)
cur = self.cur.get(cj["ptop"], None)
reg = self.registry[cj["ptop"]]
if db:
cur = db.execute(r"select * from up where w = ?", (wark,))
if cur:
cur = cur.execute(r"select * from up where w = ?", (wark,))
for _, dtime, dsize, dp_dir, dp_fn in cur:
if dp_dir.startswith("//") or dp_fn.startswith("//"):
dp_dir, dp_fn = self.w8dec(dp_dir, dp_fn)
@ -584,12 +582,13 @@ class Up2k(object):
if WINDOWS:
self.lastmod_q.put([dst, (int(time.time()), int(job["lmod"]))])
db = self.db.get(job["ptop"], None)
if db:
cur = self.cur.get(job["ptop"], None)
if cur:
j = job
self.db_rm(db, j["prel"], j["name"])
self.db_add(db, j["wark"], j["prel"], j["name"], j["lmod"], j["size"])
db.commit()
self.db_rm(cur, j["prel"], j["name"])
self.db_add(cur, j["wark"], j["prel"], j["name"], j["lmod"], j["size"])
cur.connection.commit()
del self.registry[ptop][wark]
# in-memory registry is reserved for unfinished uploads
@ -635,10 +634,9 @@ class Up2k(object):
fsz = os.path.getsize(path)
csz = up2k_chunksize(fsz)
ret = []
last_print = time.time()
with open(path, "rb", 512 * 1024) as f:
while fsz > 0:
self.pp.msg = msg = "{} MB".format(int(fsz / 1024 / 1024))
self.pp.msg = "{} MB".format(int(fsz / 1024 / 1024))
hashobj = hashlib.sha512()
rem = min(csz, fsz)
fsz -= rem