From 6acf4365734e8d2dd61e1b40209fd793836b1b97 Mon Sep 17 00:00:00 2001 From: ed Date: Thu, 20 Apr 2023 20:36:13 +0000 Subject: [PATCH] u2idx pool instead of per-socket; prevents running out of FDs thanks to thousands of sqlite3 sessions and neatly sidesteps what could possibly be a race in python's sqlite3 bindings where it sometimes forgets to close the fd --- copyparty/httpcli.py | 9 +++++---- copyparty/httpconn.py | 11 ++++++++--- copyparty/httpsrv.py | 37 ++++++++++++++++++++++++++++++++++++- copyparty/u2idx.py | 17 +++++++++-------- 4 files changed, 58 insertions(+), 16 deletions(-) diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index 8f817a1f..2e05f818 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -1754,7 +1754,7 @@ class HttpCli(object): def handle_search(self, body: dict[str, Any]) -> bool: idx = self.conn.get_u2idx() - if not hasattr(idx, "p_end"): + if not idx or not hasattr(idx, "p_end"): raise Pebkac(500, "sqlite3 is not available on the server; cannot search") vols = [] @@ -3079,7 +3079,7 @@ class HttpCli(object): raise Pebkac(403, "the unpost feature is disabled in server config") idx = self.conn.get_u2idx() - if not hasattr(idx, "p_end"): + if not idx or not hasattr(idx, "p_end"): raise Pebkac(500, "sqlite3 is not available on the server; cannot unpost") filt = self.uparam.get("filter") @@ -3298,9 +3298,10 @@ class HttpCli(object): is_dir = stat.S_ISDIR(st.st_mode) icur = None - if e2t or (e2d and is_dir): + if is_dir and (e2t or e2d): idx = self.conn.get_u2idx() - icur = idx.get_cur(dbv.realpath) + if idx and hasattr(idx, "p_end"): + icur = idx.get_cur(dbv.realpath) if self.can_read: th_fmt = self.uparam.get("th") diff --git a/copyparty/httpconn.py b/copyparty/httpconn.py index 023b7674..708029ea 100644 --- a/copyparty/httpconn.py +++ b/copyparty/httpconn.py @@ -103,11 +103,12 @@ class HttpConn(object): def log(self, msg: str, c: Union[int, str] = 0) -> None: self.log_func(self.log_src, msg, c) - def get_u2idx(self) -> U2idx: - # one u2idx per tcp connection; + def get_u2idx(self) -> Optional[U2idx]: + # grab from a pool of u2idx instances; # sqlite3 fully parallelizes under python threads + # but avoid running out of FDs by creating too many if not self.u2idx: - self.u2idx = U2idx(self) + self.u2idx = self.hsrv.get_u2idx(str(self.addr)) return self.u2idx @@ -215,3 +216,7 @@ class HttpConn(object): self.cli = HttpCli(self) if not self.cli.run(): return + + if self.u2idx: + self.hsrv.put_u2idx(str(self.addr), self.u2idx) + self.u2idx = None diff --git a/copyparty/httpsrv.py b/copyparty/httpsrv.py index 08b55432..2e541d9b 100644 --- a/copyparty/httpsrv.py +++ b/copyparty/httpsrv.py @@ -11,7 +11,7 @@ import time import queue -from .__init__ import ANYWIN, EXE, MACOS, TYPE_CHECKING, EnvParams +from .__init__ import ANYWIN, CORES, EXE, MACOS, TYPE_CHECKING, EnvParams try: MNFE = ModuleNotFoundError @@ -40,6 +40,7 @@ except MNFE: from .bos import bos from .httpconn import HttpConn +from .u2idx import U2idx from .util import ( E_SCK, FHC, @@ -111,6 +112,9 @@ class HttpSrv(object): self.cb_ts = 0.0 self.cb_v = "" + self.u2idx_free: dict[str, U2idx] = {} + self.u2idx_n = 0 + env = jinja2.Environment() env.loader = jinja2.FileSystemLoader(os.path.join(self.E.mod, "web")) jn = ["splash", "svcs", "browser", "browser2", "msg", "md", "mde", "cf"] @@ -445,6 +449,9 @@ class HttpSrv(object): self.clients.remove(cli) self.ncli -= 1 + if cli.u2idx: + self.put_u2idx(str(addr), cli.u2idx) + def cachebuster(self) -> str: if time.time() - self.cb_ts < 1: return self.cb_v @@ -466,3 +473,31 @@ class HttpSrv(object): self.cb_v = v.decode("ascii")[-4:] self.cb_ts = time.time() return self.cb_v + + def get_u2idx(self, ident: str) -> Optional[U2idx]: + utab = self.u2idx_free + for _ in range(100): # 5/0.05 = 5sec + with self.mutex: + if utab: + if ident in utab: + return utab.pop(ident) + + return utab.pop(list(utab.keys())[0]) + + if self.u2idx_n < CORES: + self.u2idx_n += 1 + return U2idx(self) + + time.sleep(0.05) + # not using conditional waits, on a hunch that + # average performance will be faster like this + # since most servers won't be fully saturated + + return None + + def put_u2idx(self, ident: str, u2idx: U2idx) -> None: + with self.mutex: + while ident in self.u2idx_free: + ident += "a" + + self.u2idx_free[ident] = u2idx diff --git a/copyparty/u2idx.py b/copyparty/u2idx.py index d0ad50f0..62d9fa37 100644 --- a/copyparty/u2idx.py +++ b/copyparty/u2idx.py @@ -34,14 +34,14 @@ if True: # pylint: disable=using-constant-test from typing import Any, Optional, Union if TYPE_CHECKING: - from .httpconn import HttpConn + from .httpsrv import HttpSrv class U2idx(object): - def __init__(self, conn: "HttpConn") -> None: - self.log_func = conn.log_func - self.asrv = conn.asrv - self.args = conn.args + def __init__(self, hsrv: "HttpSrv") -> None: + self.log_func = hsrv.log + self.asrv = hsrv.asrv + self.args = hsrv.args self.timeout = self.args.srch_time if not HAVE_SQLITE3: @@ -51,7 +51,7 @@ class U2idx(object): self.active_id = "" self.active_cur: Optional["sqlite3.Cursor"] = None self.cur: dict[str, "sqlite3.Cursor"] = {} - self.mem_cur = sqlite3.connect(":memory:").cursor() + self.mem_cur = sqlite3.connect(":memory:", check_same_thread=False).cursor() self.mem_cur.execute(r"create table a (b text)") self.p_end = 0.0 @@ -101,7 +101,8 @@ class U2idx(object): uri = "" try: uri = "{}?mode=ro&nolock=1".format(Path(db_path).as_uri()) - cur = sqlite3.connect(uri, 2, uri=True).cursor() + db = sqlite3.connect(uri, 2, uri=True, check_same_thread=False) + cur = db.cursor() cur.execute('pragma table_info("up")').fetchone() self.log("ro: {}".format(db_path)) except: @@ -112,7 +113,7 @@ class U2idx(object): if not cur: # on windows, this steals the write-lock from up2k.deferred_init -- # seen on win 10.0.17763.2686, py 3.10.4, sqlite 3.37.2 - cur = sqlite3.connect(db_path, 2).cursor() + cur = sqlite3.connect(db_path, 2, check_same_thread=False).cursor() self.log("opened {}".format(db_path)) self.cur[ptop] = cur