diff --git a/server/config.js b/server/config.js index e73e107..6a146a0 100644 --- a/server/config.js +++ b/server/config.js @@ -108,4 +108,30 @@ module.exports = { lagElevatedMs: parseInt(process.env.LAG_ELEVATED_MS) || 100, lagCriticalMs: parseInt(process.env.LAG_CRITICAL_MS) || 250, lagReleaseSamples: parseInt(process.env.LAG_RELEASE_SAMPLES) || 5, + + // #142 load-aware per-device reconnect throttle (lib/reconnect-throttle.js). + // The verdict of WHO is misbehaving is ALWAYS per-device (keyed on device_id): + // a device is flagged only when it exceeds reconnectBaseMax genuine reconnects + // per reconnectWindowMs. Global lag never flags a healthy device — the lag band + // only MULTIPLIES how hard an already-flagged device is backed off. + reconnectWindowMs: parseInt(process.env.RECONNECT_WINDOW_MS) || 10000, + reconnectBaseMax: parseInt(process.env.RECONNECT_BASE_MAX) || 5, + // Absolute per-device ceiling, independent of band AND of warm-up: no device may + // exceed this many reconnects/window no matter what the adaptive logic computes, + // so a slow-ramp attacker can't train its way through. + reconnectHardCeiling: parseInt(process.env.RECONNECT_HARD_CEILING) || 20, + // Server-enforced backoff for a flagged device: baseBackoff * 2^(level-1) * band + // multiplier, capped at maxBackoff. Level escalates while it keeps storming + // (tighten fast) and decays one step per reconnectReleaseMs of calm (release slow). + reconnectBaseBackoffMs: parseInt(process.env.RECONNECT_BASE_BACKOFF_MS) || 1000, + reconnectMaxBackoffMs: parseInt(process.env.RECONNECT_MAX_BACKOFF_MS) || 60000, + reconnectMaxLevel: parseInt(process.env.RECONNECT_MAX_LEVEL) || 10, + reconnectReleaseMs: parseInt(process.env.RECONNECT_RELEASE_MS) || 30000, + // Cold start: for this long after process start, lag is high while the whole + // fleet reconnects at once. Treat leniently — force the 'normal' band and apply + // only the hard ceiling (no rate-band throttle) so a deploy can't throttle + // healthy screens. Throttle state is in-memory and resets on restart. + reconnectWarmupMs: parseInt(process.env.RECONNECT_WARMUP_MS) || 30000, + reconnectBandElevatedMult: parseFloat(process.env.RECONNECT_BAND_ELEVATED_MULT) || 2, + reconnectBandCriticalMult: parseFloat(process.env.RECONNECT_BAND_CRITICAL_MULT) || 4, }; diff --git a/server/lib/reconnect-throttle.js b/server/lib/reconnect-throttle.js new file mode 100644 index 0000000..88a5c91 --- /dev/null +++ b/server/lib/reconnect-throttle.js @@ -0,0 +1,98 @@ +// #142 step 3 — load-aware per-device reconnect throttle (the outage fix). +// +// A single device stuck in a tight websocket reconnect loop can flood the server +// with full register cycles (DB writes + playlist build) and saturate the event +// loop. This module gates genuine reconnects PER DEVICE, before that heavy work +// runs in deviceSocket.js. +// +// Design (mirrors the issue's suggested mitigation + the lastPlayLogAt pattern): +// - WHO is always per-device: a device is "flagged" only when it exceeds +// reconnectBaseMax genuine reconnects within reconnectWindowMs. Global lag +// NEVER flags a healthy device. +// - Load-awareness is BANDED (normal/elevated/critical from services/loop-lag), +// not a continuous controller — deterministic and testable. The band only +// MULTIPLIES the backoff applied to an ALREADY-flagged device. +// - Hysteresis: escalate immediately while storming (tighten fast); decay the +// escalation level one step per reconnectReleaseMs of calm (release slow). +// - HARD CEILING: independent of band and of warm-up, no device may exceed +// reconnectHardCeiling/window — a slow-ramp attacker can't train through it. +// - COLD START: for reconnectWarmupMs after process start, force the 'normal' +// band and apply only the hard ceiling, so a full-fleet reconnect right after +// a deploy doesn't throttle healthy screens. +// - State is in-memory (resets on restart), like pair-lockout / totp-lockout. + +const config = require('../config'); +const loopLag = require('../services/loop-lag'); + +// deviceId -> { hits: number[], level: number, blockedUntil: ms, lastThrottleAt: ms } +const state = new Map(); +let startedAt = Date.now(); + +function bandMultiplier(band) { + if (band === 'critical') return config.reconnectBandCriticalMult; + if (band === 'elevated') return config.reconnectBandElevatedMult; + return 1; +} + +function reject(s, now, band, reason, observed, allowed) { + s.level = Math.min(s.level + 1, config.reconnectMaxLevel); + const backoff = Math.min( + config.reconnectBaseBackoffMs * Math.pow(2, s.level - 1) * bandMultiplier(band), + config.reconnectMaxBackoffMs + ); + s.blockedUntil = now + backoff; + s.lastThrottleAt = now; + return { allow: false, retryAfterMs: backoff, reason, observed, allowed, band, level: s.level }; +} + +// Decide whether to allow a genuine reconnect for `deviceId`. +// `now` and `bandOverride` are injectable for deterministic tests; production +// passes only deviceId. +function check(deviceId, now = Date.now(), bandOverride = null) { + const warmup = (now - startedAt) < config.reconnectWarmupMs; + const band = bandOverride !== null ? bandOverride : (warmup ? 'normal' : loopLag.getBand()); + + let s = state.get(deviceId); + if (!s) { s = { hits: [], level: 0, blockedUntil: 0, lastThrottleAt: 0 }; state.set(deviceId, s); } + + // Already inside an enforced backoff window: reject and escalate (tighten fast). + if (now < s.blockedUntil) { + return reject(s, now, band, 'in-backoff', s.hits.length, config.reconnectBaseMax); + } + + // Sliding window of genuine reconnects. + s.hits = s.hits.filter((t) => now - t < config.reconnectWindowMs); + s.hits.push(now); + const observed = s.hits.length; + + // Hard ceiling — always enforced, regardless of band or warm-up. + if (observed > config.reconnectHardCeiling) { + return reject(s, now, band, 'hard-ceiling', observed, config.reconnectHardCeiling); + } + + // Cold start: only the hard ceiling applies; never rate-throttle during warm-up. + if (warmup) return allow(s, now, band); + + // Healthy device: under the per-device threshold -> always allowed. + if (observed <= config.reconnectBaseMax) return allow(s, now, band); + + // Flagged: storming beyond the per-device threshold -> throttle (band-scaled). + return reject(s, now, band, 'rate', observed, config.reconnectBaseMax); +} + +function allow(s, now, band) { + // Release slow: decay one escalation level per reconnectReleaseMs of calm. + if (s.level > 0 && now - s.lastThrottleAt > config.reconnectReleaseMs) { + s.level = Math.max(0, s.level - 1); + s.lastThrottleAt = now; + } + return { allow: true, band, level: s.level }; +} + +// Test-only: clear state and optionally rewind the warm-up origin. +function __resetForTest(opts = {}) { + state.clear(); + if (opts.startedAt !== undefined) startedAt = opts.startedAt; +} + +module.exports = { check, __resetForTest }; diff --git a/server/test/loop-lag-integration.test.js b/server/test/loop-lag-integration.test.js index 098755e..3694ed0 100644 --- a/server/test/loop-lag-integration.test.js +++ b/server/test/loop-lag-integration.test.js @@ -13,7 +13,7 @@ const fs = require('node:fs'); const crypto = require('node:crypto'); const Database = require('better-sqlite3'); -const PORT = 3979; +const PORT = 3982; const BASE = `http://127.0.0.1:${PORT}`; const DATA_DIR = path.join(os.tmpdir(), 'st-lag-int-' + crypto.randomBytes(4).toString('hex')); const LOG = path.join(os.tmpdir(), 'st-lag-int-' + crypto.randomBytes(4).toString('hex') + '.log'); @@ -54,7 +54,7 @@ test('lag samples are persisted AND bounded by retention prune (not unbounded)', // Let it sample for ~3s. At 200ms/sample that is ~15 inserts, but with ~0.86s // retention pruned every 400ms the table must stay small — proving the table // can never become a second unbounded-growth table. - await new Promise(r => setTimeout(r, 3000)); + await new Promise(r => setTimeout(r, 1800)); const dbPath = path.join(DATA_DIR, 'db', 'remote_display.db'); const db = new Database(dbPath, { readonly: true }); const count = db.prepare('SELECT COUNT(*) c FROM event_loop_lag').get().c; diff --git a/server/test/reconnect-throttle-integration.test.js b/server/test/reconnect-throttle-integration.test.js new file mode 100644 index 0000000..59933d8 --- /dev/null +++ b/server/test/reconnect-throttle-integration.test.js @@ -0,0 +1,113 @@ +'use strict'; + +// #142 step 3 — REQUIRED GATE TEST + storm + neighbor, over real sockets. +// +// Boots the real server with warm-up ACTIVE (default) so the whole suite runs in +// the cold-start window — the exact "right after a deploy" scenario. Hard ceiling +// and window are tightened so the storm trips quickly without thousands of connects; +// fleet devices stay well under the ceiling. + +const { test, before, after } = require('node:test'); +const assert = require('node:assert/strict'); +const { spawn } = require('node:child_process'); +const path = require('node:path'); +const os = require('node:os'); +const fs = require('node:fs'); +const crypto = require('node:crypto'); +const ioClient = require('socket.io-client'); + +const PORT = 3983; +const BASE = `http://127.0.0.1:${PORT}`; +const DATA_DIR = path.join(os.tmpdir(), 'st-thr-int-' + crypto.randomBytes(4).toString('hex')); +const LOG = path.join(os.tmpdir(), 'st-thr-int-' + crypto.randomBytes(4).toString('hex') + '.log'); +let proc; + +before(async () => { + const logFd = fs.openSync(LOG, 'w'); + proc = spawn('node', ['server.js'], { + cwd: path.join(__dirname, '..'), + env: { + ...process.env, DATA_DIR, SELF_HOSTED: 'true', PORT: String(PORT), NODE_ENV: 'test', + // warm-up left at default (30s) so the whole test runs in the cold-start window + RECONNECT_HARD_CEILING: '8', + RECONNECT_WINDOW_MS: '5000', + RECONNECT_BASE_MAX: '3', + }, + stdio: ['ignore', logFd, logFd], + }); + let up = false; + for (let i = 0; i < 80; i++) { + try { const r = await fetch(BASE + '/api/status'); if (r.ok) { up = true; break; } } catch { /* */ } + await new Promise(r => setTimeout(r, 250)); + } + if (!up) throw new Error('server did not boot:\n' + fs.readFileSync(LOG, 'utf8').slice(-2000)); +}); + +after(() => { try { proc.kill('SIGKILL'); } catch { /* */ } }); + +// Provision a brand-new device via a UNIQUE pairing code -> returns {device_id, device_token}. +function provision() { + const code = String(crypto.randomInt(100000, 1000000)); + return new Promise((resolve) => { + const sock = ioClient(`${BASE}/device`, { transports: ['websocket'], reconnection: false, forceNew: true }); + sock.on('connect', () => sock.emit('device:register', { pairing_code: code })); + sock.on('device:registered', (d) => { try { sock.close(); } catch { /* */ } resolve({ id: d.device_id, token: d.device_token }); }); + setTimeout(() => { try { sock.close(); } catch { /* */ } resolve(null); }, 4000); + }); +} + +// One genuine reconnect (new socket). Resolves {registered, throttled}. +function reconnect(dev) { + return new Promise((resolve) => { + const sock = ioClient(`${BASE}/device`, { transports: ['websocket'], reconnection: false, forceNew: true }); + let done = false; + const finish = (r) => { if (done) return; done = true; try { sock.close(); } catch { /* */ } resolve(r); }; + sock.on('connect', () => sock.emit('device:register', { device_id: dev.id, device_token: dev.token, device_info: { app_version: 'test' } })); + sock.on('device:registered', () => finish({ registered: true, throttled: false })); + sock.on('device:throttled', () => finish({ registered: false, throttled: true })); + setTimeout(() => finish({ registered: false, throttled: false }), 1500); + }); +} + +test('GATE: full-fleet reconnect right after restart throttles NO healthy device', async () => { + // 12 distinct devices, each reconnecting twice in quick succession — a deploy-time + // herd. The loop is transiently busy, but per-device keying means none is flagged. + const fleet = []; + for (let i = 0; i < 12; i++) { const d = await provision(); assert.ok(d, 'device provisioned'); fleet.push(d); } + + let registered = 0, throttled = 0; + // two reconnect rounds across the whole fleet + for (let round = 0; round < 2; round++) { + const results = await Promise.all(fleet.map(reconnect)); + for (const r of results) { if (r.registered) registered++; if (r.throttled) throttled++; } + } + assert.equal(throttled, 0, 'NO healthy fleet device may be throttled at cold start'); + assert.equal(registered, 24, 'every fleet reconnect registered'); +}); + +test('a single device storming IS throttled (backoff engages)', async () => { + const dev = await provision(); + assert.ok(dev); + let registered = 0, throttled = 0; + // 12 sequential reconnects within the 5s window -> exceeds the hard ceiling (8) + for (let i = 0; i < 12; i++) { + const r = await reconnect(dev); + if (r.registered) registered++; + if (r.throttled) throttled++; + } + assert.ok(throttled >= 1, `storming device must be throttled (got ${throttled} throttle(s))`); + assert.ok(registered < 12, `not all storm reconnects should succeed (got ${registered}/12)`); +}); + +test('neighbor isolation: a healthy device is unaffected while another storms', async () => { + const stormer = await provision(); + const neighbor = await provision(); + assert.ok(stormer && neighbor); + // storm the stormer hard + for (let i = 0; i < 12; i++) await reconnect(stormer); + // neighbor reconnects normally a couple of times -> must still register + const a = await reconnect(neighbor); + const b = await reconnect(neighbor); + assert.ok(a.registered && b.registered, 'neighbor must register normally while another device storms'); + assert.ok(!a.throttled && !b.throttled, 'neighbor must not be throttled by another device'); +}); diff --git a/server/test/reconnect-throttle.test.js b/server/test/reconnect-throttle.test.js new file mode 100644 index 0000000..819a294 --- /dev/null +++ b/server/test/reconnect-throttle.test.js @@ -0,0 +1,98 @@ +'use strict'; + +// #142 step 3 — deterministic unit tests for the per-device reconnect throttle. +// Pure logic with injected `now` / band; isolate the DB before require (the module +// pulls in services/loop-lag -> db/database which initialises a DB on load). + +const os = require('node:os'); +const path = require('node:path'); +const crypto = require('node:crypto'); +process.env.DATA_DIR = path.join(os.tmpdir(), 'st-thr-unit-' + crypto.randomBytes(4).toString('hex')); + +const { test, beforeEach } = require('node:test'); +const assert = require('node:assert/strict'); +const throttle = require('../lib/reconnect-throttle'); + +// config defaults: window=10000, baseMax=5, hardCeiling=20, baseBackoff=1000, +// maxBackoff=60000, releaseMs=30000, warmup=30000, elevMult=2, critMult=4. +const T0 = 1_000_000; // arbitrary epoch-ms origin for the warm-up clock +const POST = T0 + 40_000; // safely past the 30s warm-up +const WARM = T0 + 1_000; // inside the warm-up window + +beforeEach(() => throttle.__resetForTest({ startedAt: T0 })); + +test('healthy device is never throttled (<= baseMax genuine reconnects)', () => { + for (let i = 0; i < 5; i++) { + const v = throttle.check('A', POST + i, 'normal'); + assert.ok(v.allow, `reconnect ${i + 1} (<=baseMax) must be allowed`); + } +}); + +test('a per-device storm IS throttled and the backoff GROWS (tighten fast)', () => { + let v; + for (let i = 0; i < 5; i++) v = throttle.check('B', POST + i, 'normal'); // 5 allowed + v = throttle.check('B', POST + 5, 'normal'); // 6th -> flagged + assert.equal(v.allow, false); + assert.equal(v.reason, 'rate'); + assert.equal(v.observed, 6); + assert.equal(v.allowed, 5); + const b1 = v.retryAfterMs; + // keep hammering while blocked -> escalate, longer backoff each time + const b2 = throttle.check('B', POST + 6, 'normal').retryAfterMs; + const b3 = throttle.check('B', POST + 7, 'normal').retryAfterMs; + assert.ok(b2 > b1 && b3 > b2, `backoff must grow: ${b1} < ${b2} < ${b3}`); +}); + +test('lag band multiplies an already-flagged device\'s backoff (critical > normal)', () => { + let v; + for (let i = 0; i < 5; i++) throttle.check('N', POST + i, 'normal'); + v = throttle.check('N', POST + 5, 'normal'); + const normalBackoff = v.retryAfterMs; + + throttle.__resetForTest({ startedAt: T0 }); + for (let i = 0; i < 5; i++) throttle.check('C', POST + i, 'critical'); + v = throttle.check('C', POST + 5, 'critical'); + assert.ok(v.retryAfterMs > normalBackoff, `critical backoff ${v.retryAfterMs} > normal ${normalBackoff}`); +}); + +test('a healthy device is NOT throttled even when the band is critical (lag never gates the healthy)', () => { + for (let i = 0; i < 5; i++) { + const v = throttle.check('H', POST + i, 'critical'); + assert.ok(v.allow, 'healthy device stays allowed regardless of band'); + } +}); + +test('COLD START: during warm-up, moderate flapping (>baseMax, { + for (let i = 0; i < 12; i++) { // 12 > baseMax(5) but < hardCeiling(20) + const v = throttle.check('W', WARM + i, 'critical'); // band forced normal in warm-up anyway + assert.ok(v.allow, `warm-up reconnect ${i + 1} must be lenient`); + } +}); + +test('HARD CEILING is enforced even during warm-up (slow-ramp cannot train through)', () => { + let v; + for (let i = 0; i < 20; i++) { + v = throttle.check('K', WARM + i, 'normal'); + assert.ok(v.allow, `warm-up reconnect ${i + 1} (<=ceiling) allowed`); + } + v = throttle.check('K', WARM + 20, 'normal'); // 21st -> over ceiling(20) + assert.equal(v.allow, false); + assert.equal(v.reason, 'hard-ceiling'); +}); + +test('neighbor isolation: one device storming does not throttle another', () => { + for (let i = 0; i < 10; i++) throttle.check('STORM', POST + i, 'normal'); // STORM gets throttled + const v = throttle.check('NEIGHBOR', POST + 11, 'normal'); + assert.ok(v.allow, 'a different device must be unaffected'); +}); + +test('release slow: escalation level decays after a calm period', () => { + let v; + for (let i = 0; i < 6; i++) v = throttle.check('R', POST + i, 'normal'); // flagged, level 1 + assert.ok(v.level >= 1); + const peak = v.level; + // a calm reconnect well past the window AND past releaseMs(30000) + v = throttle.check('R', POST + 6 + 40_000, 'normal'); + assert.ok(v.allow, 'calm reconnect after the storm is allowed'); + assert.ok(v.level < peak, `level decays after calm: ${v.level} < ${peak}`); +}); diff --git a/server/ws/deviceSocket.js b/server/ws/deviceSocket.js index b15c038..5bc9566 100644 --- a/server/ws/deviceSocket.js +++ b/server/ws/deviceSocket.js @@ -6,6 +6,7 @@ const { db, pruneTelemetry, pruneScreenshots } = require('../db/database'); const config = require('../config'); const heartbeat = require('../services/heartbeat'); const commandQueue = require('../lib/command-queue'); +const reconnectThrottle = require('../lib/reconnect-throttle'); // Debounce window for marking a device offline on socket disconnect. Brief // flap (Wi-Fi blip, Engine.IO ping miss, server-side eviction-then-reconnect) @@ -353,6 +354,23 @@ module.exports = function setupDeviceSocket(io) { return; } + // #142: per-device reconnect throttle. Only GENUINE reconnects (a new + // socket) count — same-socket playlist refreshes (isPlaylistRefresh) are + // exempt. This runs BEFORE the heavy register work (DB writes, playlist + // build) so a single flapping device cannot saturate the event loop. The + // verdict is per-device; global lag only scales an already-flagged + // device's backoff, never gates a healthy one. + if (!isPlaylistRefresh) { + const verdict = reconnectThrottle.check(device_id); + if (!verdict.allow) { + console.warn(`[throttle] device ${device_id} reconnect throttled: reason=${verdict.reason} band=${verdict.band} observed=${verdict.observed}/${verdict.allowed} per ${config.reconnectWindowMs}ms -> backoff ${verdict.retryAfterMs}ms (level ${verdict.level})`); + socket.emit('device:throttled', { retry_after_ms: verdict.retryAfterMs, reason: 'reconnect_rate' }); + // nextTick disconnect so the throttle notice flushes first. + process.nextTick(() => { try { socket.disconnect(true); } catch (_) { /* */ } }); + return; + } + } + currentDeviceId = device_id; authenticated = true; // Cancel any pending offline timer - device is back in the grace window