From 1ad7a3f3786d33cf3ee0066cf439965f2d99e030 Mon Sep 17 00:00:00 2001 From: ed Date: Tue, 27 Jul 2021 15:48:00 +0000 Subject: [PATCH] await and monitor workers on startup --- copyparty/broker_mp.py | 12 ++++++------ copyparty/broker_thr.py | 1 + copyparty/httpsrv.py | 1 + copyparty/svchub.py | 26 +++++++++++++++++++++++++- copyparty/up2k.py | 17 ++++++++--------- 5 files changed, 41 insertions(+), 16 deletions(-) diff --git a/copyparty/broker_mp.py b/copyparty/broker_mp.py index 2bd18f5d..3dfae2cf 100644 --- a/copyparty/broker_mp.py +++ b/copyparty/broker_mp.py @@ -22,12 +22,9 @@ class BrokerMp(object): self.retpend_mutex = threading.Lock() self.mutex = threading.Lock() - cores = self.args.j - if not cores: - cores = mp.cpu_count() - - self.log("broker", "booting {} subprocesses".format(cores)) - for n in range(1, cores + 1): + self.num_workers = self.args.j or mp.cpu_count() + self.log("broker", "booting {} subprocesses".format(self.num_workers)) + for n in range(1, self.num_workers + 1): q_pend = mp.Queue(1) q_yield = mp.Queue(64) @@ -103,5 +100,8 @@ class BrokerMp(object): for p in self.procs: p.q_pend.put([0, dest, [args[0], len(self.procs)]]) + elif dest == "cb_httpsrv_up": + self.hub.cb_httpsrv_up() + else: raise Exception("what is " + str(dest)) diff --git a/copyparty/broker_thr.py b/copyparty/broker_thr.py index c8d88a95..086dddad 100644 --- a/copyparty/broker_thr.py +++ b/copyparty/broker_thr.py @@ -17,6 +17,7 @@ class BrokerThr(object): self.asrv = hub.asrv self.mutex = threading.Lock() + self.num_workers = 1 # instantiate all services here (TODO: inheritance?) self.httpsrv = HttpSrv(self, None) diff --git a/copyparty/httpsrv.py b/copyparty/httpsrv.py index 280b905e..05348740 100644 --- a/copyparty/httpsrv.py +++ b/copyparty/httpsrv.py @@ -141,6 +141,7 @@ class HttpSrv(object): fno = srv_sck.fileno() msg = "subscribed @ {}:{} f{}".format(ip, port, fno) self.log(self.name, msg) + self.broker.put(False, "cb_httpsrv_up") while not self.stopping: if self.args.log_conn: self.log(self.name, "|%sC-ncli" % ("-" * 1,), c="1;30") diff --git a/copyparty/svchub.py b/copyparty/svchub.py index d361a20b..17d873eb 100644 --- a/copyparty/svchub.py +++ b/copyparty/svchub.py @@ -39,6 +39,7 @@ class SvcHub(object): self.stop_req = False self.stopping = False self.stop_cond = threading.Condition() + self.httpsrv_up = 0 self.ansi_re = re.compile("\033\\[[^m]*m") self.log_mutex = threading.Lock() @@ -86,6 +87,29 @@ class SvcHub(object): self.broker = Broker(self) + def thr_httpsrv_up(self): + time.sleep(5) + failed = self.broker.num_workers - self.httpsrv_up + if not failed: + return + + m = "{}/{} workers failed to start" + m = m.format(failed, self.broker.num_workers) + self.log("root", m, 1) + #self.signal_handler(1,1) + os._exit(1) + + def cb_httpsrv_up(self): + self.httpsrv_up += 1 + if self.httpsrv_up != self.broker.num_workers: + return + + self.up2k.init_vols() + + thr = threading.Thread(target=self.sd_notify, name="sd-notify") + thr.daemon = True + thr.start() + def _logname(self): dt = datetime.utcfromtimestamp(time.time()) fn = self.args.lo @@ -135,7 +159,7 @@ class SvcHub(object): def run(self): self.tcpsrv.run() - thr = threading.Thread(target=self.sd_notify, name="sd-notify") + thr = threading.Thread(target=self.thr_httpsrv_up) thr.daemon = True thr.start() diff --git a/copyparty/up2k.py b/copyparty/up2k.py index 58a424ca..181234c1 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -99,17 +99,16 @@ class Up2k(object): if self.args.no_fastboot: self.deferred_init() - else: - t = threading.Thread( - target=self.deferred_init, name="up2k-deferred-init", args=(0.5,) - ) - t.daemon = True - t.start() - def deferred_init(self, wait=0): - if wait: - time.sleep(wait) + def init_vols(self): + if self.args.no_fastboot: + return + t = threading.Thread(target=self.deferred_init, name="up2k-deferred-init") + t.daemon = True + t.start() + + def deferred_init(self): all_vols = self.asrv.vfs.all_vols have_e2d = self.init_indexes(all_vols)