no racing pls

This commit is contained in:
ed 2021-03-19 20:42:33 +01:00
parent 5d6c61a861
commit 6599c3eced
3 changed files with 72 additions and 20 deletions

View file

@ -12,10 +12,13 @@ import re
import os import os
import sys import sys
import time import time
import signal
import shutil import shutil
import filecmp import filecmp
import locale import locale
import argparse import argparse
import threading
import traceback
from textwrap import dedent from textwrap import dedent
from .__init__ import E, WINDOWS, VT100, PY2 from .__init__ import E, WINDOWS, VT100, PY2
@ -164,6 +167,16 @@ def configure_ssl_ciphers(al):
sys.exit(0) sys.exit(0)
def sighandler(signal=None, frame=None):
    """Print the current stack of every live thread to stdout.

    The signature is shaped like a ``signal.signal`` handler
    (``signum, frame``); neither argument is used.  Note that the
    ``signal`` parameter shadows the module of the same name inside
    this function.
    """
    # Take a single snapshot of all thread frames.  Looking the frame up
    # with .get() means a thread that is listed by threading.enumerate()
    # but absent from the snapshot (it started or exited in between) is
    # reported without a stack instead of raising KeyError.
    frames = sys._current_frames()

    # five empty entries become five leading newlines in the output
    msg = [""] * 5
    for th in threading.enumerate():
        msg.append(str(th))
        stack = frames.get(th.ident)
        if stack is not None:
            msg.extend(traceback.format_stack(stack))

    msg.append("\n")
    print("\n".join(msg))
def main(): def main():
time.strptime("19970815", "%Y%m%d") # python#7980 time.strptime("19970815", "%Y%m%d") # python#7980
if WINDOWS: if WINDOWS:
@ -307,6 +320,8 @@ def main():
+ " (if you crash with codec errors then that is why)" + " (if you crash with codec errors then that is why)"
) )
# signal.signal(signal.SIGINT, sighandler)
SvcHub(al).run() SvcHub(al).run()

View file

@ -64,7 +64,7 @@ class Up2k(object):
self.flags = {} self.flags = {}
self.cur = {} self.cur = {}
self.mtag = None self.mtag = None
self.n_mtag_tags_added = -1 self.pending_tags = None
self.mem_cur = None self.mem_cur = None
self.sqlite_ver = None self.sqlite_ver = None
@ -462,14 +462,12 @@ class Up2k(object):
abspath = os.path.join(ptop, rd, fn) abspath = os.path.join(ptop, rd, fn)
self.pp.msg = "c{} {}".format(n_left, abspath) self.pp.msg = "c{} {}".format(n_left, abspath)
args = [c3, entags, w, abspath] args = [entags, w, abspath]
if not mpool: if not mpool:
n_tags = self._tag_file(*args) n_tags = self._tag_file(c3, *args)
else: else:
mpool.put(["mtag"] + args) mpool.put(["mtag"] + args)
with self.mutex: n_tags = self._flush_mpool(c3)
n_tags = self.n_mtag_tags_added
self.n_mtag_tags_added = 0
n_add += n_tags n_add += n_tags
n_buf += n_tags n_buf += n_tags
@ -481,15 +479,24 @@ class Up2k(object):
last_write = time.time() last_write = time.time()
n_buf = 0 n_buf = 0
self._stop_mpool(mpool) self._stop_mpool(mpool, c3)
c3.close() c3.close()
c2.close() c2.close()
return n_add, n_rm, True return n_add, n_rm, True
def _flush_mpool(self, wcur):
    """Write every queued tag result through ``wcur`` and empty the queue.

    Holds ``self.mutex`` for the whole operation.  Each entry of
    ``self.pending_tags`` is unpacked as the trailing arguments of
    ``self._tag_file(wcur, ...)``.

    Returns the number of queued entries that were processed (one per
    entry, regardless of what ``_tag_file`` returns).
    """
    with self.mutex:
        queued = self.pending_tags
        for job in queued:
            self._tag_file(wcur, *job)

        # only reached when every entry was written without raising
        self.pending_tags = []
        return len(queued)
def _run_all_mtp(self): def _run_all_mtp(self):
self.n_mtag_tags_added = 0
for ptop, flags in self.flags.items(): for ptop, flags in self.flags.items():
if "mtp" in flags: if "mtp" in flags:
self._run_one_mtp(ptop) self._run_one_mtp(ptop)
@ -519,6 +526,7 @@ class Up2k(object):
batch_sz = mpool.maxsize * 3 batch_sz = mpool.maxsize * 3
t_prev = time.time() t_prev = time.time()
n_prev = n_left n_prev = n_left
n_done = 0
while True: while True:
with self.mutex: with self.mutex:
q = "select w from mt where k = 't:mtp' limit ?" q = "select w from mt where k = 't:mtp' limit ?"
@ -546,7 +554,9 @@ class Up2k(object):
task_parsers = { task_parsers = {
k: v for k, v in parsers.items() if k in force or k not in have k: v for k, v in parsers.items() if k in force or k not in have
} }
jobs.append([task_parsers, wcur, None, w, abspath]) jobs.append([task_parsers, None, w, abspath])
n_done += self._flush_mpool(wcur)
if not warks: if not warks:
break break
@ -567,7 +577,7 @@ class Up2k(object):
msg = "mtp: {} done, {} left, eta {}h {:02d}m" msg = "mtp: {} done, {} left, eta {}h {:02d}m"
with self.mutex: with self.mutex:
msg = msg.format(self.n_mtag_tags_added, n_left, int(h), int(m)) msg = msg.format(n_done, n_left, int(h), int(m))
self.log(msg, c=6) self.log(msg, c=6)
for j in jobs: for j in jobs:
@ -577,11 +587,11 @@ class Up2k(object):
with self.mutex: with self.mutex:
cur.connection.commit() cur.connection.commit()
self._stop_mpool(mpool) self._stop_mpool(mpool, wcur)
with self.mutex: with self.mutex:
cur.connection.commit() cur.connection.commit()
if self.n_mtag_tags_added: if n_done:
self.vac(cur, db_path, self.n_mtag_tags_added, 0, sz0) self.vac(cur, db_path, n_done, 0, sz0)
wcur.close() wcur.close()
cur.close() cur.close()
@ -592,9 +602,9 @@ class Up2k(object):
# mp.pool.ThreadPool and concurrent.futures.ThreadPoolExecutor # mp.pool.ThreadPool and concurrent.futures.ThreadPoolExecutor
# both do crazy runahead so lets reinvent another wheel # both do crazy runahead so lets reinvent another wheel
nw = os.cpu_count() if hasattr(os, "cpu_count") else 4 nw = os.cpu_count() if hasattr(os, "cpu_count") else 4
if self.n_mtag_tags_added == -1: if self.pending_tags is None:
self.log("using {}x {}".format(nw, self.mtag.backend)) self.log("using {}x {}".format(nw, self.mtag.backend))
self.n_mtag_tags_added = 0 self.pending_tags = []
mpool = Queue(nw) mpool = Queue(nw)
for _ in range(nw): for _ in range(nw):
@ -604,7 +614,7 @@ class Up2k(object):
return mpool return mpool
def _stop_mpool(self, mpool): def _stop_mpool(self, mpool, wcur):
if not mpool: if not mpool:
return return
@ -612,6 +622,7 @@ class Up2k(object):
mpool.put(None) mpool.put(None)
mpool.join() mpool.join()
self._flush_mpool(wcur)
def _tag_thr(self, q): def _tag_thr(self, q):
while True: while True:
@ -621,7 +632,7 @@ class Up2k(object):
return return
try: try:
parser, write_cur, entags, wark, abspath = task parser, entags, wark, abspath = task
if parser == "mtag": if parser == "mtag":
tags = self.mtag.get(abspath) tags = self.mtag.get(abspath)
else: else:
@ -632,8 +643,7 @@ class Up2k(object):
self.log("{}\033[0m [{}]".format(" ".join(vtags), abspath)) self.log("{}\033[0m [{}]".format(" ".join(vtags), abspath))
with self.mutex: with self.mutex:
n = self._tag_file(write_cur, entags, wark, abspath, tags) self.pending_tags.append([entags, wark, abspath, tags])
self.n_mtag_tags_added += n
except: except:
ex = traceback.format_exc() ex = traceback.format_exc()
if parser == "mtag": if parser == "mtag":
@ -664,6 +674,7 @@ class Up2k(object):
def _orz(self, db_path): def _orz(self, db_path):
return sqlite3.connect(db_path, check_same_thread=False).cursor() return sqlite3.connect(db_path, check_same_thread=False).cursor()
# x.set_trace_callback(trace)
def _open_db(self, db_path): def _open_db(self, db_path):
existed = os.path.exists(db_path) existed = os.path.exists(db_path)

View file

@ -10,6 +10,7 @@ import select
import struct import struct
import hashlib import hashlib
import platform import platform
import traceback
import threading import threading
import mimetypes import mimetypes
import contextlib import contextlib
@ -148,6 +149,31 @@ def nuprint(msg):
uprint("{}\n".format(msg)) uprint("{}\n".format(msg))
def rice_tid():
    """Return the current thread's ident as a colourised hex string.

    The ident is packed as a big-endian unsigned 64-bit integer and its
    last five bytes are each rendered as two hex digits, prefixed with
    an escape sequence that uses the byte value as the 256-colour
    background index.  The string ends with an SGR reset.
    """
    ident = threading.current_thread().ident
    low5 = bytearray(struct.pack(b">Q", ident))[-5:]

    cells = []
    for octet in low5:
        cells.append("\033[1;37;48;5;{}m{:02x}".format(octet, octet))

    cells.append("\033[0m")
    return "".join(cells)
def trace(*args, **kwargs):
    """Print one debug line describing the call site through ``nuprint``.

    The line consists of, joined by an SGR reset plus a space: the
    current time with six decimals, the coloured thread id from
    ``rice_tid()``, a compact rendering of the call stack, and the repr
    of ``args`` and of ``kwargs`` when they are non-empty.
    """
    now = time.time()

    # extract_stack() has to run in this very frame: the slice skips the
    # three outermost entries and the final entry, which is trace() itself.
    callers = traceback.extract_stack()[3:-1]

    hops = []
    for entry in callers:
        basename = entry[0].split(os.sep)[-1]
        # [:-3] trims a three-character suffix (presumably ".py");
        # entry[1] is the line number
        hops.append("\033[36m{}\033[33m{}".format(basename[:-3], entry[1]))

    fields = ["{:.6f}".format(now), rice_tid(), "".join(hops)]
    fields.extend(repr(extra) for extra in (args, kwargs) if extra)

    nuprint("\033[0m ".join(fields))
@contextlib.contextmanager @contextlib.contextmanager
def ren_open(fname, *args, **kwargs): def ren_open(fname, *args, **kwargs):
fdir = kwargs.pop("fdir", None) fdir = kwargs.pop("fdir", None)