From ed3cf72b826a9df5b0486cfe6a16739d78da0773 Mon Sep 17 00:00:00 2001 From: ScreenTinker Date: Sat, 27 Jun 2026 19:01:08 -0500 Subject: [PATCH] feat(#142): event-loop lag telemetry (perf_hooks) + bounded storage Continuously samples event-loop delay via perf_hooks.monitorEventLoopDelay() (C++-backed histogram; cheap). Each window persists mean/p50/p99/max to a new event_loop_lag table and recomputes a coarse load band (normal/elevated/critical) from the window p99. Standalone value: current lag is exposed on /api/status and band changes are logged, so site lag is diagnosable independent of throttling. The band feeds the #142 reconnect throttle (next commit) but ships first as its own subsystem. - event_loop_lag is bounded from day one: indexed on sampled_at + scheduled prune (LAG_TELEMETRY_RETENTION_DAYS, small default) modeled on the play_logs prune. Deliberately NOT another unbounded-growth table. - Band transitions are asymmetric: jump up immediately (tighten fast), release one level at a time after N calm samples below a deadband (release slow, no flap). Pure nextBand() function, unit-tested deterministically. - config: LAG_SAMPLE_INTERVAL_MS, LAG_RESOLUTION_MS, LAG_TELEMETRY_RETENTION_DAYS, LAG_PRUNE_INTERVAL_MS, LAG_ELEVATED_MS, LAG_CRITICAL_MS, LAG_RELEASE_SAMPLES. - tests: band-transition unit tests; integration proves sampling persists, stays bounded under the prune, and surfaces on /api/status. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- server/config.js | 18 ++++ server/db/database.js | 4 + server/db/schema.sql | 16 ++++ server/routes/status.js | 4 + server/server.js | 4 + server/services/loop-lag.js | 107 +++++++++++++++++++++++ server/test/loop-lag-integration.test.js | 64 ++++++++++++++ server/test/loop-lag.test.js | 57 ++++++++++++ 8 files changed, 274 insertions(+) create mode 100644 server/services/loop-lag.js create mode 100644 server/test/loop-lag-integration.test.js create mode 100644 server/test/loop-lag.test.js diff --git a/server/config.js b/server/config.js index 83b895f..e73e107 100644 --- a/server/config.js +++ b/server/config.js @@ -90,4 +90,22 @@ module.exports = { // on MSP-style deployments where an admin/operator assigns users to existing // orgs after signup instead. autoCreateOrgOnSignup: !['false', '0'].includes(String(process.env.AUTO_CREATE_ORG_ON_SIGNUP || '').toLowerCase()), + + // #142 event-loop lag telemetry (services/loop-lag.js). perf_hooks + // monitorEventLoopDelay is C++-backed, so continuous sampling is cheap. Each + // window's mean/p50/p99/max is persisted to event_loop_lag (bounded: indexed and + // pruned from day one); its p99 drives the band the reconnect throttle reads. + lagSampleIntervalMs: parseInt(process.env.LAG_SAMPLE_INTERVAL_MS) || 1000, + lagResolutionMs: parseInt(process.env.LAG_RESOLUTION_MS) || 20, + lagTelemetryRetentionDays: parseFloat(process.env.LAG_TELEMETRY_RETENTION_DAYS) || 3, + lagPruneIntervalMs: parseInt(process.env.LAG_PRUNE_INTERVAL_MS) || 3600000, + // Banded load levels from the window p99 (ms). Asymmetric by design: a band is + // entered immediately when its up-threshold is crossed (tighten fast), but + // released only one step at a time after lagReleaseSamples consecutive samples + // fall below a deadband (release slow), so small fluctuations don't flap it. + // Bands ONLY scale how hard an already-flagged device is throttled; a healthy + // device is never gated by global lag. 
+ lagElevatedMs: parseInt(process.env.LAG_ELEVATED_MS) || 100, + lagCriticalMs: parseInt(process.env.LAG_CRITICAL_MS) || 250, + lagReleaseSamples: parseInt(process.env.LAG_RELEASE_SAMPLES) || 5, }; diff --git a/server/db/database.js b/server/db/database.js index dae8574..5d5c72f 100644 --- a/server/db/database.js +++ b/server/db/database.js @@ -230,6 +230,10 @@ const migrations = [ // Both the dashboard uptime query and the retention prune were full scans — the // dashboard-degradation cause once the table reached 1M+ rows. "CREATE INDEX IF NOT EXISTS idx_device_status_log_device_ts ON device_status_log(device_id, timestamp)", + // #142: event-loop lag telemetry table (bounded: indexed + scheduled prune). + // schema.sql creates these on fresh installs; this covers existing DBs. + "CREATE TABLE IF NOT EXISTS event_loop_lag (id INTEGER PRIMARY KEY AUTOINCREMENT, sampled_at INTEGER NOT NULL DEFAULT (strftime('%s','now')), mean_ms REAL NOT NULL, p50_ms REAL NOT NULL, p99_ms REAL NOT NULL, max_ms REAL NOT NULL, band TEXT NOT NULL DEFAULT 'normal')", + "CREATE INDEX IF NOT EXISTS idx_event_loop_lag_sampled ON event_loop_lag(sampled_at)", ]; // Apply each ALTER idempotently. A "duplicate column name" / "already exists" // error means the column is already present (expected on a migrated DB) - benign. diff --git a/server/db/schema.sql b/server/db/schema.sql index e4e67e0..3bf26fa 100644 --- a/server/db/schema.sql +++ b/server/db/schema.sql @@ -469,6 +469,22 @@ CREATE TABLE IF NOT EXISTS device_status_log ( -- was the dashboard-degradation cause in the outage report. CREATE INDEX IF NOT EXISTS idx_device_status_log_device_ts ON device_status_log(device_id, timestamp); +-- ===================== EVENT LOOP LAG (#142) ===================== +-- Event-loop delay telemetry from perf_hooks.monitorEventLoopDelay(). 
Bounded +-- from day one: indexed on sampled_at and pruned on a schedule (see +-- services/loop-lag.js, LAG_TELEMETRY_RETENTION_DAYS) so it can never become a +-- second unbounded-growth table. +CREATE TABLE IF NOT EXISTS event_loop_lag ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + sampled_at INTEGER NOT NULL DEFAULT (strftime('%s','now')), + mean_ms REAL NOT NULL, + p50_ms REAL NOT NULL, + p99_ms REAL NOT NULL, + max_ms REAL NOT NULL, + band TEXT NOT NULL DEFAULT 'normal' +); +CREATE INDEX IF NOT EXISTS idx_event_loop_lag_sampled ON event_loop_lag(sampled_at); + -- ===================== DEVICE FINGERPRINTS ===================== CREATE TABLE IF NOT EXISTS device_fingerprints ( diff --git a/server/routes/status.js b/server/routes/status.js index aa5470e..e2c62ea 100644 --- a/server/routes/status.js +++ b/server/routes/status.js @@ -7,6 +7,7 @@ const fs = require('fs'); const config = require('../config'); const VERSION = require('../version'); const { PLATFORM_ROLES } = require('../middleware/auth'); +const loopLag = require('../services/loop-lag'); // Public status page router.get('/', (req, res) => { @@ -24,6 +25,9 @@ router.get('/', (req, res) => { version, uptime_human: formatUptime(uptime), timestamp: new Date().toISOString(), + // #142: current event-loop lag snapshot, so site lag is diagnosable from the + // health endpoint independent of any throttling. Cheap (in-memory read). 
+ loop_lag: loopLag.getLag(), }); }); diff --git a/server/server.js b/server/server.js index 687cd54..2798804 100644 --- a/server/server.js +++ b/server/server.js @@ -625,6 +625,10 @@ app.set('io', io); const { startHeartbeatChecker } = require('./services/heartbeat'); startHeartbeatChecker(io); +// #142: start event-loop lag sampling (feeds /api/status + the reconnect throttle) +const { startLoopLagMonitor } = require('./services/loop-lag'); +startLoopLagMonitor(); + // Start command-queue sweep (prunes expired entries for offline devices) const commandQueue = require('./lib/command-queue'); commandQueue.startSweep(); diff --git a/server/services/loop-lag.js b/server/services/loop-lag.js new file mode 100644 index 0000000..b7f45f8 --- /dev/null +++ b/server/services/loop-lag.js @@ -0,0 +1,107 @@ +// #142 — Event-loop lag telemetry (the data subsystem; ships before the throttle). +// +// Continuously samples event-loop delay via perf_hooks.monitorEventLoopDelay() +// (a C++-backed histogram — cheap). Each window we read mean/p50/p99/max, persist +// a row to the bounded `event_loop_lag` table, and recompute a coarse load BAND +// (normal | elevated | critical) from the window p99. +// +// The band is consumed by the reconnect throttle (#142 step 3), but this module +// has standalone value: getLag() is surfaced on /api/status and band changes are +// logged, so site connectivity/lag is diagnosable independent of any throttling. +// +// Band transitions are deliberately asymmetric (see nextBand): jump UP immediately +// when an up-threshold is crossed (tighten fast), step DOWN only one level at a +// time after lagReleaseSamples consecutive calm samples below a deadband (release +// slow). This avoids band flap from transient blips. 
+ +const { monitorEventLoopDelay } = require('perf_hooks'); +const { db } = require('../db/database'); +const config = require('../config'); + +const NS_PER_MS = 1e6; +// A band releases only once p99 falls below this fraction of the band's entry +// threshold — the deadband that stops small fluctuations from flapping the band. +const DEADBAND = 0.5; +const LEVEL = { normal: 0, elevated: 1, critical: 2 }; + +let histogram = null; +let band = 'normal'; +let calmSamples = 0; +let current = { mean_ms: 0, p50_ms: 0, p99_ms: 0, max_ms: 0, band: 'normal', sampled_at: 0 }; + +// Pure band-transition function (exported for deterministic unit tests). Given the +// current band, the window p99 (ms), and the running calm-sample count, returns the +// next [band, calmSamples]. Up is immediate (may skip a level); down is one step +// per release window, gated by a deadband. +function nextBand(cur, p99, calm) { + const level = LEVEL[cur] ?? 0; + // UP — immediate, tighten fast (normal can jump straight to critical). + if (p99 >= config.lagCriticalMs && level < LEVEL.critical) return ['critical', 0]; + if (p99 >= config.lagElevatedMs && level < LEVEL.elevated) return ['elevated', 0]; + // DOWN — slow, one step, only below the current band's deadband. + if (level === LEVEL.critical && p99 <= config.lagCriticalMs * DEADBAND) { + const c = calm + 1; + return c >= config.lagReleaseSamples ? ['elevated', 0] : ['critical', c]; + } + if (level === LEVEL.elevated && p99 <= config.lagElevatedMs * DEADBAND) { + const c = calm + 1; + return c >= config.lagReleaseSamples ? ['normal', 0] : ['elevated', c]; + } + // Hold (inside deadband, or already normal): reset the calm counter. 
+ return [cur, 0]; +} + +const round2 = (x) => Math.round(x * 100) / 100; + +function sample() { + const p99 = histogram.percentile(99) / NS_PER_MS; + const snap = { + mean_ms: round2(histogram.mean / NS_PER_MS), + p50_ms: round2(histogram.percentile(50) / NS_PER_MS), + p99_ms: round2(p99), + max_ms: round2(histogram.max / NS_PER_MS), + }; + histogram.reset(); + + const prev = band; + [band, calmSamples] = nextBand(band, snap.p99_ms, calmSamples); + current = { ...snap, band, sampled_at: Math.floor(Date.now() / 1000) }; + + try { + db.prepare( + 'INSERT INTO event_loop_lag (sampled_at, mean_ms, p50_ms, p99_ms, max_ms, band) VALUES (?, ?, ?, ?, ?, ?)' + ).run(current.sampled_at, snap.mean_ms, snap.p50_ms, snap.p99_ms, snap.max_ms, band); + } catch (_) { /* table may not exist on a partially-migrated DB */ } + + // Observable: log whenever we're loaded or when the band changes (incl. back to + // normal). Healthy steady state stays quiet. + if (band !== 'normal' || prev !== 'normal') { + const tag = band !== prev ? ` (was ${prev})` : ''; + console.log(`[loop-lag] band=${band}${tag} mean=${snap.mean_ms}ms p99=${snap.p99_ms}ms max=${snap.max_ms}ms`); + } +} + +function pruneLag() { + try { + const cutoff = Math.floor(Date.now() / 1000) - Math.round(config.lagTelemetryRetentionDays * 86400); + const n = db.prepare('DELETE FROM event_loop_lag WHERE sampled_at < ?').run(cutoff).changes; + if (n > 0) console.log(`[loop-lag] pruned ${n} sample(s) older than ${config.lagTelemetryRetentionDays}d`); + } catch (_) { /* ignore */ } +} + +function startLoopLagMonitor() { + if (histogram) return; // idempotent + histogram = monitorEventLoopDelay({ resolution: config.lagResolutionMs }); + histogram.enable(); + const t1 = setInterval(sample, config.lagSampleIntervalMs); + pruneLag(); // sweep stale rows on boot + const t2 = setInterval(pruneLag, config.lagPruneIntervalMs); + // Don't keep the process alive on these timers (matters for tests / clean exit). 
+ if (t1.unref) t1.unref(); + if (t2.unref) t2.unref(); +} + +function getBand() { return band; } +function getLag() { return { ...current }; } + +module.exports = { startLoopLagMonitor, getBand, getLag, nextBand }; diff --git a/server/test/loop-lag-integration.test.js b/server/test/loop-lag-integration.test.js new file mode 100644 index 0000000..098755e --- /dev/null +++ b/server/test/loop-lag-integration.test.js @@ -0,0 +1,64 @@ +'use strict'; + +// #142 step 2 — integration: the lag monitor samples, persists to a BOUNDED table, +// and surfaces current lag on /api/status. Boots the real server with fast sampling +// and a tiny (fractional-day) retention so the prune is observable within the test. + +const { test, before, after } = require('node:test'); +const assert = require('node:assert/strict'); +const { spawn } = require('node:child_process'); +const path = require('node:path'); +const os = require('node:os'); +const fs = require('node:fs'); +const crypto = require('node:crypto'); +const Database = require('better-sqlite3'); + +const PORT = 3979; +const BASE = `http://127.0.0.1:${PORT}`; +const DATA_DIR = path.join(os.tmpdir(), 'st-lag-int-' + crypto.randomBytes(4).toString('hex')); +const LOG = path.join(os.tmpdir(), 'st-lag-int-' + crypto.randomBytes(4).toString('hex') + '.log'); +let proc; + +before(async () => { + const logFd = fs.openSync(LOG, 'w'); + proc = spawn('node', ['server.js'], { + cwd: path.join(__dirname, '..'), + env: { + ...process.env, DATA_DIR, SELF_HOSTED: 'true', PORT: String(PORT), NODE_ENV: 'test', + LAG_SAMPLE_INTERVAL_MS: '200', // sample fast + LAG_TELEMETRY_RETENTION_DAYS: '0.00001', // 0.864s; pruneLag rounds this to a 1s cutoff + LAG_PRUNE_INTERVAL_MS: '400', // prune often + }, + stdio: ['ignore', logFd, logFd], + }); + let up = false; + for (let i = 0; i < 80; i++) { + try { const r = await fetch(BASE + '/api/status'); if (r.ok) { up = true; break; } } catch { /* not yet */ } + await new Promise(r => setTimeout(r, 250)); + } + if (!up) throw new 
Error('server did not boot:\n' + fs.readFileSync(LOG, 'utf8').slice(-2000)); +}); + +after(() => { try { proc.kill('SIGKILL'); } catch { /* */ } }); + +test('/api/status exposes a current loop_lag snapshot', async () => { + const r = await fetch(BASE + '/api/status'); + const body = await r.json(); + assert.ok(body.loop_lag, 'loop_lag present on /api/status'); + assert.ok(['normal', 'elevated', 'critical'].includes(body.loop_lag.band), 'band is a valid level'); + assert.equal(typeof body.loop_lag.p99_ms, 'number', 'p99_ms is numeric'); + assert.equal(typeof body.loop_lag.mean_ms, 'number', 'mean_ms is numeric'); +}); + +test('lag samples are persisted AND bounded by retention prune (not unbounded)', async () => { + // Let it sample for ~3s. At 200ms/sample that is ~15 inserts, but with a ~1s + // retention cutoff pruned every 400ms the table must stay small — proving the table + // can never become a second unbounded-growth table. + await new Promise(r => setTimeout(r, 3000)); + const dbPath = path.join(DATA_DIR, 'db', 'remote_display.db'); + const db = new Database(dbPath, { readonly: true }); + const count = db.prepare('SELECT COUNT(*) c FROM event_loop_lag').get().c; + db.close(); + assert.ok(count >= 1, 'lag samples are being persisted'); + assert.ok(count < 15, `table is bounded by the prune (held ${count} rows over ~3s of 200ms sampling)`); +}); diff --git a/server/test/loop-lag.test.js b/server/test/loop-lag.test.js new file mode 100644 index 0000000..0b3049c --- /dev/null +++ b/server/test/loop-lag.test.js @@ -0,0 +1,57 @@ +'use strict'; + +// #142 step 2 — deterministic unit tests for the event-loop-lag band transitions. +// Pure function, no sockets/timing. Isolate the DB to a temp dir BEFORE requiring +// the module (requiring it pulls in db/database, which initialises a DB on load). 
+ +const os = require('node:os'); +const path = require('node:path'); +const crypto = require('node:crypto'); +process.env.DATA_DIR = path.join(os.tmpdir(), 'st-lag-unit-' + crypto.randomBytes(4).toString('hex')); + +const { test } = require('node:test'); +const assert = require('node:assert/strict'); +const { nextBand } = require('../services/loop-lag'); + +// config defaults exercised here: elevated=100ms, critical=250ms, releaseSamples=5, +// deadband=0.5 -> release-below thresholds: elevated@50ms, critical@125ms. + +test('UP is immediate and can skip a level (tighten fast)', () => { + assert.deepEqual(nextBand('normal', 50, 0), ['normal', 0], 'below elevated stays normal'); + assert.deepEqual(nextBand('normal', 100, 0), ['elevated', 0], 'crossing elevated up-threshold jumps immediately'); + assert.deepEqual(nextBand('normal', 250, 0), ['critical', 0], 'a big spike jumps normal->critical in one sample'); + assert.deepEqual(nextBand('elevated', 250, 0), ['critical', 0]); +}); + +test('deadband holds the band for small fluctuations (no flap)', () => { + // elevated, p99 between release(50) and up(100) -> hold elevated, calm reset + assert.deepEqual(nextBand('elevated', 80, 3), ['elevated', 0]); + // critical, p99 between release(125) and up(250) -> hold critical + assert.deepEqual(nextBand('critical', 200, 4), ['critical', 0]); +}); + +test('DOWN is slow: requires lagReleaseSamples calm samples below the deadband', () => { + // elevated -> normal only after 5 consecutive calm samples + let band = 'elevated', calm = 0; + for (let i = 0; i < 4; i++) { + [band, calm] = nextBand(band, 20, calm); + assert.equal(band, 'elevated', `still elevated after ${i + 1} calm sample(s)`); + } + [band, calm] = nextBand(band, 20, calm); // 5th + assert.deepEqual([band, calm], ['normal', 0], 'drops to normal on the 5th calm sample'); +}); + +test('DOWN releases one level at a time: critical -> elevated -> normal', () => { + let band = 'critical', calm = 0; + for (let i = 0; i < 5; 
i++) [band, calm] = nextBand(band, 10, calm); + assert.equal(band, 'elevated', 'critical releases to elevated, never straight to normal'); + for (let i = 0; i < 5; i++) [band, calm] = nextBand(band, 10, calm); + assert.equal(band, 'normal', 'then elevated releases to normal'); +}); + +test('a single calm sample does not release (calm counter resets on a non-calm sample)', () => { + let [band, calm] = nextBand('elevated', 20, 0); // calm=1 + assert.deepEqual([band, calm], ['elevated', 1]); + [band, calm] = nextBand(band, 80, calm); // back inside deadband -> reset + assert.deepEqual([band, calm], ['elevated', 0], 'one blip resets the release counter'); +});