From 999b7ae919631449c3ff152abffea33db66c17bf Mon Sep 17 00:00:00 2001 From: ed Date: Fri, 9 Dec 2022 19:58:13 +0000 Subject: [PATCH] safer to merge wal on startup instead --- copyparty/httpconn.py | 3 --- copyparty/u2idx.py | 7 ------- copyparty/up2k.py | 48 +++++++++++++++++++++++++++++++++++-------- 3 files changed, 40 insertions(+), 18 deletions(-) diff --git a/copyparty/httpconn.py b/copyparty/httpconn.py index 003897bd..d7a29cb0 100644 --- a/copyparty/httpconn.py +++ b/copyparty/httpconn.py @@ -83,9 +83,6 @@ class HttpConn(object): except: pass - if self.u2idx: - self.u2idx.shutdown() - def set_rproxy(self, ip: Optional[str] = None) -> str: if ip is None: color = 36 diff --git a/copyparty/u2idx.py b/copyparty/u2idx.py index 343950c4..2c357bba 100644 --- a/copyparty/u2idx.py +++ b/copyparty/u2idx.py @@ -60,13 +60,6 @@ class U2idx(object): def log(self, msg: str, c: Union[int, str] = 0) -> None: self.log_func("u2idx", msg, c) - def shutdown(self) -> None: - for v in self.cur.values(): - try: - v.close() - except: - pass - def fsearch( self, vols: list[tuple[str, str, dict[str, Any]]], body: dict[str, Any] ) -> list[dict[str, Any]]: diff --git a/copyparty/up2k.py b/copyparty/up2k.py index 8a3ed288..c7312e12 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -557,6 +557,34 @@ class Up2k(object): if self.stop: return False + for vol in all_vols.values(): + if "nowal" in vol.flags: + continue + + reg = self.register_vpath(vol.realpath, vol.flags) + try: + assert reg + cur, db_path = reg + if bos.path.getsize(db_path + "-wal") < 1024 * 1024 * 5: + continue + except: + continue + + try: + with self.mutex: + cur.execute("pragma wal_checkpoint(truncate)") + try: + cur.execute("commit") # absolutely necessary! for some reason + except: + pass + + cur.connection.commit() # this one maybe not + except Exception as ex: + self.log("checkpoint failed: {}".format(ex), 3) + + if self.stop: + return False + self.pp.end = True msg = "{} volumes in {:.2f} sec" @@ -677,6 +705,7 @@ class Up2k(object): if "nosync" in flags: cur.execute("pragma synchronous=0") + cur.connection.commit() return cur, db_path except: msg = "cannot use database at [{}]:\n{}" @@ -1745,9 +1774,13 @@ class Up2k(object): self._set_tagscan(write_cur, True) return ret + def _trace(self, msg: str) -> None: + self.log("ST: {}".format(msg)) + def _orz(self, db_path: str) -> "sqlite3.Cursor": - return sqlite3.connect(db_path, self.timeout, check_same_thread=False).cursor() - # x.set_trace_callback(trace) + c = sqlite3.connect(db_path, self.timeout, check_same_thread=False).cursor() + # c.connection.set_trace_callback(self._trace) + return c def _open_db(self, db_path: str) -> "sqlite3.Cursor": existed = bos.path.exists(db_path) @@ -3196,6 +3229,7 @@ class Up2k(object): if self.mth: self.mth.stop = True + # in case we're killed early for x in list(self.spools): self._unspool(x) @@ -3206,14 +3240,12 @@ class Up2k(object): t0 = time.time() while self.pp: time.sleep(0.1) - if time.time() - t0 > 2: + if time.time() - t0 >= 1: break - for cur in self.cur.values(): - try: - cur.close() - except: - pass + # if there is time + for x in list(self.spools): + self._unspool(x) def up2k_chunksize(filesize: int) -> int: