diff --git a/copyparty/__main__.py b/copyparty/__main__.py index 37bae8f7..6c0257df 100644 --- a/copyparty/__main__.py +++ b/copyparty/__main__.py @@ -12,10 +12,13 @@ import re import os import sys import time +import signal import shutil import filecmp import locale import argparse +import threading +import traceback from textwrap import dedent from .__init__ import E, WINDOWS, VT100, PY2 @@ -164,6 +167,16 @@ def configure_ssl_ciphers(al): sys.exit(0) +def sighandler(signal=None, frame=None): + msg = [""] * 5 + for th in threading.enumerate(): + msg.append(str(th)) + msg.extend(traceback.format_stack(sys._current_frames()[th.ident])) + + msg.append("\n") + print("\n".join(msg)) + + def main(): time.strptime("19970815", "%Y%m%d") # python#7980 if WINDOWS: @@ -307,6 +320,8 @@ def main(): + " (if you crash with codec errors then that is why)" ) + # signal.signal(signal.SIGINT, sighandler) + SvcHub(al).run() diff --git a/copyparty/up2k.py b/copyparty/up2k.py index 95af2cb2..af1a5781 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -64,7 +64,7 @@ class Up2k(object): self.flags = {} self.cur = {} self.mtag = None - self.n_mtag_tags_added = -1 + self.pending_tags = None self.mem_cur = None self.sqlite_ver = None @@ -462,14 +462,12 @@ class Up2k(object): abspath = os.path.join(ptop, rd, fn) self.pp.msg = "c{} {}".format(n_left, abspath) - args = [c3, entags, w, abspath] + args = [entags, w, abspath] if not mpool: - n_tags = self._tag_file(*args) + n_tags = self._tag_file(c3, *args) else: mpool.put(["mtag"] + args) - with self.mutex: - n_tags = self.n_mtag_tags_added - self.n_mtag_tags_added = 0 + n_tags = self._flush_mpool(c3) n_add += n_tags n_buf += n_tags @@ -481,15 +479,24 @@ class Up2k(object): last_write = time.time() n_buf = 0 - self._stop_mpool(mpool) + self._stop_mpool(mpool, c3) c3.close() c2.close() return n_add, n_rm, True + def _flush_mpool(self, wcur): + with self.mutex: + ret = 0 + for x in self.pending_tags: + self._tag_file(wcur, *x) + ret += 1 + + self.pending_tags = [] + return ret + def _run_all_mtp(self): - self.n_mtag_tags_added = 0 for ptop, flags in self.flags.items(): if "mtp" in flags: self._run_one_mtp(ptop) @@ -519,6 +526,7 @@ class Up2k(object): batch_sz = mpool.maxsize * 3 t_prev = time.time() n_prev = n_left + n_done = 0 while True: with self.mutex: q = "select w from mt where k = 't:mtp' limit ?" @@ -546,7 +554,9 @@ class Up2k(object): task_parsers = { k: v for k, v in parsers.items() if k in force or k not in have } - jobs.append([task_parsers, wcur, None, w, abspath]) + jobs.append([task_parsers, None, w, abspath]) + + n_done += self._flush_mpool(wcur) if not warks: break @@ -567,7 +577,7 @@ class Up2k(object): msg = "mtp: {} done, {} left, eta {}h {:02d}m" with self.mutex: - msg = msg.format(self.n_mtag_tags_added, n_left, int(h), int(m)) + msg = msg.format(n_done, n_left, int(h), int(m)) self.log(msg, c=6) for j in jobs: @@ -577,12 +587,12 @@ class Up2k(object): with self.mutex: cur.connection.commit() - self._stop_mpool(mpool) + self._stop_mpool(mpool, wcur) with self.mutex: cur.connection.commit() - if self.n_mtag_tags_added: - self.vac(cur, db_path, self.n_mtag_tags_added, 0, sz0) - + if n_done: + self.vac(cur, db_path, n_done, 0, sz0) + wcur.close() cur.close() @@ -592,9 +602,9 @@ class Up2k(object): # mp.pool.ThreadPool and concurrent.futures.ThreadPoolExecutor # both do crazy runahead so lets reinvent another wheel nw = os.cpu_count() if hasattr(os, "cpu_count") else 4 - if self.n_mtag_tags_added == -1: + if self.pending_tags is None: self.log("using {}x {}".format(nw, self.mtag.backend)) - self.n_mtag_tags_added = 0 + self.pending_tags = [] mpool = Queue(nw) for _ in range(nw): @@ -604,7 +614,7 @@ class Up2k(object): return mpool - def _stop_mpool(self, mpool): + def _stop_mpool(self, mpool, wcur): if not mpool: return @@ -612,6 +622,7 @@ class Up2k(object): mpool.put(None) mpool.join() + self._flush_mpool(wcur) def _tag_thr(self, q): while True: @@ -621,7 +632,7 @@ class Up2k(object): return try: - parser, write_cur, entags, wark, abspath = task + parser, entags, wark, abspath = task if parser == "mtag": tags = self.mtag.get(abspath) else: @@ -632,8 +643,7 @@ class Up2k(object): self.log("{}\033[0m [{}]".format(" ".join(vtags), abspath)) with self.mutex: - n = self._tag_file(write_cur, entags, wark, abspath, tags) - self.n_mtag_tags_added += n + self.pending_tags.append([entags, wark, abspath, tags]) except: ex = traceback.format_exc() if parser == "mtag": @@ -664,6 +674,7 @@ class Up2k(object): def _orz(self, db_path): return sqlite3.connect(db_path, check_same_thread=False).cursor() + # x.set_trace_callback(trace) def _open_db(self, db_path): existed = os.path.exists(db_path) diff --git a/copyparty/util.py b/copyparty/util.py index 5795a7d1..31a38a0c 100644 --- a/copyparty/util.py +++ b/copyparty/util.py @@ -10,6 +10,7 @@ import select import struct import hashlib import platform +import traceback import threading import mimetypes import contextlib @@ -148,6 +149,31 @@ def nuprint(msg): uprint("{}\n".format(msg)) +def rice_tid(): + tid = threading.current_thread().ident + c = struct.unpack(b"B" * 5, struct.pack(b">Q", tid)[-5:]) + return "".join("\033[1;37;48;5;{}m{:02x}".format(x, x) for x in c) + "\033[0m" + + +def trace(*args, **kwargs): + t = time.time() + stack = "".join( + "\033[36m{}\033[33m{}".format(x[0].split(os.sep)[-1][:-3], x[1]) + for x in traceback.extract_stack()[3:-1] + ) + parts = ["{:.6f}".format(t), rice_tid(), stack] + + if args: + parts.append(repr(args)) + + if kwargs: + parts.append(repr(kwargs)) + + msg = "\033[0m ".join(parts) + # _tracebuf.append(msg) + nuprint(msg) + + @contextlib.contextmanager def ren_open(fname, *args, **kwargs): fdir = kwargs.pop("fdir", None)