fix(#143): content-ack flood control — per-device rate budget + loop-lag valve

#142's content-ack dedup is insufficient: a device cycling 2-4 content IDs makes
every ack look unique so dedup never fires, while aggregate volume from ~30 devices
saturates the event loop (the #142 reconnect throttle kept the server responsive,
which is how this was even observable).

Folded ONE control on the content-ack path (no competing limiters; reconnect-
throttle.js untouched) in lib/content-ack-limiter.js:
- Step 1 — per-device RATE budget: caps TOTAL non-duplicate acks per device per
  window regardless of differing content_id (the case dedup misses). Over budget =
  DROP silently (the per-ack log+emit is the cost); log ONCE per device per window
  when shedding starts. Keeps the #142 dedup (dedup'd repeats don't consume budget).
  Per-device, in-memory, resets on restart (modeled on lastPlayLogAt; does NOT reuse
  reconnect-throttle's ban-semantics bucket).
  Env (TUNING GUESSES, validate vs Bold's fleet): CONTENT_ACK_MAX_PER_WINDOW=20,
  CONTENT_ACK_RATE_WINDOW_MS=10000 (=2/s, above legit ~<=1/s, below the flood).
- Step 2 — global pressure valve: reuses the #142 loop-lag band (+ its hysteresis,
  no second control loop). Under CRITICAL band, shed content-acks even for an
  in-budget device; reconnects + dashboard/HTTP are ALWAYS processed; a healthy
  device in a non-critical band is never touched by the valve. Valve open/close
  logged once at the band edge in services/loop-lag.js (not per shed message).

Tests (unique ports 3985/3986, not the 3982/3983/3984 set):
- unit: the #143 regression (cycling ids evading dedup IS rate-limited), under/over
  budget, dedup still works + doesn't consume budget, valve sheds in-budget under
  critical while normal is untouched, rate precedence, window reset, per-device
  isolation.
- integration: socket flood is capped to budget with a single shed-start log;
  under-budget passes every ack; valve OPEN sheds content-acks while a reconnect +
  /api/status still succeed.
Full suite green serial AND parallel (208 tests).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
ScreenTinker 2026-06-27 22:21:57 -05:00
parent d9fb914b9e
commit dbac699854
7 changed files with 365 additions and 12 deletions

View file

@ -149,4 +149,14 @@ module.exports = {
// (device_id, content_id, status) reports within this window. A status CHANGE
// has a different key and passes immediately. In-memory; resets on restart.
contentAckDedupMs: parseInt(process.env.CONTENT_ACK_DEDUP_MS) || 10000,
// #143 content-ack RATE budget (lib/content-ack-limiter.js), layered on top of the
// dedup above. Caps TOTAL acks per device per window REGARDLESS of differing
// content_id — the flood the dedup misses (a device cycling 2-4 ids makes every
// ack look unique, so dedup never fires, yet aggregate volume blocks the loop).
// TUNING GUESSES — validate against Bold's real fleet. Legit playlist cadence is
// roughly <=1 ack/s/device; the flood is many/s. 20 per 10s (=2/s) sits above
// legit and below the flood. Easy to retune via env.
contentAckMaxPerWindow: parseInt(process.env.CONTENT_ACK_MAX_PER_WINDOW) || 20,
contentAckRateWindowMs: parseInt(process.env.CONTENT_ACK_RATE_WINDOW_MS) || 10000,
};

View file

@ -0,0 +1,64 @@
// #143 — content-ack flood control (the single control on the content-ack path).
//
// Folds three concerns into ONE per-device limiter so there are no competing
// limiters on this path (reconnect-throttle.js is left untouched):
// 1. #142 dedup — drop an exact (content_id, status) repeat within the dedup
// window. Legit repeat suppression; does NOT consume rate budget.
// 2. #143 per-device RATE budget — cap TOTAL non-duplicate acks per device per
// window regardless of differing content_id. This is what dedup misses: a
// device cycling 2-4 ids makes each ack look unique, so dedup never fires,
// but aggregate volume still floods the loop. Over budget -> shed silently.
// 3. #143 global pressure valve — when loop-lag (services/loop-lag.js) reports
// the CRITICAL band, shed non-essential acks even for a device within its own
// budget. Reuses the existing band + hysteresis; never fires below critical.
//
// Per-device, in-memory, resets on restart (like lastPlayLogAt / pair-lockout).
// Fixed window (counter reset per window) — simple and makes "log once per window"
// natural. `band` is injected so this is testable without the loop-lag monitor.
const config = require('../config');
// deviceId -> { winStart, count, shedNotified, dup: Map(content|status -> ts) }
const state = new Map();
// Returns one of:
// { action: 'pass' } -> caller logs + emits
// { action: 'dedup' } -> drop (exact repeat)
// { action: 'shed-rate', logStart, observed, budget } -> drop (over per-device budget)
// { action: 'shed-valve' } -> drop (global critical-lag valve)
function check(deviceId, contentId, status, band = 'normal', now = Date.now()) {
let s = state.get(deviceId);
if (!s) { s = { winStart: now, count: 0, shedNotified: false, dup: new Map() }; state.set(deviceId, s); }
// Roll the fixed rate window.
if (now - s.winStart >= config.contentAckRateWindowMs) {
s.winStart = now;
s.count = 0;
s.shedNotified = false;
// Bound the dedup map: drop entries older than the dedup window.
for (const [k, t] of s.dup) if (now - t >= config.contentAckDedupMs) s.dup.delete(k);
}
// 1) Dedup — exact (content, status) repeat within the dedup window. Does NOT
// consume rate budget (it's a legit repeat we simply suppress).
const key = `${contentId}|${status}`;
if (now - (s.dup.get(key) || 0) < config.contentAckDedupMs) return { action: 'dedup' };
s.dup.set(key, now);
// 2) Per-device rate budget — always applies, counts all non-duplicate acks.
s.count++;
if (s.count > config.contentAckMaxPerWindow) {
const logStart = !s.shedNotified; // log ONCE per device per window when shedding starts
s.shedNotified = true;
return { action: 'shed-rate', logStart, observed: s.count, budget: config.contentAckMaxPerWindow };
}
// 3) Global valve — extra shedding only under critical lag; a within-budget device
// in a non-critical band is never touched here.
if (band === 'critical') return { action: 'shed-valve' };
return { action: 'pass' };
}
function reset() { state.clear(); } // tests
module.exports = { check, reset };

View file

@ -79,6 +79,14 @@ function sample() {
const tag = band !== prev ? ` (was ${prev})` : '';
console.log(`[loop-lag] band=${band}${tag} mean=${snap.mean_ms}ms p99=${snap.p99_ms}ms max=${snap.max_ms}ms`);
}
// #143 global pressure valve — log ONLY the band edge (open/close), not per shed
// message. When critical, deviceSocket sheds non-essential acks (it reads getBand()).
if (band === 'critical' && prev !== 'critical') {
console.warn(`[shed] global valve OPEN — loop-lag critical (p99=${snap.p99_ms}ms); shedding non-essential device messages (content-acks). reconnects + dashboard still processed.`);
} else if (prev === 'critical' && band !== 'critical') {
console.log(`[shed] global valve CLOSED — loop-lag recovered (band=${band}, p99=${snap.p99_ms}ms)`);
}
}
function pruneLag() {

View file

@ -0,0 +1,87 @@
'use strict';
// #143 integration — a device cycling DIFFERENT content ids at high rate (the case
// dedup misses) is rate-limited; an under-budget device passes every ack. Observed
// via the server log: passing acks log `Device <id> content <cid>: ready`; over-
// budget drops are silent except ONE `[content-ack] shedding device <id>` line.
// Normal band (default lag thresholds), unique PORT 3985.
const { test, before, after } = require('node:test');
const assert = require('node:assert/strict');
const { spawn } = require('node:child_process');
const path = require('node:path');
const os = require('node:os');
const fs = require('node:fs');
const crypto = require('node:crypto');
const ioClient = require('socket.io-client');
const PORT = 3985;
const BASE = `http://127.0.0.1:${PORT}`;
const DATA_DIR = path.join(os.tmpdir(), 'st-flood-' + crypto.randomBytes(4).toString('hex'));
const LOG = path.join(os.tmpdir(), 'st-flood-' + crypto.randomBytes(4).toString('hex') + '.log');
let proc;
const sleep = (ms) => new Promise(r => setTimeout(r, ms));
before(async () => {
const logFd = fs.openSync(LOG, 'w');
proc = spawn('node', ['server.js'], {
cwd: path.join(__dirname, '..'),
env: {
...process.env, DATA_DIR, SELF_HOSTED: 'true', PORT: String(PORT), NODE_ENV: 'test',
CONTENT_ACK_MAX_PER_WINDOW: '5', CONTENT_ACK_RATE_WINDOW_MS: '10000', CONTENT_ACK_DEDUP_MS: '50',
},
stdio: ['ignore', logFd, logFd],
});
let up = false;
for (let i = 0; i < 80; i++) { try { const r = await fetch(BASE + '/api/status'); if (r.ok) { up = true; break; } } catch { /* */ } await sleep(250); }
if (!up) throw new Error('server did not boot:\n' + fs.readFileSync(LOG, 'utf8').slice(-2000));
});
after(() => { try { proc.kill('SIGKILL'); } catch { /* */ } });
function provision() {
const code = String(crypto.randomInt(100000, 1000000));
return new Promise((resolve) => {
const sock = ioClient(`${BASE}/device`, { transports: ['websocket'], reconnection: false, forceNew: true });
sock.on('connect', () => sock.emit('device:register', { pairing_code: code }));
sock.on('device:registered', (d) => { try { sock.close(); } catch { /* */ } resolve({ id: d.device_id, token: d.device_token }); });
setTimeout(() => resolve(null), 4000);
});
}
function openRegistered(dev) {
return new Promise((resolve, reject) => {
const sock = ioClient(`${BASE}/device`, { transports: ['websocket'], reconnection: false, forceNew: true });
sock.on('connect', () => sock.emit('device:register', { device_id: dev.id, device_token: dev.token, device_info: { app_version: 'test' } }));
sock.on('device:registered', () => resolve(sock));
sock.on('device:auth-error', () => reject(new Error('auth-error')));
setTimeout(() => reject(new Error('register timeout')), 4000);
});
}
const linesFor = (id, needle) => fs.readFileSync(LOG, 'utf8').split('\n').filter(l => l.includes(id) && l.includes(needle)).length;
test('#143: cycling 4 different content ids at high rate is rate-limited (dedup misses it)', async () => {
const dev = await provision();
assert.ok(dev, 'provisioned');
const sock = await openRegistered(dev);
const ids = ['a', 'b', 'c', 'd'];
// 15 acks, ~100ms apart -> each id repeats every ~400ms (> 50ms dedup) so dedup
// never fires; budget is 5/10s, so only 5 pass and the rest are shed.
for (let i = 0; i < 15; i++) { sock.emit('device:content-ack', { device_id: dev.id, content_id: ids[i % 4], status: 'ready' }); await sleep(100); }
await sleep(400);
try { sock.close(); } catch { /* */ }
const passed = linesFor(dev.id, 'content '); // `Device <id> content <cid>: ready`
const shedStart = linesFor(dev.id, 'shedding device');
assert.equal(passed, 5, `exactly the budget (5) passed/logged, got ${passed}`);
assert.ok(shedStart >= 1, 'a single shed-start line was logged when flood control engaged');
assert.ok(shedStart === 1, `shed-start logged ONCE per window (no per-drop flood), got ${shedStart}`);
});
test('a device under budget has every ack processed', async () => {
const dev = await provision();
const sock = await openRegistered(dev);
for (const id of ['p', 'q', 'r', 's']) { sock.emit('device:content-ack', { device_id: dev.id, content_id: id, status: 'ready' }); await sleep(60); }
await sleep(300);
try { sock.close(); } catch { /* */ }
assert.equal(linesFor(dev.id, 'content '), 4, 'all 4 under-budget acks processed');
assert.equal(linesFor(dev.id, 'shedding device'), 0, 'no shedding for an under-budget device');
});

View file

@ -0,0 +1,91 @@
'use strict';
// #143 — deterministic unit tests for the folded content-ack limiter (dedup +
// per-device rate budget + global critical-lag valve). Injected `now`/`band`, no
// sockets. DATA_DIR set before require so config's jwt-secret write goes to a temp
// dir (not the repo). Rate params pinned via env for clarity.
const os = require('node:os');
const path = require('node:path');
const crypto = require('node:crypto');
process.env.DATA_DIR = path.join(os.tmpdir(), 'st-ack143-' + crypto.randomBytes(4).toString('hex'));
process.env.CONTENT_ACK_MAX_PER_WINDOW = '5';
process.env.CONTENT_ACK_RATE_WINDOW_MS = '10000';
process.env.CONTENT_ACK_DEDUP_MS = '2000';
const { test, beforeEach } = require('node:test');
const assert = require('node:assert/strict');
const limiter = require('../lib/content-ack-limiter');
const T0 = 1_000_000;
beforeEach(() => limiter.reset());
function tally(results) {
const t = { pass: 0, dedup: 0, 'shed-rate': 0, 'shed-valve': 0, logStarts: 0 };
for (const r of results) { t[r.action]++; if (r.logStart) t.logStarts++; }
return t;
}
test('#143 REGRESSION: cycling 2-4 DIFFERENT content ids (evading dedup) IS rate-limited', () => {
// 4 ids, spaced 600ms so each id repeats every 2400ms (> 2000ms dedup) -> dedup
// never fires (the exact case it misses), so the RATE budget must cap them.
const ids = ['a', 'b', 'c', 'd'];
const out = [];
for (let i = 0; i < 12; i++) out.push(limiter.check('dev', ids[i % 4], 'ready', 'normal', T0 + i * 600));
const t = tally(out);
assert.equal(t.dedup, 0, 'dedup must NOT mask the cycling flood (proves rate limiter is what caps)');
assert.equal(t.pass, 5, 'first MAX(5) pass');
assert.equal(t['shed-rate'], 7, 'the remaining 7 are rate-shed');
assert.equal(t.logStarts, 1, 'shedding logged exactly once per device per window');
});
test('unique ids (dedup never matches): under budget passes, over budget sheds', () => {
const out = [];
for (let i = 0; i < 8; i++) out.push(limiter.check('dev', 'u' + i, 'ready', 'normal', T0 + i * 10));
const t = tally(out);
assert.equal(t.pass, 5);
assert.equal(t['shed-rate'], 3);
});
test('a device exactly at budget passes every ack', () => {
for (let i = 0; i < 5; i++) {
const v = limiter.check('dev', 'u' + i, 'ready', 'normal', T0 + i * 10);
assert.equal(v.action, 'pass', `ack ${i + 1} (<=budget) passes`);
}
});
test('dedup still works AND does not consume rate budget (no regression)', () => {
assert.equal(limiter.check('dev', 'a', 'ready', 'normal', T0).action, 'pass'); // count 1
assert.equal(limiter.check('dev', 'a', 'ready', 'normal', T0 + 100).action, 'dedup'); // repeat -> dedup
assert.equal(limiter.check('dev', 'a', 'ready', 'normal', T0 + 200).action, 'dedup'); // repeat -> dedup
// budget should still have 4 left (the 2 dedups didn't count): 4 distinct pass, 5th sheds
for (const id of ['b', 'c', 'd', 'e']) assert.equal(limiter.check('dev', id, 'ready', 'normal', T0 + 300).action, 'pass');
assert.equal(limiter.check('dev', 'f', 'ready', 'normal', T0 + 300).action, 'shed-rate', 'budget consumed only by non-duplicates');
});
test('global valve: CRITICAL sheds an in-budget device; NORMAL leaves it untouched', () => {
assert.equal(limiter.check('A', 'x', 'ready', 'critical', T0).action, 'shed-valve', 'critical sheds even under budget');
assert.equal(limiter.check('B', 'x', 'ready', 'normal', T0).action, 'pass', 'normal-band healthy device is unaffected by the valve');
// a healthy device under normal band is never valve-touched across many acks
for (let i = 0; i < 5; i++) assert.equal(limiter.check('C', 'u' + i, 'ready', 'normal', T0 + i).action, 'pass');
});
test('over-budget rate shedding takes precedence over the valve', () => {
let v;
for (let i = 0; i < 6; i++) v = limiter.check('dev', 'u' + i, 'ready', 'critical', T0 + i);
assert.equal(v.action, 'shed-rate', 'a flooding device reports rate shedding, not just valve');
});
test('window reset: count + shed-notified reset, so a new window logs shedding again', () => {
for (let i = 0; i < 7; i++) limiter.check('dev', 'u' + i, 'ready', 'normal', T0 + i * 10); // sheds, logs once
// next window (>10s later): fresh budget
for (let i = 0; i < 5; i++) assert.equal(limiter.check('dev', 'w' + i, 'ready', 'normal', T0 + 11000 + i).action, 'pass');
const v = limiter.check('dev', 'w9', 'ready', 'normal', T0 + 11000 + 100);
assert.equal(v.action, 'shed-rate');
assert.equal(v.logStart, true, 'shedding is logged once in the NEW window too');
});
test('per-device isolation: one device flooding does not shed another', () => {
for (let i = 0; i < 10; i++) limiter.check('STORM', 'u' + i, 'ready', 'normal', T0 + i); // STORM sheds
assert.equal(limiter.check('NEIGHBOR', 'x', 'ready', 'normal', T0 + 11).action, 'pass');
});

View file

@ -0,0 +1,88 @@
'use strict';
// #143 Step 2 integration — the global loop-lag pressure valve. Lag thresholds are
// forced to 0 so the band is CRITICAL from the first sample; rate budget is set high
// so we isolate the VALVE (not the per-device rate limit). Under critical: content-
// acks are shed (no log/emit), while a reconnect AND an HTTP/dashboard request still
// process, and the valve edge is logged once. Unique PORT 3986.
const { test, before, after } = require('node:test');
const assert = require('node:assert/strict');
const { spawn } = require('node:child_process');
const path = require('node:path');
const os = require('node:os');
const fs = require('node:fs');
const crypto = require('node:crypto');
const ioClient = require('socket.io-client');
const PORT = 3986;
const BASE = `http://127.0.0.1:${PORT}`;
const DATA_DIR = path.join(os.tmpdir(), 'st-valve-' + crypto.randomBytes(4).toString('hex'));
const LOG = path.join(os.tmpdir(), 'st-valve-' + crypto.randomBytes(4).toString('hex') + '.log');
let proc;
const sleep = (ms) => new Promise(r => setTimeout(r, ms));
before(async () => {
const logFd = fs.openSync(LOG, 'w');
proc = spawn('node', ['server.js'], {
cwd: path.join(__dirname, '..'),
env: {
...process.env, DATA_DIR, SELF_HOSTED: 'true', PORT: String(PORT), NODE_ENV: 'test',
LAG_CRITICAL_MS: '1', LAG_ELEVATED_MS: '1', LAG_SAMPLE_INTERVAL_MS: '150', // tiny thresholds (NOT 0 — config uses `|| default`, 0 is falsy) -> band critical immediately
CONTENT_ACK_MAX_PER_WINDOW: '1000', CONTENT_ACK_RATE_WINDOW_MS: '10000', // high, so the VALVE is what sheds
},
stdio: ['ignore', logFd, logFd],
});
let up = false;
for (let i = 0; i < 80; i++) { try { const r = await fetch(BASE + '/api/status'); if (r.ok) { up = true; break; } } catch { /* */ } await sleep(250); }
if (!up) throw new Error('server did not boot:\n' + fs.readFileSync(LOG, 'utf8').slice(-2000));
await sleep(600); // let the valve open (>=1 sample at 150ms in the critical band)
});
after(() => { try { proc.kill('SIGKILL'); } catch { /* */ } });
function provision() {
const code = String(crypto.randomInt(100000, 1000000));
return new Promise((resolve) => {
const sock = ioClient(`${BASE}/device`, { transports: ['websocket'], reconnection: false, forceNew: true });
sock.on('connect', () => sock.emit('device:register', { pairing_code: code }));
sock.on('device:registered', (d) => { try { sock.close(); } catch { /* */ } resolve({ id: d.device_id, token: d.device_token }); });
setTimeout(() => resolve(null), 4000);
});
}
function openRegistered(dev) {
return new Promise((resolve) => {
const sock = ioClient(`${BASE}/device`, { transports: ['websocket'], reconnection: false, forceNew: true });
let done = false; const finish = (v) => { if (!done) { done = true; resolve(v); } };
sock.on('connect', () => sock.emit('device:register', { device_id: dev.id, device_token: dev.token, device_info: { app_version: 'test' } }));
sock.on('device:registered', () => finish({ sock, registered: true }));
sock.on('device:auth-error', () => finish({ sock, registered: false }));
setTimeout(() => finish({ sock, registered: false }), 4000);
});
}
test('valve OPEN under critical: content-acks are shed, reconnect + HTTP still process', async () => {
const log0 = fs.readFileSync(LOG, 'utf8');
assert.ok(/\[shed\] global valve OPEN/.test(log0), 'valve edge logged once on entering critical');
// 1) content-acks under critical -> shed (no log/emit)
const dev = await provision();
const { sock, registered } = await openRegistered(dev);
assert.ok(registered, 'a device can still register/reconnect under critical (reconnects always processed)');
for (let i = 0; i < 6; i++) { sock.emit('device:content-ack', { device_id: dev.id, content_id: 'v' + i, status: 'ready' }); await sleep(40); }
await sleep(300);
try { sock.close(); } catch { /* */ }
const contentLines = fs.readFileSync(LOG, 'utf8').split('\n').filter(l => l.includes(dev.id) && l.includes('content ')).length;
assert.equal(contentLines, 0, 'all content-acks shed by the valve under critical (none logged/emitted)');
// 2) a fresh reconnect still registers (reconnects are never shed)
const dev2 = await provision();
const r2 = await openRegistered(dev2);
assert.ok(r2.registered, 'reconnect processed under critical');
try { r2.sock.close(); } catch { /* */ }
// 3) HTTP / dashboard path still serves under critical
const res = await fetch(BASE + '/api/status');
assert.equal(res.status, 200, 'HTTP/dashboard requests always processed');
const body = await res.json();
assert.equal(body.loop_lag.band, 'critical', 'sanity: band really is critical');
});

View file

@ -7,6 +7,8 @@ const config = require('../config');
const heartbeat = require('../services/heartbeat');
const commandQueue = require('../lib/command-queue');
const reconnectThrottle = require('../lib/reconnect-throttle');
const contentAckLimiter = require('../lib/content-ack-limiter');
const loopLag = require('../services/loop-lag');
// Debounce window for marking a device offline on socket disconnect. Brief
// flap (Wi-Fi blip, Engine.IO ping miss, server-side eviction-then-reconnect)
@ -29,11 +31,9 @@ const OFFLINE_DEBOUNCE_MS = 5000;
const lastPlayLogAt = new Map();
const PLAY_LOG_MIN_GAP_MS = 2000;
// #142 content-ack dedup. An older app can spam "content <id>: ready" for the same
// item; each was logged + emitted individually (secondary load). Suppress identical
// (device_id, content_id, status) reports within config.contentAckDedupMs. A status
// CHANGE has a different key and passes immediately. In-memory; resets on restart.
const lastContentAck = new Map();
// #142 dedup + #143 per-device rate budget + global loop-lag valve for content-acks
// all live in one control: lib/content-ack-limiter.js (required above as
// contentAckLimiter). Kept out of this file so there is a single limiter on the path.
const { getUserPlan, getUserDeviceCount } = require('../middleware/subscription');
// Phase 2.3: deviceRoom() resolves a device_id to its workspace room so
// dashboardNs.emit can be scoped instead of broadcast platform-wide.
@ -585,13 +585,18 @@ module.exports = function setupDeviceSocket(io) {
if (!requireDeviceAuth()) return;
const { device_id, content_id, status } = data;
if (device_id !== currentDeviceId) return;
// #142: drop repeats of the same (device, content, status) within the dedup
// window. Only a change (new content/status) or a report after the window
// logs+emits, so a device spamming the same "ready" can't add load.
const ackKey = `${device_id}|${content_id}|${status}`;
const nowAck = Date.now();
if (nowAck - (lastContentAck.get(ackKey) || 0) < config.contentAckDedupMs) return;
lastContentAck.set(ackKey, nowAck);
// #142 dedup + #143 per-device rate budget + global critical-lag valve, in one
// control. Anything but 'pass' is dropped BEFORE the log+emit (that per-ack work
// is the cost we shed). Drops are SILENT except a single line per device per
// window when rate-shedding STARTS (re-logging per drop would recreate the
// flood). The valve's open/close is logged once at the band edge in loop-lag.
const verdict = contentAckLimiter.check(device_id, content_id, status, loopLag.getBand());
if (verdict.action !== 'pass') {
if (verdict.action === 'shed-rate' && verdict.logStart) {
console.warn(`[content-ack] shedding device ${device_id}: ${verdict.observed}/${verdict.budget} per ${config.contentAckRateWindowMs}ms — flood control engaged`);
}
return;
}
console.log(`Device ${device_id} content ${content_id}: ${status}`);
emitToDeviceWorkspace(dashboardNs, device_id, 'dashboard:content-ack', { device_id, content_id, status });
});