diff --git a/server/config.js b/server/config.js index d0cf823..1c6d26d 100644 --- a/server/config.js +++ b/server/config.js @@ -149,4 +149,14 @@ module.exports = { // (device_id, content_id, status) reports within this window. A status CHANGE // has a different key and passes immediately. In-memory; resets on restart. contentAckDedupMs: parseInt(process.env.CONTENT_ACK_DEDUP_MS) || 10000, + + // #143 content-ack RATE budget (lib/content-ack-limiter.js), layered on top of the + // dedup above. Caps TOTAL acks per device per window REGARDLESS of differing + // content_id — the flood the dedup misses (a device cycling 2-4 ids makes every + // ack look unique, so dedup never fires, yet aggregate volume blocks the loop). + // TUNING GUESSES — validate against Bold's real fleet. Legit playlist cadence is + // roughly <=1 ack/s/device; the flood is many/s. 20 per 10s (=2/s) sits above + // legit and below the flood. Easy to retune via env. + contentAckMaxPerWindow: parseInt(process.env.CONTENT_ACK_MAX_PER_WINDOW) || 20, + contentAckRateWindowMs: parseInt(process.env.CONTENT_ACK_RATE_WINDOW_MS) || 10000, }; diff --git a/server/lib/content-ack-limiter.js b/server/lib/content-ack-limiter.js new file mode 100644 index 0000000..36cf8ff --- /dev/null +++ b/server/lib/content-ack-limiter.js @@ -0,0 +1,64 @@ +// #143 — content-ack flood control (the single control on the content-ack path). +// +// Folds three concerns into ONE per-device limiter so there are no competing +// limiters on this path (reconnect-throttle.js is left untouched): +// 1. #142 dedup — drop an exact (content_id, status) repeat within the dedup +// window. Legit repeat suppression; does NOT consume rate budget. +// 2. #143 per-device RATE budget — cap TOTAL non-duplicate acks per device per +// window regardless of differing content_id. This is what dedup misses: a +// device cycling 2-4 ids makes each ack look unique, so dedup never fires, +// but aggregate volume still floods the loop. 
/**
 * Decide what to do with a single content-ack from a device.
 *
 * Three checks run in order: exact-repeat dedup, the per-device fixed-window
 * rate budget, then the global critical-lag valve. Per-device state lives in the
 * module-level `state` map; limits are read from `config`.
 *
 * @param {string} deviceId   Device the ack came from; keys the per-device state.
 * @param {string} contentId  Content identifier reported by the device.
 * @param {string} status     Reported status; combined with contentId as the dedup key.
 * @param {string} [band='normal']   Loop-lag band; only 'critical' opens the valve.
 * @param {number} [now=Date.now()]  Current time in ms; injectable so tests are deterministic.
 * @returns {{action: 'pass'}
 *   | {action: 'dedup'}
 *   | {action: 'shed-rate', logStart: boolean, observed: number, budget: number}
 *   | {action: 'shed-valve'}}
 *   'pass' means the caller should log + emit; every other action is a drop.
 */
function check(deviceId, contentId, status, band = 'normal', now = Date.now()) {
  let entry = state.get(deviceId);
  if (!entry) {
    entry = { winStart: now, count: 0, shedNotified: false, dup: new Map() };
    state.set(deviceId, entry);
  }

  // Roll the fixed rate window: fresh budget and a fresh "log once" flag.
  if (now - entry.winStart >= config.contentAckRateWindowMs) {
    entry.winStart = now;
    entry.count = 0;
    entry.shedNotified = false;
    // Bound the dedup map: drop entries older than the dedup window.
    for (const [seenKey, seenAt] of entry.dup) {
      if (now - seenAt >= config.contentAckDedupMs) entry.dup.delete(seenKey);
    }
  }

  // 1) Dedup — exact (content, status) repeat within the dedup window. Returns
  //    before the counter is touched, so a duplicate does NOT consume rate budget.
  //    A missing key means "never seen"; it must not be treated as "seen at time 0",
  //    otherwise the first ack is dropped whenever `now` is smaller than the dedup
  //    window (e.g. an injected clock that starts near zero).
  const key = `${contentId}|${status}`;
  const lastSeen = entry.dup.get(key);
  if (lastSeen !== undefined && now - lastSeen < config.contentAckDedupMs) {
    return { action: 'dedup' };
  }
  // The key is recorded before the budget/valve checks below, so a repeat of an
  // ack that was shed is also reported as 'dedup' within the dedup window.
  // NOTE(review): this means a retry sent shortly after shedding stops is still
  // suppressed until the dedup window expires — confirm that is intended.
  entry.dup.set(key, now);

  // 2) Per-device rate budget — always applies, counts all non-duplicate acks.
  entry.count++;
  if (entry.count > config.contentAckMaxPerWindow) {
    const logStart = !entry.shedNotified; // log ONCE per device per window when shedding starts
    entry.shedNotified = true;
    return {
      action: 'shed-rate',
      logStart,
      observed: entry.count,
      budget: config.contentAckMaxPerWindow,
    };
  }

  // 3) Global valve — extra shedding only under critical lag; a within-budget
  //    device in a non-critical band is never touched here.
  if (band === 'critical') return { action: 'shed-valve' };

  return { action: 'pass' };
}
Observed +// via the server log: passing acks log `Device content : ready`; over- +// budget drops are silent except ONE `[content-ack] shedding device ` line. +// Normal band (default lag thresholds), unique PORT 3985. + +const { test, before, after } = require('node:test'); +const assert = require('node:assert/strict'); +const { spawn } = require('node:child_process'); +const path = require('node:path'); +const os = require('node:os'); +const fs = require('node:fs'); +const crypto = require('node:crypto'); +const ioClient = require('socket.io-client'); + +const PORT = 3985; +const BASE = `http://127.0.0.1:${PORT}`; +const DATA_DIR = path.join(os.tmpdir(), 'st-flood-' + crypto.randomBytes(4).toString('hex')); +const LOG = path.join(os.tmpdir(), 'st-flood-' + crypto.randomBytes(4).toString('hex') + '.log'); +let proc; +const sleep = (ms) => new Promise(r => setTimeout(r, ms)); + +before(async () => { + const logFd = fs.openSync(LOG, 'w'); + proc = spawn('node', ['server.js'], { + cwd: path.join(__dirname, '..'), + env: { + ...process.env, DATA_DIR, SELF_HOSTED: 'true', PORT: String(PORT), NODE_ENV: 'test', + CONTENT_ACK_MAX_PER_WINDOW: '5', CONTENT_ACK_RATE_WINDOW_MS: '10000', CONTENT_ACK_DEDUP_MS: '50', + }, + stdio: ['ignore', logFd, logFd], + }); + let up = false; + for (let i = 0; i < 80; i++) { try { const r = await fetch(BASE + '/api/status'); if (r.ok) { up = true; break; } } catch { /* */ } await sleep(250); } + if (!up) throw new Error('server did not boot:\n' + fs.readFileSync(LOG, 'utf8').slice(-2000)); +}); +after(() => { try { proc.kill('SIGKILL'); } catch { /* */ } }); + +function provision() { + const code = String(crypto.randomInt(100000, 1000000)); + return new Promise((resolve) => { + const sock = ioClient(`${BASE}/device`, { transports: ['websocket'], reconnection: false, forceNew: true }); + sock.on('connect', () => sock.emit('device:register', { pairing_code: code })); + sock.on('device:registered', (d) => { try { sock.close(); } catch { /* */ 
/**
 * Open a device socket and register it with previously provisioned credentials.
 *
 * @param {{id: string, token: string}} dev  Credentials returned by provision().
 * @returns {Promise<object>} Resolves with the connected socket once the server
 *   emits `device:registered`. Rejects with Error('auth-error') when the server
 *   emits `device:auth-error`, or Error('register timeout') after 4 s.
 */
function openRegistered(dev) {
  return new Promise((resolve, reject) => {
    const sock = ioClient(`${BASE}/device`, { transports: ['websocket'], reconnection: false, forceNew: true });
    let settled = false;
    let timer = null;

    // Settle exactly once and always release the timeout, so a finished
    // registration does not leave a pending 4 s timer behind.
    const succeed = () => {
      if (settled) return;
      settled = true;
      clearTimeout(timer);
      resolve(sock);
    };
    // On failure the caller never receives the socket, so close it here
    // instead of leaking an open connection.
    const fail = (message) => {
      if (settled) return;
      settled = true;
      clearTimeout(timer);
      try { sock.close(); } catch { /* best-effort cleanup */ }
      reject(new Error(message));
    };

    timer = setTimeout(() => fail('register timeout'), 4000);
    sock.on('connect', () => sock.emit('device:register', {
      device_id: dev.id,
      device_token: dev.token,
      device_info: { app_version: 'test' },
    }));
    sock.on('device:registered', succeed);
    sock.on('device:auth-error', () => fail('auth-error'));
  });
}
+ for (let i = 0; i < 15; i++) { sock.emit('device:content-ack', { device_id: dev.id, content_id: ids[i % 4], status: 'ready' }); await sleep(100); } + await sleep(400); + try { sock.close(); } catch { /* */ } + + const passed = linesFor(dev.id, 'content '); // `Device content : ready` + const shedStart = linesFor(dev.id, 'shedding device'); + assert.equal(passed, 5, `exactly the budget (5) passed/logged, got ${passed}`); + assert.ok(shedStart >= 1, 'a single shed-start line was logged when flood control engaged'); + assert.ok(shedStart === 1, `shed-start logged ONCE per window (no per-drop flood), got ${shedStart}`); +}); + +test('a device under budget has every ack processed', async () => { + const dev = await provision(); + const sock = await openRegistered(dev); + for (const id of ['p', 'q', 'r', 's']) { sock.emit('device:content-ack', { device_id: dev.id, content_id: id, status: 'ready' }); await sleep(60); } + await sleep(300); + try { sock.close(); } catch { /* */ } + assert.equal(linesFor(dev.id, 'content '), 4, 'all 4 under-budget acks processed'); + assert.equal(linesFor(dev.id, 'shedding device'), 0, 'no shedding for an under-budget device'); +}); diff --git a/server/test/content-ack-limiter.test.js b/server/test/content-ack-limiter.test.js new file mode 100644 index 0000000..3e35293 --- /dev/null +++ b/server/test/content-ack-limiter.test.js @@ -0,0 +1,91 @@ +'use strict'; + +// #143 — deterministic unit tests for the folded content-ack limiter (dedup + +// per-device rate budget + global critical-lag valve). Injected `now`/`band`, no +// sockets. DATA_DIR set before require so config's jwt-secret write goes to a temp +// dir (not the repo). Rate params pinned via env for clarity. 
/**
 * Count limiter verdicts by action.
 *
 * @param {Iterable<{action: string, logStart?: boolean}>} results  Verdicts returned by limiter.check().
 * @returns {{pass: number, dedup: number, 'shed-rate': number, 'shed-valve': number, logStarts: number}}
 *   Per-action counts, plus how many verdicts carried a truthy `logStart`.
 */
function tally(results) {
  const counts = {
    pass: 0,
    dedup: 0,
    'shed-rate': 0,
    'shed-valve': 0,
    logStarts: 0,
  };
  for (const { action, logStart } of results) {
    counts[action] += 1;
    if (logStart) {
      counts.logStarts += 1;
    }
  }
  return counts;
}
+ const ids = ['a', 'b', 'c', 'd']; + const out = []; + for (let i = 0; i < 12; i++) out.push(limiter.check('dev', ids[i % 4], 'ready', 'normal', T0 + i * 600)); + const t = tally(out); + assert.equal(t.dedup, 0, 'dedup must NOT mask the cycling flood (proves rate limiter is what caps)'); + assert.equal(t.pass, 5, 'first MAX(5) pass'); + assert.equal(t['shed-rate'], 7, 'the remaining 7 are rate-shed'); + assert.equal(t.logStarts, 1, 'shedding logged exactly once per device per window'); +}); + +test('unique ids (dedup never matches): under budget passes, over budget sheds', () => { + const out = []; + for (let i = 0; i < 8; i++) out.push(limiter.check('dev', 'u' + i, 'ready', 'normal', T0 + i * 10)); + const t = tally(out); + assert.equal(t.pass, 5); + assert.equal(t['shed-rate'], 3); +}); + +test('a device exactly at budget passes every ack', () => { + for (let i = 0; i < 5; i++) { + const v = limiter.check('dev', 'u' + i, 'ready', 'normal', T0 + i * 10); + assert.equal(v.action, 'pass', `ack ${i + 1} (<=budget) passes`); + } +}); + +test('dedup still works AND does not consume rate budget (no regression)', () => { + assert.equal(limiter.check('dev', 'a', 'ready', 'normal', T0).action, 'pass'); // count 1 + assert.equal(limiter.check('dev', 'a', 'ready', 'normal', T0 + 100).action, 'dedup'); // repeat -> dedup + assert.equal(limiter.check('dev', 'a', 'ready', 'normal', T0 + 200).action, 'dedup'); // repeat -> dedup + // budget should still have 4 left (the 2 dedups didn't count): 4 distinct pass, 5th sheds + for (const id of ['b', 'c', 'd', 'e']) assert.equal(limiter.check('dev', id, 'ready', 'normal', T0 + 300).action, 'pass'); + assert.equal(limiter.check('dev', 'f', 'ready', 'normal', T0 + 300).action, 'shed-rate', 'budget consumed only by non-duplicates'); +}); + +test('global valve: CRITICAL sheds an in-budget device; NORMAL leaves it untouched', () => { + assert.equal(limiter.check('A', 'x', 'ready', 'critical', T0).action, 'shed-valve', 'critical sheds even 
under budget'); + assert.equal(limiter.check('B', 'x', 'ready', 'normal', T0).action, 'pass', 'normal-band healthy device is unaffected by the valve'); + // a healthy device under normal band is never valve-touched across many acks + for (let i = 0; i < 5; i++) assert.equal(limiter.check('C', 'u' + i, 'ready', 'normal', T0 + i).action, 'pass'); +}); + +test('over-budget rate shedding takes precedence over the valve', () => { + let v; + for (let i = 0; i < 6; i++) v = limiter.check('dev', 'u' + i, 'ready', 'critical', T0 + i); + assert.equal(v.action, 'shed-rate', 'a flooding device reports rate shedding, not just valve'); +}); + +test('window reset: count + shed-notified reset, so a new window logs shedding again', () => { + for (let i = 0; i < 7; i++) limiter.check('dev', 'u' + i, 'ready', 'normal', T0 + i * 10); // sheds, logs once + // next window (>10s later): fresh budget + for (let i = 0; i < 5; i++) assert.equal(limiter.check('dev', 'w' + i, 'ready', 'normal', T0 + 11000 + i).action, 'pass'); + const v = limiter.check('dev', 'w9', 'ready', 'normal', T0 + 11000 + 100); + assert.equal(v.action, 'shed-rate'); + assert.equal(v.logStart, true, 'shedding is logged once in the NEW window too'); +}); + +test('per-device isolation: one device flooding does not shed another', () => { + for (let i = 0; i < 10; i++) limiter.check('STORM', 'u' + i, 'ready', 'normal', T0 + i); // STORM sheds + assert.equal(limiter.check('NEIGHBOR', 'x', 'ready', 'normal', T0 + 11).action, 'pass'); +}); diff --git a/server/test/content-ack-valve.test.js b/server/test/content-ack-valve.test.js new file mode 100644 index 0000000..31fb8a2 --- /dev/null +++ b/server/test/content-ack-valve.test.js @@ -0,0 +1,88 @@ +'use strict'; + +// #143 Step 2 integration — the global loop-lag pressure valve. Lag thresholds are +// forced to 0 so the band is CRITICAL from the first sample; rate budget is set high +// so we isolate the VALVE (not the per-device rate limit). 
Under critical: content- +// acks are shed (no log/emit), while a reconnect AND an HTTP/dashboard request still +// process, and the valve edge is logged once. Unique PORT 3986. + +const { test, before, after } = require('node:test'); +const assert = require('node:assert/strict'); +const { spawn } = require('node:child_process'); +const path = require('node:path'); +const os = require('node:os'); +const fs = require('node:fs'); +const crypto = require('node:crypto'); +const ioClient = require('socket.io-client'); + +const PORT = 3986; +const BASE = `http://127.0.0.1:${PORT}`; +const DATA_DIR = path.join(os.tmpdir(), 'st-valve-' + crypto.randomBytes(4).toString('hex')); +const LOG = path.join(os.tmpdir(), 'st-valve-' + crypto.randomBytes(4).toString('hex') + '.log'); +let proc; +const sleep = (ms) => new Promise(r => setTimeout(r, ms)); + +before(async () => { + const logFd = fs.openSync(LOG, 'w'); + proc = spawn('node', ['server.js'], { + cwd: path.join(__dirname, '..'), + env: { + ...process.env, DATA_DIR, SELF_HOSTED: 'true', PORT: String(PORT), NODE_ENV: 'test', + LAG_CRITICAL_MS: '1', LAG_ELEVATED_MS: '1', LAG_SAMPLE_INTERVAL_MS: '150', // tiny thresholds (NOT 0 — config uses `|| default`, 0 is falsy) -> band critical immediately + CONTENT_ACK_MAX_PER_WINDOW: '1000', CONTENT_ACK_RATE_WINDOW_MS: '10000', // high, so the VALVE is what sheds + }, + stdio: ['ignore', logFd, logFd], + }); + let up = false; + for (let i = 0; i < 80; i++) { try { const r = await fetch(BASE + '/api/status'); if (r.ok) { up = true; break; } } catch { /* */ } await sleep(250); } + if (!up) throw new Error('server did not boot:\n' + fs.readFileSync(LOG, 'utf8').slice(-2000)); + await sleep(600); // let the valve open (>=1 sample at 150ms in the critical band) +}); +after(() => { try { proc.kill('SIGKILL'); } catch { /* */ } }); + +function provision() { + const code = String(crypto.randomInt(100000, 1000000)); + return new Promise((resolve) => { + const sock = ioClient(`${BASE}/device`, { 
/**
 * Open a device socket and attempt to register it with provisioned credentials.
 * Never rejects: the outcome is reported through the `registered` flag so the
 * caller can assert on it.
 *
 * @param {{id: string, token: string}} dev  Credentials returned by provision().
 * @returns {Promise<{sock: object, registered: boolean}>} `registered` is true when
 *   the server emits `device:registered`; false on `device:auth-error` or when
 *   nothing arrives within 4 s. The socket is always handed back; closing it is
 *   the caller's responsibility.
 */
function openRegistered(dev) {
  return new Promise((resolve) => {
    const sock = ioClient(`${BASE}/device`, { transports: ['websocket'], reconnection: false, forceNew: true });
    let done = false;
    let timer = null;

    // First outcome wins. Clearing the timer here keeps a settled registration
    // from leaving a pending 4 s timeout behind.
    const finish = (registered) => {
      if (done) return;
      done = true;
      clearTimeout(timer);
      resolve({ sock, registered });
    };

    timer = setTimeout(() => finish(false), 4000);
    sock.on('connect', () => sock.emit('device:register', {
      device_id: dev.id,
      device_token: dev.token,
      device_info: { app_version: 'test' },
    }));
    sock.on('device:registered', () => finish(true));
    sock.on('device:auth-error', () => finish(false));
  });
}
const dev2 = await provision(); + const r2 = await openRegistered(dev2); + assert.ok(r2.registered, 'reconnect processed under critical'); + try { r2.sock.close(); } catch { /* */ } + + // 3) HTTP / dashboard path still serves under critical + const res = await fetch(BASE + '/api/status'); + assert.equal(res.status, 200, 'HTTP/dashboard requests always processed'); + const body = await res.json(); + assert.equal(body.loop_lag.band, 'critical', 'sanity: band really is critical'); +}); diff --git a/server/ws/deviceSocket.js b/server/ws/deviceSocket.js index 7788dce..b9348ac 100644 --- a/server/ws/deviceSocket.js +++ b/server/ws/deviceSocket.js @@ -7,6 +7,8 @@ const config = require('../config'); const heartbeat = require('../services/heartbeat'); const commandQueue = require('../lib/command-queue'); const reconnectThrottle = require('../lib/reconnect-throttle'); +const contentAckLimiter = require('../lib/content-ack-limiter'); +const loopLag = require('../services/loop-lag'); // Debounce window for marking a device offline on socket disconnect. Brief // flap (Wi-Fi blip, Engine.IO ping miss, server-side eviction-then-reconnect) @@ -29,11 +31,9 @@ const OFFLINE_DEBOUNCE_MS = 5000; const lastPlayLogAt = new Map(); const PLAY_LOG_MIN_GAP_MS = 2000; -// #142 content-ack dedup. An older app can spam "content : ready" for the same -// item; each was logged + emitted individually (secondary load). Suppress identical -// (device_id, content_id, status) reports within config.contentAckDedupMs. A status -// CHANGE has a different key and passes immediately. In-memory; resets on restart. -const lastContentAck = new Map(); +// #142 dedup + #143 per-device rate budget + global loop-lag valve for content-acks +// all live in one control: lib/content-ack-limiter.js (required above as +// contentAckLimiter). Kept out of this file so there is a single limiter on the path. 
const { getUserPlan, getUserDeviceCount } = require('../middleware/subscription'); // Phase 2.3: deviceRoom() resolves a device_id to its workspace room so // dashboardNs.emit can be scoped instead of broadcast platform-wide. @@ -585,13 +585,18 @@ module.exports = function setupDeviceSocket(io) { if (!requireDeviceAuth()) return; const { device_id, content_id, status } = data; if (device_id !== currentDeviceId) return; - // #142: drop repeats of the same (device, content, status) within the dedup - // window. Only a change (new content/status) or a report after the window - // logs+emits, so a device spamming the same "ready" can't add load. - const ackKey = `${device_id}|${content_id}|${status}`; - const nowAck = Date.now(); - if (nowAck - (lastContentAck.get(ackKey) || 0) < config.contentAckDedupMs) return; - lastContentAck.set(ackKey, nowAck); + // #142 dedup + #143 per-device rate budget + global critical-lag valve, in one + // control. Anything but 'pass' is dropped BEFORE the log+emit (that per-ack work + // is the cost we shed). Drops are SILENT except a single line per device per + // window when rate-shedding STARTS (re-logging per drop would recreate the + // flood). The valve's open/close is logged once at the band edge in loop-lag. + const verdict = contentAckLimiter.check(device_id, content_id, status, loopLag.getBand()); + if (verdict.action !== 'pass') { + if (verdict.action === 'shed-rate' && verdict.logStart) { + console.warn(`[content-ack] shedding device ${device_id}: ${verdict.observed}/${verdict.budget} per ${config.contentAckRateWindowMs}ms — flood control engaged`); + } + return; + } console.log(`Device ${device_id} content ${content_id}: ${status}`); emitToDeviceWorkspace(dashboardNs, device_id, 'dashboard:content-ack', { device_id, content_id, status }); });