don't block non-up2k uploads during indexing;

due to all upload APIs invoking up2k.hash_file to index uploads,
the uploads could block during a rescan for a crazy long time
(past most gateway timeouts); now this is mostly fire-and-forget

"mostly" because this also adds a conditional slowdown to
help the hasher churn through if the queue gets too big

worst case, if the server is restarted before it catches up, this
would rely on filesystem reindexing to eventually index the files
after a restart or on a schedule, meaning uploader info would be
lost on shutdown, but this is usually fine anyways (and this was
also the case until now)
This commit is contained in:
ed 2024-01-08 22:10:16 +00:00
parent 9bc09ce949
commit e8a653ca0c

View file

@ -145,9 +145,12 @@ class Up2k(object):
self.entags: dict[str, set[str]] = {}
self.mtp_parsers: dict[str, dict[str, MParser]] = {}
self.pending_tags: list[tuple[set[str], str, str, dict[str, Any]]] = []
self.hashq: Queue[tuple[str, str, str, str, str, float, str, bool]] = Queue()
self.hashq: Queue[
tuple[str, str, dict[str, Any], str, str, str, float, str, bool]
] = Queue()
self.tagq: Queue[tuple[str, str, str, str, str, float]] = Queue()
self.tag_event = threading.Condition()
self.hashq_mutex = threading.Lock()
self.n_hashq = 0
self.n_tagq = 0
self.mpool_used = False
@ -2351,7 +2354,8 @@ class Up2k(object):
t = "cannot receive uploads right now;\nserver busy with {}.\nPlease wait; the client will retry..."
raise Pebkac(503, t.format(self.blocked or "[unknown]"))
except TypeError:
# py2
if not PY2:
raise
with self.mutex:
self._job_volchk(cj)
@ -3991,16 +3995,16 @@ class Up2k(object):
self.log("tagged {} ({}+{})".format(abspath, ntags1, len(tags) - ntags1))
def _hasher(self) -> None:
with self.mutex:
with self.hashq_mutex:
self.n_hashq += 1
while True:
with self.mutex:
with self.hashq_mutex:
self.n_hashq -= 1
# self.log("hashq {}".format(self.n_hashq))
task = self.hashq.get()
if len(task) != 8:
if len(task) != 9:
raise Exception("invalid hash task")
try:
@ -4009,10 +4013,13 @@ class Up2k(object):
except Exception as ex:
self.log("failed to hash %s: %s" % (task, ex), 1)
def _hash_t(self, task: tuple[str, str, str, str, str, float, str, bool]) -> bool:
ptop, vtop, rd, fn, ip, at, usr, skip_xau = task
def _hash_t(
self, task: tuple[str, str, dict[str, Any], str, str, str, float, str, bool]
) -> bool:
ptop, vtop, flags, rd, fn, ip, at, usr, skip_xau = task
# self.log("hashq {} pop {}/{}/{}".format(self.n_hashq, ptop, rd, fn))
if "e2d" not in self.flags[ptop]:
with self.mutex:
if not self.register_vpath(ptop, flags):
return True
abspath = djoin(ptop, rd, fn)
@ -4064,11 +4071,22 @@ class Up2k(object):
usr: str,
skip_xau: bool = False,
) -> None:
with self.mutex:
self.register_vpath(ptop, flags)
self.hashq.put((ptop, vtop, rd, fn, ip, at, usr, skip_xau))
if "e2d" not in flags:
return
if self.n_hashq > 1024:
t = "%d files in hashq; taking a nap"
self.log(t % (self.n_hashq,), 6)
for _ in range(self.n_hashq // 1024):
time.sleep(0.1)
if self.n_hashq < 1024:
break
zt = (ptop, vtop, flags, rd, fn, ip, at, usr, skip_xau)
with self.hashq_mutex:
self.hashq.put(zt)
self.n_hashq += 1
# self.log("hashq {} push {}/{}/{}".format(self.n_hashq, ptop, rd, fn))
def shutdown(self) -> None:
self.stop = True