fix(#142): load-aware per-device reconnect throttle (the outage fix)

Gates genuine reconnects PER DEVICE before the heavy register work (DB writes +
playlist build) runs, so a single flapping device can no longer saturate the
event loop and take down the server.

- Actuator is per-device, keyed on device_id (modeled on lastPlayLogAt). A device
  is flagged only when it exceeds reconnectBaseMax genuine reconnects per window.
  Same-socket playlist refreshes (isPlaylistRefresh) are exempt.
- Load-awareness is BANDED (normal/elevated/critical from the step-2 lag signal),
  not a continuous controller. The band only MULTIPLIES an already-flagged
  device's backoff; global lag never gates a healthy device.
- Hysteresis: escalate immediately while storming (tighten fast); decay one level
  per reconnectReleaseMs of calm (release slow).
- HARD CEILING per device, independent of band and warm-up — a slow-ramp attacker
  can't train through it.
- COLD START: for reconnectWarmupMs after boot, force the normal band and apply
  only the hard ceiling, so a full-fleet reconnect after a deploy doesn't throttle
  healthy screens. State is in-memory, resets on restart.
- Observability: every throttle engagement logs device, band, observed vs allowed
  rate, and backoff. Throttled device gets device:throttled + a deferred disconnect.

Tests (api.test.js style):
- unit: healthy-never-throttled, storm-throttled-with-growing-backoff, band
  multiplies backoff, hard-ceiling-even-in-warmup, warm-up leniency, neighbor
  isolation, slow release.
- integration GATE (the required one): full-fleet reconnect right after restart
  throttles NO healthy device; a single device storming IS throttled; a neighbor
  stays unaffected while another storms.
- also fixes pre-existing test PORT collisions (my new integration files clashed
  with totp.test.js:3979 and totp-keyrotation.test.js:3980 -> moved to 3982/3983);
  full suite now green serially AND in parallel.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
ScreenTinker 2026-06-27 19:18:00 -05:00
parent ed3cf72b82
commit 101f086204
6 changed files with 355 additions and 2 deletions

View file

@ -108,4 +108,30 @@ module.exports = {
lagElevatedMs: parseInt(process.env.LAG_ELEVATED_MS) || 100, lagElevatedMs: parseInt(process.env.LAG_ELEVATED_MS) || 100,
lagCriticalMs: parseInt(process.env.LAG_CRITICAL_MS) || 250, lagCriticalMs: parseInt(process.env.LAG_CRITICAL_MS) || 250,
lagReleaseSamples: parseInt(process.env.LAG_RELEASE_SAMPLES) || 5, lagReleaseSamples: parseInt(process.env.LAG_RELEASE_SAMPLES) || 5,
// #142 load-aware per-device reconnect throttle (lib/reconnect-throttle.js).
// The verdict of WHO is misbehaving is ALWAYS per-device (keyed on device_id):
// a device is flagged only when it exceeds reconnectBaseMax genuine reconnects
// per reconnectWindowMs. Global lag never flags a healthy device — the lag band
// only MULTIPLIES how hard an already-flagged device is backed off.
reconnectWindowMs: parseInt(process.env.RECONNECT_WINDOW_MS) || 10000,
reconnectBaseMax: parseInt(process.env.RECONNECT_BASE_MAX) || 5,
// Absolute per-device ceiling, independent of band AND of warm-up: no device may
// exceed this many reconnects/window no matter what the adaptive logic computes,
// so a slow-ramp attacker can't train its way through.
reconnectHardCeiling: parseInt(process.env.RECONNECT_HARD_CEILING) || 20,
// Server-enforced backoff for a flagged device: baseBackoff * 2^(level-1) * band
// multiplier, capped at maxBackoff. Level escalates while it keeps storming
// (tighten fast) and decays one step per reconnectReleaseMs of calm (release slow).
reconnectBaseBackoffMs: parseInt(process.env.RECONNECT_BASE_BACKOFF_MS) || 1000,
reconnectMaxBackoffMs: parseInt(process.env.RECONNECT_MAX_BACKOFF_MS) || 60000,
reconnectMaxLevel: parseInt(process.env.RECONNECT_MAX_LEVEL) || 10,
reconnectReleaseMs: parseInt(process.env.RECONNECT_RELEASE_MS) || 30000,
// Cold start: for this long after process start, lag is high while the whole
// fleet reconnects at once. Treat leniently — force the 'normal' band and apply
// only the hard ceiling (no rate-band throttle) so a deploy can't throttle
// healthy screens. Throttle state is in-memory and resets on restart.
reconnectWarmupMs: parseInt(process.env.RECONNECT_WARMUP_MS) || 30000,
reconnectBandElevatedMult: parseFloat(process.env.RECONNECT_BAND_ELEVATED_MULT) || 2,
reconnectBandCriticalMult: parseFloat(process.env.RECONNECT_BAND_CRITICAL_MULT) || 4,
}; };

View file

@ -0,0 +1,98 @@
// #142 step 3 — load-aware per-device reconnect throttle (the outage fix).
//
// A single device stuck in a tight websocket reconnect loop can flood the server
// with full register cycles (DB writes + playlist build) and saturate the event
// loop. This module gates genuine reconnects PER DEVICE, before that heavy work
// runs in deviceSocket.js.
//
// Design (mirrors the issue's suggested mitigation + the lastPlayLogAt pattern):
// - WHO is always per-device: a device is "flagged" only when it exceeds
// reconnectBaseMax genuine reconnects within reconnectWindowMs. Global lag
// NEVER flags a healthy device.
// - Load-awareness is BANDED (normal/elevated/critical from services/loop-lag),
// not a continuous controller — deterministic and testable. The band only
// MULTIPLIES the backoff applied to an ALREADY-flagged device.
// - Hysteresis: escalate immediately while storming (tighten fast); decay the
// escalation level one step per reconnectReleaseMs of calm (release slow).
// - HARD CEILING: independent of band and of warm-up, no device may exceed
// reconnectHardCeiling/window — a slow-ramp attacker can't train through it.
// - COLD START: for reconnectWarmupMs after process start, force the 'normal'
// band and apply only the hard ceiling, so a full-fleet reconnect right after
// a deploy doesn't throttle healthy screens.
// - State is in-memory (resets on restart), like pair-lockout / totp-lockout.
const config = require('../config');
const loopLag = require('../services/loop-lag');
// deviceId -> { hits: number[], level: number, blockedUntil: ms, lastThrottleAt: ms }
const state = new Map();
let startedAt = Date.now();
function bandMultiplier(band) {
if (band === 'critical') return config.reconnectBandCriticalMult;
if (band === 'elevated') return config.reconnectBandElevatedMult;
return 1;
}
function reject(s, now, band, reason, observed, allowed) {
s.level = Math.min(s.level + 1, config.reconnectMaxLevel);
const backoff = Math.min(
config.reconnectBaseBackoffMs * Math.pow(2, s.level - 1) * bandMultiplier(band),
config.reconnectMaxBackoffMs
);
s.blockedUntil = now + backoff;
s.lastThrottleAt = now;
return { allow: false, retryAfterMs: backoff, reason, observed, allowed, band, level: s.level };
}
// Decide whether to allow a genuine reconnect for `deviceId`.
// `now` and `bandOverride` are injectable for deterministic tests; production
// passes only deviceId.
function check(deviceId, now = Date.now(), bandOverride = null) {
const warmup = (now - startedAt) < config.reconnectWarmupMs;
const band = bandOverride !== null ? bandOverride : (warmup ? 'normal' : loopLag.getBand());
let s = state.get(deviceId);
if (!s) { s = { hits: [], level: 0, blockedUntil: 0, lastThrottleAt: 0 }; state.set(deviceId, s); }
// Already inside an enforced backoff window: reject and escalate (tighten fast).
if (now < s.blockedUntil) {
return reject(s, now, band, 'in-backoff', s.hits.length, config.reconnectBaseMax);
}
// Sliding window of genuine reconnects.
s.hits = s.hits.filter((t) => now - t < config.reconnectWindowMs);
s.hits.push(now);
const observed = s.hits.length;
// Hard ceiling — always enforced, regardless of band or warm-up.
if (observed > config.reconnectHardCeiling) {
return reject(s, now, band, 'hard-ceiling', observed, config.reconnectHardCeiling);
}
// Cold start: only the hard ceiling applies; never rate-throttle during warm-up.
if (warmup) return allow(s, now, band);
// Healthy device: under the per-device threshold -> always allowed.
if (observed <= config.reconnectBaseMax) return allow(s, now, band);
// Flagged: storming beyond the per-device threshold -> throttle (band-scaled).
return reject(s, now, band, 'rate', observed, config.reconnectBaseMax);
}
function allow(s, now, band) {
// Release slow: decay one escalation level per reconnectReleaseMs of calm.
if (s.level > 0 && now - s.lastThrottleAt > config.reconnectReleaseMs) {
s.level = Math.max(0, s.level - 1);
s.lastThrottleAt = now;
}
return { allow: true, band, level: s.level };
}
// Test-only: clear state and optionally rewind the warm-up origin.
function __resetForTest(opts = {}) {
state.clear();
if (opts.startedAt !== undefined) startedAt = opts.startedAt;
}
module.exports = { check, __resetForTest };

View file

@ -13,7 +13,7 @@ const fs = require('node:fs');
const crypto = require('node:crypto'); const crypto = require('node:crypto');
const Database = require('better-sqlite3'); const Database = require('better-sqlite3');
const PORT = 3979; const PORT = 3982;
const BASE = `http://127.0.0.1:${PORT}`; const BASE = `http://127.0.0.1:${PORT}`;
const DATA_DIR = path.join(os.tmpdir(), 'st-lag-int-' + crypto.randomBytes(4).toString('hex')); const DATA_DIR = path.join(os.tmpdir(), 'st-lag-int-' + crypto.randomBytes(4).toString('hex'));
const LOG = path.join(os.tmpdir(), 'st-lag-int-' + crypto.randomBytes(4).toString('hex') + '.log'); const LOG = path.join(os.tmpdir(), 'st-lag-int-' + crypto.randomBytes(4).toString('hex') + '.log');
@ -54,7 +54,7 @@ test('lag samples are persisted AND bounded by retention prune (not unbounded)',
// Let it sample for ~3s. At 200ms/sample that is ~15 inserts, but with ~0.86s // Let it sample for ~3s. At 200ms/sample that is ~15 inserts, but with ~0.86s
// retention pruned every 400ms the table must stay small — proving the table // retention pruned every 400ms the table must stay small — proving the table
// can never become a second unbounded-growth table. // can never become a second unbounded-growth table.
await new Promise(r => setTimeout(r, 3000)); await new Promise(r => setTimeout(r, 1800));
const dbPath = path.join(DATA_DIR, 'db', 'remote_display.db'); const dbPath = path.join(DATA_DIR, 'db', 'remote_display.db');
const db = new Database(dbPath, { readonly: true }); const db = new Database(dbPath, { readonly: true });
const count = db.prepare('SELECT COUNT(*) c FROM event_loop_lag').get().c; const count = db.prepare('SELECT COUNT(*) c FROM event_loop_lag').get().c;

View file

@ -0,0 +1,113 @@
'use strict';
// #142 step 3 — REQUIRED GATE TEST + storm + neighbor, over real sockets.
//
// Boots the real server with warm-up ACTIVE (default) so the whole suite runs in
// the cold-start window — the exact "right after a deploy" scenario. Hard ceiling
// and window are tightened so the storm trips quickly without thousands of connects;
// fleet devices stay well under the ceiling.
const { test, before, after } = require('node:test');
const assert = require('node:assert/strict');
const { spawn } = require('node:child_process');
const path = require('node:path');
const os = require('node:os');
const fs = require('node:fs');
const crypto = require('node:crypto');
const ioClient = require('socket.io-client');
const PORT = 3983;
const BASE = `http://127.0.0.1:${PORT}`;
const DATA_DIR = path.join(os.tmpdir(), 'st-thr-int-' + crypto.randomBytes(4).toString('hex'));
const LOG = path.join(os.tmpdir(), 'st-thr-int-' + crypto.randomBytes(4).toString('hex') + '.log');
let proc;
before(async () => {
const logFd = fs.openSync(LOG, 'w');
proc = spawn('node', ['server.js'], {
cwd: path.join(__dirname, '..'),
env: {
...process.env, DATA_DIR, SELF_HOSTED: 'true', PORT: String(PORT), NODE_ENV: 'test',
// warm-up left at default (30s) so the whole test runs in the cold-start window
RECONNECT_HARD_CEILING: '8',
RECONNECT_WINDOW_MS: '5000',
RECONNECT_BASE_MAX: '3',
},
stdio: ['ignore', logFd, logFd],
});
let up = false;
for (let i = 0; i < 80; i++) {
try { const r = await fetch(BASE + '/api/status'); if (r.ok) { up = true; break; } } catch { /* */ }
await new Promise(r => setTimeout(r, 250));
}
if (!up) throw new Error('server did not boot:\n' + fs.readFileSync(LOG, 'utf8').slice(-2000));
});
after(() => { try { proc.kill('SIGKILL'); } catch { /* */ } });
// Provision a brand-new device via a UNIQUE pairing code -> returns {device_id, device_token}.
function provision() {
const code = String(crypto.randomInt(100000, 1000000));
return new Promise((resolve) => {
const sock = ioClient(`${BASE}/device`, { transports: ['websocket'], reconnection: false, forceNew: true });
sock.on('connect', () => sock.emit('device:register', { pairing_code: code }));
sock.on('device:registered', (d) => { try { sock.close(); } catch { /* */ } resolve({ id: d.device_id, token: d.device_token }); });
setTimeout(() => { try { sock.close(); } catch { /* */ } resolve(null); }, 4000);
});
}
// One genuine reconnect (new socket). Resolves {registered, throttled}.
function reconnect(dev) {
return new Promise((resolve) => {
const sock = ioClient(`${BASE}/device`, { transports: ['websocket'], reconnection: false, forceNew: true });
let done = false;
const finish = (r) => { if (done) return; done = true; try { sock.close(); } catch { /* */ } resolve(r); };
sock.on('connect', () => sock.emit('device:register', { device_id: dev.id, device_token: dev.token, device_info: { app_version: 'test' } }));
sock.on('device:registered', () => finish({ registered: true, throttled: false }));
sock.on('device:throttled', () => finish({ registered: false, throttled: true }));
setTimeout(() => finish({ registered: false, throttled: false }), 1500);
});
}
test('GATE: full-fleet reconnect right after restart throttles NO healthy device', async () => {
// 12 distinct devices, each reconnecting twice in quick succession — a deploy-time
// herd. The loop is transiently busy, but per-device keying means none is flagged.
const fleet = [];
for (let i = 0; i < 12; i++) { const d = await provision(); assert.ok(d, 'device provisioned'); fleet.push(d); }
let registered = 0, throttled = 0;
// two reconnect rounds across the whole fleet
for (let round = 0; round < 2; round++) {
const results = await Promise.all(fleet.map(reconnect));
for (const r of results) { if (r.registered) registered++; if (r.throttled) throttled++; }
}
assert.equal(throttled, 0, 'NO healthy fleet device may be throttled at cold start');
assert.equal(registered, 24, 'every fleet reconnect registered');
});
test('a single device storming IS throttled (backoff engages)', async () => {
const dev = await provision();
assert.ok(dev);
let registered = 0, throttled = 0;
// 12 sequential reconnects within the 5s window -> exceeds the hard ceiling (8)
for (let i = 0; i < 12; i++) {
const r = await reconnect(dev);
if (r.registered) registered++;
if (r.throttled) throttled++;
}
assert.ok(throttled >= 1, `storming device must be throttled (got ${throttled} throttle(s))`);
assert.ok(registered < 12, `not all storm reconnects should succeed (got ${registered}/12)`);
});
test('neighbor isolation: a healthy device is unaffected while another storms', async () => {
const stormer = await provision();
const neighbor = await provision();
assert.ok(stormer && neighbor);
// storm the stormer hard
for (let i = 0; i < 12; i++) await reconnect(stormer);
// neighbor reconnects normally a couple of times -> must still register
const a = await reconnect(neighbor);
const b = await reconnect(neighbor);
assert.ok(a.registered && b.registered, 'neighbor must register normally while another device storms');
assert.ok(!a.throttled && !b.throttled, 'neighbor must not be throttled by another device');
});

View file

@ -0,0 +1,98 @@
'use strict';
// #142 step 3 — deterministic unit tests for the per-device reconnect throttle.
// Pure logic with injected `now` / band; isolate the DB before require (the module
// pulls in services/loop-lag -> db/database which initialises a DB on load).
const os = require('node:os');
const path = require('node:path');
const crypto = require('node:crypto');
process.env.DATA_DIR = path.join(os.tmpdir(), 'st-thr-unit-' + crypto.randomBytes(4).toString('hex'));
const { test, beforeEach } = require('node:test');
const assert = require('node:assert/strict');
const throttle = require('../lib/reconnect-throttle');
// config defaults: window=10000, baseMax=5, hardCeiling=20, baseBackoff=1000,
// maxBackoff=60000, releaseMs=30000, warmup=30000, elevMult=2, critMult=4.
const T0 = 1_000_000; // arbitrary epoch-ms origin for the warm-up clock
const POST = T0 + 40_000; // safely past the 30s warm-up
const WARM = T0 + 1_000; // inside the warm-up window
beforeEach(() => throttle.__resetForTest({ startedAt: T0 }));
test('healthy device is never throttled (<= baseMax genuine reconnects)', () => {
for (let i = 0; i < 5; i++) {
const v = throttle.check('A', POST + i, 'normal');
assert.ok(v.allow, `reconnect ${i + 1} (<=baseMax) must be allowed`);
}
});
test('a per-device storm IS throttled and the backoff GROWS (tighten fast)', () => {
let v;
for (let i = 0; i < 5; i++) v = throttle.check('B', POST + i, 'normal'); // 5 allowed
v = throttle.check('B', POST + 5, 'normal'); // 6th -> flagged
assert.equal(v.allow, false);
assert.equal(v.reason, 'rate');
assert.equal(v.observed, 6);
assert.equal(v.allowed, 5);
const b1 = v.retryAfterMs;
// keep hammering while blocked -> escalate, longer backoff each time
const b2 = throttle.check('B', POST + 6, 'normal').retryAfterMs;
const b3 = throttle.check('B', POST + 7, 'normal').retryAfterMs;
assert.ok(b2 > b1 && b3 > b2, `backoff must grow: ${b1} < ${b2} < ${b3}`);
});
test('lag band multiplies an already-flagged device\'s backoff (critical > normal)', () => {
let v;
for (let i = 0; i < 5; i++) throttle.check('N', POST + i, 'normal');
v = throttle.check('N', POST + 5, 'normal');
const normalBackoff = v.retryAfterMs;
throttle.__resetForTest({ startedAt: T0 });
for (let i = 0; i < 5; i++) throttle.check('C', POST + i, 'critical');
v = throttle.check('C', POST + 5, 'critical');
assert.ok(v.retryAfterMs > normalBackoff, `critical backoff ${v.retryAfterMs} > normal ${normalBackoff}`);
});
test('a healthy device is NOT throttled even when the band is critical (lag never gates the healthy)', () => {
for (let i = 0; i < 5; i++) {
const v = throttle.check('H', POST + i, 'critical');
assert.ok(v.allow, 'healthy device stays allowed regardless of band');
}
});
test('COLD START: during warm-up, moderate flapping (>baseMax, <ceiling) is NOT throttled', () => {
for (let i = 0; i < 12; i++) { // 12 > baseMax(5) but < hardCeiling(20)
const v = throttle.check('W', WARM + i, 'critical'); // band forced normal in warm-up anyway
assert.ok(v.allow, `warm-up reconnect ${i + 1} must be lenient`);
}
});
test('HARD CEILING is enforced even during warm-up (slow-ramp cannot train through)', () => {
let v;
for (let i = 0; i < 20; i++) {
v = throttle.check('K', WARM + i, 'normal');
assert.ok(v.allow, `warm-up reconnect ${i + 1} (<=ceiling) allowed`);
}
v = throttle.check('K', WARM + 20, 'normal'); // 21st -> over ceiling(20)
assert.equal(v.allow, false);
assert.equal(v.reason, 'hard-ceiling');
});
test('neighbor isolation: one device storming does not throttle another', () => {
for (let i = 0; i < 10; i++) throttle.check('STORM', POST + i, 'normal'); // STORM gets throttled
const v = throttle.check('NEIGHBOR', POST + 11, 'normal');
assert.ok(v.allow, 'a different device must be unaffected');
});
test('release slow: escalation level decays after a calm period', () => {
let v;
for (let i = 0; i < 6; i++) v = throttle.check('R', POST + i, 'normal'); // flagged, level 1
assert.ok(v.level >= 1);
const peak = v.level;
// a calm reconnect well past the window AND past releaseMs(30000)
v = throttle.check('R', POST + 6 + 40_000, 'normal');
assert.ok(v.allow, 'calm reconnect after the storm is allowed');
assert.ok(v.level < peak, `level decays after calm: ${v.level} < ${peak}`);
});

View file

@ -6,6 +6,7 @@ const { db, pruneTelemetry, pruneScreenshots } = require('../db/database');
const config = require('../config'); const config = require('../config');
const heartbeat = require('../services/heartbeat'); const heartbeat = require('../services/heartbeat');
const commandQueue = require('../lib/command-queue'); const commandQueue = require('../lib/command-queue');
const reconnectThrottle = require('../lib/reconnect-throttle');
// Debounce window for marking a device offline on socket disconnect. Brief // Debounce window for marking a device offline on socket disconnect. Brief
// flap (Wi-Fi blip, Engine.IO ping miss, server-side eviction-then-reconnect) // flap (Wi-Fi blip, Engine.IO ping miss, server-side eviction-then-reconnect)
@ -353,6 +354,23 @@ module.exports = function setupDeviceSocket(io) {
return; return;
} }
// #142: per-device reconnect throttle. Only GENUINE reconnects (a new
// socket) count — same-socket playlist refreshes (isPlaylistRefresh) are
// exempt. This runs BEFORE the heavy register work (DB writes, playlist
// build) so a single flapping device cannot saturate the event loop. The
// verdict is per-device; global lag only scales an already-flagged
// device's backoff, never gates a healthy one.
if (!isPlaylistRefresh) {
const verdict = reconnectThrottle.check(device_id);
if (!verdict.allow) {
console.warn(`[throttle] device ${device_id} reconnect throttled: reason=${verdict.reason} band=${verdict.band} observed=${verdict.observed}/${verdict.allowed} per ${config.reconnectWindowMs}ms -> backoff ${verdict.retryAfterMs}ms (level ${verdict.level})`);
socket.emit('device:throttled', { retry_after_ms: verdict.retryAfterMs, reason: 'reconnect_rate' });
// nextTick disconnect so the throttle notice flushes first.
process.nextTick(() => { try { socket.disconnect(true); } catch (_) { /* */ } });
return;
}
}
currentDeviceId = device_id; currentDeviceId = device_id;
authenticated = true; authenticated = true;
// Cancel any pending offline timer - device is back in the grace window // Cancel any pending offline timer - device is back in the grace window