diff --git a/copyparty/u2idx.py b/copyparty/u2idx.py index a0b71d17..5eb13adc 100644 --- a/copyparty/u2idx.py +++ b/copyparty/u2idx.py @@ -53,12 +53,17 @@ class U2idx(object): self.log("your python does not have sqlite3; searching will be disabled") return + if self.args.srch_icase: + self._open_db = self._open_db_icase + else: + self._open_db = self._open_db_std + assert sqlite3 # type: ignore # !rm self.active_id = "" self.active_cur: Optional["sqlite3.Cursor"] = None self.cur: dict[str, "sqlite3.Cursor"] = {} - self.mem_cur = self._connect_db(":memory:", check_same_thread=False).cursor() + self.mem_cur = sqlite3.connect(":memory:", check_same_thread=False).cursor() self.mem_cur.execute(r"create table a (b text)") self.sh_cur: Optional["sqlite3.Cursor"] = None @@ -69,12 +74,13 @@ class U2idx(object): def log(self, msg: str, c: Union[int, str] = 0) -> None: self.log_func("u2idx", msg, c) - def _connect_db(self, *args, **kwargs): - """Wrapper for sqlite3.connect() that allows us to perform additional setup and extension of the database""" - assert sqlite3 # type: ignore # !rm - db = sqlite3.connect(*args, **kwargs) + def _open_db_std(self, *args, **kwargs): + assert sqlite3 # type: ignore # !rm + kwargs["check_same_thread"] = False + return sqlite3.connect(*args, **kwargs) - # leverage python's casefold() string method for search terms + def _open_db_icase(self, *args, **kwargs): + db = self._open_db_std(*args, **kwargs) db.create_function("casefold", 1, lambda x: x.casefold() if x else x) return db @@ -126,7 +132,7 @@ class U2idx(object): assert sqlite3 # type: ignore # !rm - db = self._connect_db(self.args.shr_db, timeout=2, check_same_thread=False) + db = sqlite3.connect(self.args.shr_db, timeout=2, check_same_thread=False) cur = db.cursor() cur.execute('pragma table_info("sh")').fetchall() self.sh_cur = cur @@ -157,8 +163,7 @@ class U2idx(object): uri = "" try: uri = "{}?mode=ro&nolock=1".format(Path(db_path).as_uri()) - db = self._connect_db(uri, timeout=2, uri=True, check_same_thread=False) - cur = db.cursor() + cur = self._open_db(uri, timeout=2, uri=True).cursor() cur.execute('pragma table_info("up")').fetchone() self.log("ro: %r" % (db_path,)) except: @@ -169,7 +174,7 @@ class U2idx(object): if not cur: # on windows, this steals the write-lock from up2k.deferred_init -- # seen on win 10.0.17763.2686, py 3.10.4, sqlite 3.37.2 - cur = self._connect_db(db_path, timeout=2, check_same_thread=False).cursor() + cur = self._open_db(db_path, timeout=2).cursor() self.log("opened %r" % (db_path,)) self.cur[ptop] = cur @@ -182,6 +187,8 @@ class U2idx(object): if not HAVE_SQLITE3: return [], [], False + icase = self.args.srch_icase + q = "" v: Union[str, int] = "" va: list[Union[str, int]] = [] @@ -241,9 +248,13 @@ class U2idx(object): elif v == "path": v = "trim(?||up.rd,'/')" va.append("\nrd") + if icase: + v = "casefold(%s)" % (v,) elif v == "name": - v = "casefold(up.fn)" + v = "up.fn" + if icase: + v = "casefold(%s)" % (v,) elif v == "tags" or ptn_mt.match(v): have_mt = True @@ -294,9 +305,11 @@ class U2idx(object): tail = "||'%'" v = v[:-1] - if "casefold(up.fn)" in q: - # casefold the search term as well - v = unicode(v).casefold() + if icase and "casefold(" in q: + try: + v = unicode(v).casefold() + except: + v = unicode(v).lower() q += " {}?{} ".format(head, tail) va.append(v) @@ -313,14 +326,14 @@ class U2idx(object): continue va.pop() - va.append(zs.casefold()) + va.append(zs.lower()) q = q[: m.start()] field, oper = m.groups() if oper in ["=", "=="]: q += " {} like ? ) ".format(field) else: - q += " casefold({}) {} ? ) ".format(field, oper) + q += " lower({}) {} ? ) ".format(field, oper) try: return self.run_query(uname, vols, q, va, have_mt, True, lim)