feat(#142): event-loop lag telemetry (perf_hooks) + bounded storage

Continuously samples event-loop delay via perf_hooks.monitorEventLoopDelay()
(C++-backed histogram; cheap). Each window persists mean/p50/p99/max to a new
event_loop_lag table and recomputes a coarse load band (normal/elevated/critical)
from the window p99. Standalone value: current lag is exposed on /api/status and
band changes are logged, so site lag is diagnosable independent of throttling.

The band feeds the #142 reconnect throttle (next commit) but ships first as its
own subsystem.

- event_loop_lag is bounded from day one: indexed on sampled_at + scheduled prune
  (LAG_TELEMETRY_RETENTION_DAYS, small default) modeled on the play_logs prune.
  Deliberately NOT another unbounded-growth table.
- Band transitions are asymmetric: jump up immediately (tighten fast), release one
  level at a time after N calm samples below a deadband (release slow, no flap).
  Pure nextBand() function, unit-tested deterministically.
- config: LAG_SAMPLE_INTERVAL_MS, LAG_RESOLUTION_MS, LAG_TELEMETRY_RETENTION_DAYS,
  LAG_PRUNE_INTERVAL_MS, LAG_ELEVATED_MS, LAG_CRITICAL_MS, LAG_RELEASE_SAMPLES.
- tests: band-transition unit tests; integration proves sampling persists, stays
  bounded under the prune, and surfaces on /api/status.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
ScreenTinker 2026-06-27 19:01:08 -05:00
parent d90cfb3986
commit ed3cf72b82
8 changed files with 274 additions and 0 deletions

View file

@ -90,4 +90,22 @@ module.exports = {
// on MSP-style deployments where an admin/operator assigns users to existing
// orgs after signup instead.
autoCreateOrgOnSignup: !['false', '0'].includes(String(process.env.AUTO_CREATE_ORG_ON_SIGNUP || '').toLowerCase()),
// #142 event-loop lag telemetry (services/loop-lag.js). perf_hooks
// monitorEventLoopDelay is C++-backed, so continuous sampling is cheap. Each
// window's p99 is persisted to event_loop_lag (bounded: indexed + pruned from
// day one) and drives the banded load level the reconnect throttle reads.
lagSampleIntervalMs: parseInt(process.env.LAG_SAMPLE_INTERVAL_MS) || 1000,
lagResolutionMs: parseInt(process.env.LAG_RESOLUTION_MS) || 20,
lagTelemetryRetentionDays: parseFloat(process.env.LAG_TELEMETRY_RETENTION_DAYS) || 3,
lagPruneIntervalMs: parseInt(process.env.LAG_PRUNE_INTERVAL_MS) || 3600000,
// Banded load levels from the window p99 (ms). Asymmetric by design: a band is
// entered immediately when its up-threshold is crossed (tighten fast), but
// released only one step at a time after lagReleaseSamples consecutive samples
// fall below a deadband (release slow), so small fluctuations don't flap it.
// Bands ONLY scale how hard an already-flagged device is throttled; a healthy
// device is never gated by global lag.
lagElevatedMs: parseInt(process.env.LAG_ELEVATED_MS) || 100,
lagCriticalMs: parseInt(process.env.LAG_CRITICAL_MS) || 250,
lagReleaseSamples: parseInt(process.env.LAG_RELEASE_SAMPLES) || 5,
};

View file

@ -230,6 +230,10 @@ const migrations = [
// Both the dashboard uptime query and the retention prune were full scans — the
// dashboard-degradation cause once the table reached 1M+ rows.
"CREATE INDEX IF NOT EXISTS idx_device_status_log_device_ts ON device_status_log(device_id, timestamp)",
// #142: event-loop lag telemetry table (bounded: indexed + scheduled prune).
// schema.sql creates these on fresh installs; this covers existing DBs.
"CREATE TABLE IF NOT EXISTS event_loop_lag (id INTEGER PRIMARY KEY AUTOINCREMENT, sampled_at INTEGER NOT NULL DEFAULT (strftime('%s','now')), mean_ms REAL NOT NULL, p50_ms REAL NOT NULL, p99_ms REAL NOT NULL, max_ms REAL NOT NULL, band TEXT NOT NULL DEFAULT 'normal')",
"CREATE INDEX IF NOT EXISTS idx_event_loop_lag_sampled ON event_loop_lag(sampled_at)",
];
// Apply each ALTER idempotently. A "duplicate column name" / "already exists"
// error means the column is already present (expected on a migrated DB) - benign.

View file

@ -469,6 +469,22 @@ CREATE TABLE IF NOT EXISTS device_status_log (
-- was the dashboard-degradation cause in the outage report.
CREATE INDEX IF NOT EXISTS idx_device_status_log_device_ts ON device_status_log(device_id, timestamp);
-- ===================== EVENT LOOP LAG (#142) =====================
-- Event-loop delay telemetry from perf_hooks.monitorEventLoopDelay(). Bounded
-- from day one: indexed on sampled_at and pruned on a schedule (see
-- services/loop-lag.js, LAG_TELEMETRY_RETENTION_DAYS) so it can never become a
-- second unbounded-growth table.
-- One row per sampling window; all *_ms columns are milliseconds.
CREATE TABLE IF NOT EXISTS event_loop_lag (
id INTEGER PRIMARY KEY AUTOINCREMENT,
-- Unix epoch seconds; defaults to the insert time when the writer omits it.
sampled_at INTEGER NOT NULL DEFAULT (strftime('%s','now')),
-- Mean, median, 99th percentile and maximum delay observed in the window.
mean_ms REAL NOT NULL,
p50_ms REAL NOT NULL,
p99_ms REAL NOT NULL,
max_ms REAL NOT NULL,
-- Load band in effect after this window was evaluated; 'normal' unless the writer says otherwise.
band TEXT NOT NULL DEFAULT 'normal'
);
CREATE INDEX IF NOT EXISTS idx_event_loop_lag_sampled ON event_loop_lag(sampled_at);
-- ===================== DEVICE FINGERPRINTS =====================
CREATE TABLE IF NOT EXISTS device_fingerprints (

View file

@ -7,6 +7,7 @@ const fs = require('fs');
const config = require('../config');
const VERSION = require('../version');
const { PLATFORM_ROLES } = require('../middleware/auth');
const loopLag = require('../services/loop-lag');
// Public status page
router.get('/', (req, res) => {
@ -24,6 +25,9 @@ router.get('/', (req, res) => {
version,
uptime_human: formatUptime(uptime),
timestamp: new Date().toISOString(),
// #142: current event-loop lag snapshot, so site lag is diagnosable from the
// health endpoint independent of any throttling. Cheap (in-memory read).
loop_lag: loopLag.getLag(),
});
});

View file

@ -625,6 +625,10 @@ app.set('io', io);
const { startHeartbeatChecker } = require('./services/heartbeat');
startHeartbeatChecker(io);
// #142: start event-loop lag sampling (feeds /api/status + the reconnect throttle)
const { startLoopLagMonitor } = require('./services/loop-lag');
startLoopLagMonitor();
// Start command-queue sweep (prunes expired entries for offline devices)
const commandQueue = require('./lib/command-queue');
commandQueue.startSweep();

107
server/services/loop-lag.js Normal file
View file

@ -0,0 +1,107 @@
// #142 — Event-loop lag telemetry (the data subsystem; ships before the throttle).
//
// Continuously samples event-loop delay via perf_hooks.monitorEventLoopDelay()
// (a C++-backed histogram — cheap). Each window we read mean/p50/p99/max, persist
// a row to the bounded `event_loop_lag` table, and recompute a coarse load BAND
// (normal | elevated | critical) from the window p99.
//
// The band is consumed by the reconnect throttle (#142 step 3), but this module
// has standalone value: getLag() is surfaced on /api/status and band changes are
// logged, so site connectivity/lag is diagnosable independent of any throttling.
//
// Band transitions are deliberately asymmetric (see nextBand): jump UP immediately
// when an up-threshold is crossed (tighten fast), step DOWN only one level at a
// time after lagReleaseSamples consecutive calm samples below a deadband (release
// slow). This avoids band flap from transient blips.
const { monitorEventLoopDelay } = require('perf_hooks');
const { db } = require('../db/database');
const config = require('../config');
// Divisor applied to every histogram reading before it is stored as a *_ms value.
const NS_PER_MS = 1e6;
// A band releases only once p99 falls to or below this fraction of the band's
// entry threshold — the deadband that stops small fluctuations from flapping the band.
const DEADBAND = 0.5;
// Numeric rank of each band name, so bands can be compared with < and ===.
const LEVEL = { normal: 0, elevated: 1, critical: 2 };
// The perf_hooks delay histogram; stays null until startLoopLagMonitor() has run,
// which is also how that function detects a repeat call.
let histogram = null;
// Band currently in effect (what getBand() returns).
let band = 'normal';
// Running count of consecutive calm samples, threaded through nextBand().
let calmSamples = 0;
// Most recent window snapshot (what getLag() copies). All zeros with
// sampled_at 0 until the first sample has been taken.
let current = { mean_ms: 0, p50_ms: 0, p99_ms: 0, max_ms: 0, band: 'normal', sampled_at: 0 };
/**
 * Pure band-transition step, exported so the state machine can be unit-tested
 * deterministically.
 *
 * Escalation is immediate and may skip a level. Release moves down a single
 * level, and only after `config.lagReleaseSamples` consecutive samples at or
 * below DEADBAND times the current band's entry threshold.
 *
 * @param {string} cur - Current band name; a name with no LEVEL entry is ranked as normal.
 * @param {number} p99 - Window p99 in milliseconds.
 * @param {number} calm - Consecutive calm samples counted so far.
 * @returns {[string, number]} The next `[band, calmSamples]` pair.
 */
function nextBand(cur, p99, calm) {
  const { lagElevatedMs, lagCriticalMs, lagReleaseSamples } = config;
  const rank = LEVEL[cur] ?? 0;

  // Escalate first: the highest threshold reached wins and the calm streak is cleared.
  if (rank < LEVEL.critical && p99 >= lagCriticalMs) return ['critical', 0];
  if (rank < LEVEL.elevated && p99 >= lagElevatedMs) return ['elevated', 0];

  // Describe the single-step release available from the current band, if any.
  let release = null;
  if (rank === LEVEL.critical) {
    release = { held: 'critical', lower: 'elevated', entryMs: lagCriticalMs };
  } else if (rank === LEVEL.elevated) {
    release = { held: 'elevated', lower: 'normal', entryMs: lagElevatedMs };
  }

  // Already normal, or not calm enough to count: hold the band and restart the streak.
  const isCalm = release !== null && p99 <= release.entryMs * DEADBAND;
  if (!isCalm) return [cur, 0];

  // Calm sample: step down once the streak is long enough, otherwise keep counting.
  const streak = calm + 1;
  return streak >= lagReleaseSamples ? [release.lower, 0] : [release.held, streak];
}
// Round a number to two decimal places (nearest hundredth).
const round2 = (x) => {
  const hundredths = Math.round(x * 100);
  return hundredths / 100;
};
// Latched once a failed insert has been reported and cleared by the next
// successful one, so a persistent failure (e.g. event_loop_lag missing on a
// partially-migrated DB) is logged once rather than on every sampling window.
let persistFailureLogged = false;

/**
 * Close the current sampling window: read mean/p50/p99/max from the histogram,
 * reset it, advance the band state machine from the window p99, publish the
 * snapshot for getLag(), and persist one row to event_loop_lag.
 *
 * Persistence is best-effort: an insert failure never throws out of the timer
 * callback, but it is reported (once per failure streak) instead of vanishing.
 */
function sample() {
  // A window with no recorded delays can yield a non-finite mean (NaN). Coerce
  // such readings to 0 so NaN never reaches the band logic, the JSON status
  // payload (where it would serialise as null) or the NOT NULL columns.
  const toMs = (ns) => {
    const ms = round2(ns / NS_PER_MS);
    return Number.isFinite(ms) ? ms : 0;
  };
  const snap = {
    mean_ms: toMs(histogram.mean),
    p50_ms: toMs(histogram.percentile(50)),
    p99_ms: toMs(histogram.percentile(99)),
    max_ms: toMs(histogram.max),
  };
  // Start the next window from scratch so each row describes one interval only.
  histogram.reset();
  const prev = band;
  [band, calmSamples] = nextBand(band, snap.p99_ms, calmSamples);
  current = { ...snap, band, sampled_at: Math.floor(Date.now() / 1000) };
  try {
    db.prepare(
      'INSERT INTO event_loop_lag (sampled_at, mean_ms, p50_ms, p99_ms, max_ms, band) VALUES (?, ?, ?, ?, ?, ?)'
    ).run(current.sampled_at, snap.mean_ms, snap.p50_ms, snap.p99_ms, snap.max_ms, band);
    persistFailureLogged = false;
  } catch (err) {
    // The in-memory snapshot and band are already updated, so telemetry on
    // /api/status keeps working even while rows cannot be written.
    if (!persistFailureLogged) {
      persistFailureLogged = true;
      console.warn(`[loop-lag] failed to persist sample (repeat failures suppressed): ${err?.message ?? err}`);
    }
  }
  // Observable: log whenever we're loaded or when the band changes (incl. back to
  // normal). Healthy steady state stays quiet.
  if (band !== 'normal' || prev !== 'normal') {
    const tag = band !== prev ? ` (was ${prev})` : '';
    console.log(`[loop-lag] band=${band}${tag} mean=${snap.mean_ms}ms p99=${snap.p99_ms}ms max=${snap.max_ms}ms`);
  }
}
/**
 * Delete event_loop_lag rows older than config.lagTelemetryRetentionDays.
 *
 * Best-effort: a failure never throws to the caller (boot or the prune timer),
 * but it is logged, because a prune that silently stops working is exactly how
 * this table would turn into unbounded growth.
 */
function pruneLag() {
  try {
    // sampled_at is stored in whole epoch seconds, so the cutoff is too.
    const cutoff = Math.floor(Date.now() / 1000) - Math.round(config.lagTelemetryRetentionDays * 86400);
    const n = db.prepare('DELETE FROM event_loop_lag WHERE sampled_at < ?').run(cutoff).changes;
    if (n > 0) console.log(`[loop-lag] pruned ${n} sample(s) older than ${config.lagTelemetryRetentionDays}d`);
  } catch (err) {
    console.warn(`[loop-lag] prune failed: ${err?.message ?? err}`);
  }
}
/**
 * Start event-loop delay sampling and the retention prune. Safe to call more
 * than once: only the first call creates the histogram and timers.
 */
function startLoopLagMonitor() {
  // An existing histogram means the monitor is already running.
  if (histogram) return;

  histogram = monitorEventLoopDelay({ resolution: config.lagResolutionMs });
  histogram.enable();

  const sampleTimer = setInterval(sample, config.lagSampleIntervalMs);
  // Sweep stale rows once at boot, then keep pruning on a schedule.
  pruneLag();
  const pruneTimer = setInterval(pruneLag, config.lagPruneIntervalMs);

  // Neither timer should hold the process open (matters for tests / clean exit).
  for (const timer of [sampleTimer, pruneTimer]) {
    if (timer.unref) timer.unref();
  }
}
/** @returns {string} The load band currently in effect. */
function getBand() {
  const activeBand = band;
  return activeBand;
}
/**
 * @returns {object} A shallow copy of the latest window snapshot, so callers
 *   cannot mutate the module's own state.
 */
function getLag() {
  return Object.assign({}, current);
}
// Public surface: the starter, two read-only accessors, and the pure
// band-transition function (exported for unit tests).
module.exports = { startLoopLagMonitor, getBand, getLag, nextBand };

View file

@ -0,0 +1,64 @@
'use strict';
// #142 step 2 — integration: the lag monitor samples, persists to a BOUNDED table,
// and surfaces current lag on /api/status. Boots the real server with fast sampling
// and a tiny (fractional-day) retention so the prune is observable within the test.
const { test, before, after } = require('node:test');
const assert = require('node:assert/strict');
const { spawn } = require('node:child_process');
const path = require('node:path');
const os = require('node:os');
const fs = require('node:fs');
const crypto = require('node:crypto');
const Database = require('better-sqlite3');
const PORT = 3979;
const BASE = `http://127.0.0.1:${PORT}`;
const DATA_DIR = path.join(os.tmpdir(), 'st-lag-int-' + crypto.randomBytes(4).toString('hex'));
const LOG = path.join(os.tmpdir(), 'st-lag-int-' + crypto.randomBytes(4).toString('hex') + '.log');
let proc;
// Boot the real server as a child process with fast sampling and a tiny
// retention window, then wait until /api/status answers.
before(async () => {
  // Server stdout/stderr go to a temp log so a failed boot can be reported.
  const logFd = fs.openSync(LOG, 'w');
  try {
    // process.execPath runs the server under the same Node binary as this test
    // instead of whichever `node` happens to be first on PATH.
    proc = spawn(process.execPath, ['server.js'], {
      cwd: path.join(__dirname, '..'),
      env: {
        ...process.env, DATA_DIR, SELF_HOSTED: 'true', PORT: String(PORT), NODE_ENV: 'test',
        LAG_SAMPLE_INTERVAL_MS: '200', // sample fast
        LAG_TELEMETRY_RETENTION_DAYS: '0.00001', // 0.864s, which the prune rounds to a 1s cutoff
        LAG_PRUNE_INTERVAL_MS: '400', // prune often
      },
      stdio: ['ignore', logFd, logFd],
    });
  } finally {
    // The child has its own handle on the log once spawn() returns; release the
    // parent's descriptor so it is not leaked for the rest of the run.
    fs.closeSync(logFd);
  }
  // Poll up to 80 times, 250ms apart, until the status endpoint responds OK.
  let up = false;
  for (let i = 0; i < 80; i++) {
    try { const r = await fetch(BASE + '/api/status'); if (r.ok) { up = true; break; } } catch { /* not yet */ }
    await new Promise(r => setTimeout(r, 250));
  }
  if (!up) throw new Error('server did not boot:\n' + fs.readFileSync(LOG, 'utf8').slice(-2000));
});
// Tear down: hard-kill the spawned server. Any error from kill() (for example
// when the server never started) is deliberately ignored.
after(() => {
  try {
    proc.kill('SIGKILL');
  } catch {
    /* */
  }
});
// The health endpoint must carry a well-formed lag snapshot.
test('/api/status exposes a current loop_lag snapshot', async () => {
  const res = await fetch(`${BASE}/api/status`);
  const payload = await res.json();
  const lag = payload.loop_lag;
  const validBands = ['normal', 'elevated', 'critical'];

  assert.ok(lag, 'loop_lag present on /api/status');
  assert.ok(validBands.includes(lag.band), 'band is a valid level');
  for (const field of ['p99_ms', 'mean_ms']) {
    assert.equal(typeof lag[field], 'number', `${field} is numeric`);
  }
});
// Sampling runs for ~3s at 200ms per sample (~15 inserts). With the near-zero
// retention pruned every 400ms the row count has to stay below that, which is
// the evidence that the table cannot grow without bound.
test('lag samples are persisted AND bounded by retention prune (not unbounded)', async () => {
  await new Promise((resolve) => {
    setTimeout(resolve, 3000);
  });

  // Inspect the server's SQLite file directly through a read-only handle.
  const store = new Database(path.join(DATA_DIR, 'db', 'remote_display.db'), { readonly: true });
  const { c: rowCount } = store.prepare('SELECT COUNT(*) c FROM event_loop_lag').get();
  store.close();

  assert.ok(rowCount >= 1, 'lag samples are being persisted');
  assert.ok(rowCount < 15, `table is bounded by the prune (held ${rowCount} rows over ~3s of 200ms sampling)`);
});

View file

@ -0,0 +1,57 @@
'use strict';
// #142 step 2 — deterministic unit tests for the event-loop-lag band transitions.
// Pure function, no sockets/timing. Isolate the DB to a temp dir BEFORE requiring
// the module (requiring it pulls in db/database, which initialises a DB on load).
const os = require('node:os');
const path = require('node:path');
const crypto = require('node:crypto');
process.env.DATA_DIR = path.join(os.tmpdir(), 'st-lag-unit-' + crypto.randomBytes(4).toString('hex'));
const { test } = require('node:test');
const assert = require('node:assert/strict');
const { nextBand } = require('../services/loop-lag');
// config defaults exercised here: elevated=100ms, critical=250ms, releaseSamples=5,
// deadband=0.5 -> release-below thresholds: elevated@50ms, critical@125ms.
test('UP is immediate and can skip a level (tighten fast)', () => {
  // [starting band, window p99 (ms), expected result, assertion message]
  const cases = [
    ['normal', 50, ['normal', 0], 'below elevated stays normal'],
    ['normal', 100, ['elevated', 0], 'crossing elevated up-threshold jumps immediately'],
    ['normal', 250, ['critical', 0], 'a big spike jumps normal->critical in one sample'],
    ['elevated', 250, ['critical', 0], undefined],
  ];
  for (const [from, p99, expected, why] of cases) {
    assert.deepEqual(nextBand(from, p99, 0), expected, why);
  }
});
test('deadband holds the band for small fluctuations (no flap)', () => {
  // Each p99 sits between the band's release point and its up-threshold, so the
  // band must be held and the calm counter reset to 0.
  const held = [
    ['elevated', 80, 3], // between release(50) and up(100)
    ['critical', 200, 4], // between release(125) and up(250)
  ];
  for (const [level, p99, streak] of held) {
    assert.deepEqual(nextBand(level, p99, streak), [level, 0]);
  }
});
test('DOWN is slow: requires lagReleaseSamples calm samples below the deadband', () => {
  // Four calm samples are not enough; the fifth releases elevated -> normal.
  let level = 'elevated';
  let streak = 0;
  for (let seen = 1; seen <= 4; seen += 1) {
    [level, streak] = nextBand(level, 20, streak);
    assert.equal(level, 'elevated', `still elevated after ${seen} calm sample(s)`);
  }
  [level, streak] = nextBand(level, 20, streak);
  assert.deepEqual([level, streak], ['normal', 0], 'drops to normal on the 5th calm sample');
});
test('DOWN releases one level at a time: critical -> elevated -> normal', () => {
  let level = 'critical';
  let streak = 0;
  // Feed one full release window: five consecutive calm samples at p99=10ms.
  const feedCalmWindow = () => {
    for (let k = 0; k < 5; k += 1) {
      [level, streak] = nextBand(level, 10, streak);
    }
  };

  feedCalmWindow();
  assert.equal(level, 'elevated', 'critical releases to elevated, never straight to normal');
  feedCalmWindow();
  assert.equal(level, 'normal', 'then elevated releases to normal');
});
test('a single calm sample does not release (calm counter resets on a non-calm sample)', () => {
  // One calm sample starts the streak at 1 ...
  const afterCalm = nextBand('elevated', 20, 0);
  assert.deepEqual([afterCalm[0], afterCalm[1]], ['elevated', 1]);

  // ... and a sample back inside the deadband wipes it out again.
  const afterBlip = nextBand(afterCalm[0], 80, afterCalm[1]);
  assert.deepEqual([afterBlip[0], afterBlip[1]], ['elevated', 0], 'one blip resets the release counter');
});