mirror of
https://github.com/screentinker/screentinker.git
synced 2026-06-29 09:23:16 -06:00
fix(#143): content-ack flood control — per-device rate budget + loop-lag valve
#142's content-ack dedup is insufficient: a device cycling 2-4 content IDs makes every ack look unique so dedup never fires, while aggregate volume from ~30 devices saturates the event loop (the #142 reconnect throttle kept the server responsive, which is how this was even observable). Folded ONE control on the content-ack path (no competing limiters; reconnect- throttle.js untouched) in lib/content-ack-limiter.js: - Step 1 — per-device RATE budget: caps TOTAL non-duplicate acks per device per window regardless of differing content_id (the case dedup misses). Over budget = DROP silently (the per-ack log+emit is the cost); log ONCE per device per window when shedding starts. Keeps the #142 dedup (dedup'd repeats don't consume budget). Per-device, in-memory, resets on restart (modeled on lastPlayLogAt; does NOT reuse reconnect-throttle's ban-semantics bucket). Env (TUNING GUESSES, validate vs Bold's fleet): CONTENT_ACK_MAX_PER_WINDOW=20, CONTENT_ACK_RATE_WINDOW_MS=10000 (=2/s, above legit ~<=1/s, below the flood). - Step 2 — global pressure valve: reuses the #142 loop-lag band (+ its hysteresis, no second control loop). Under CRITICAL band, shed content-acks even for an in-budget device; reconnects + dashboard/HTTP are ALWAYS processed; a healthy device in a non-critical band is never touched by the valve. Valve open/close logged once at the band edge in services/loop-lag.js (not per shed message). Tests (unique ports 3985/3986, not the 3982/3983/3984 set): - unit: the #143 regression (cycling ids evading dedup IS rate-limited), under/over budget, dedup still works + doesn't consume budget, valve sheds in-budget under critical while normal is untouched, rate precedence, window reset, per-device isolation. - integration: socket flood is capped to budget with a single shed-start log; under-budget passes every ack; valve OPEN sheds content-acks while a reconnect + /api/status still succeed. Full suite green serial AND parallel (208 tests). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
d9fb914b9e
commit
dbac699854
|
|
@ -149,4 +149,14 @@ module.exports = {
|
|||
// (device_id, content_id, status) reports within this window. A status CHANGE
|
||||
// has a different key and passes immediately. In-memory; resets on restart.
|
||||
contentAckDedupMs: parseInt(process.env.CONTENT_ACK_DEDUP_MS) || 10000,
|
||||
|
||||
// #143 content-ack RATE budget (lib/content-ack-limiter.js), layered on top of the
|
||||
// dedup above. Caps TOTAL acks per device per window REGARDLESS of differing
|
||||
// content_id — the flood the dedup misses (a device cycling 2-4 ids makes every
|
||||
// ack look unique, so dedup never fires, yet aggregate volume blocks the loop).
|
||||
// TUNING GUESSES — validate against Bold's real fleet. Legit playlist cadence is
|
||||
// roughly <=1 ack/s/device; the flood is many/s. 20 per 10s (=2/s) sits above
|
||||
// legit and below the flood. Easy to retune via env.
|
||||
contentAckMaxPerWindow: parseInt(process.env.CONTENT_ACK_MAX_PER_WINDOW) || 20,
|
||||
contentAckRateWindowMs: parseInt(process.env.CONTENT_ACK_RATE_WINDOW_MS) || 10000,
|
||||
};
|
||||
|
|
|
|||
64
server/lib/content-ack-limiter.js
Normal file
64
server/lib/content-ack-limiter.js
Normal file
|
|
@ -0,0 +1,64 @@
|
|||
// #143 — content-ack flood control (the single control on the content-ack path).
|
||||
//
|
||||
// Folds three concerns into ONE per-device limiter so there are no competing
|
||||
// limiters on this path (reconnect-throttle.js is left untouched):
|
||||
// 1. #142 dedup — drop an exact (content_id, status) repeat within the dedup
|
||||
// window. Legit repeat suppression; does NOT consume rate budget.
|
||||
// 2. #143 per-device RATE budget — cap TOTAL non-duplicate acks per device per
|
||||
// window regardless of differing content_id. This is what dedup misses: a
|
||||
// device cycling 2-4 ids makes each ack look unique, so dedup never fires,
|
||||
// but aggregate volume still floods the loop. Over budget -> shed silently.
|
||||
// 3. #143 global pressure valve — when loop-lag (services/loop-lag.js) reports
|
||||
// the CRITICAL band, shed non-essential acks even for a device within its own
|
||||
// budget. Reuses the existing band + hysteresis; never fires below critical.
|
||||
//
|
||||
// Per-device, in-memory, resets on restart (like lastPlayLogAt / pair-lockout).
|
||||
// Fixed window (counter reset per window) — simple and makes "log once per window"
|
||||
// natural. `band` is injected so this is testable without the loop-lag monitor.
|
||||
|
||||
const config = require('../config');
|
||||
|
||||
// deviceId -> { winStart, count, shedNotified, dup: Map(content|status -> ts) }
|
||||
const state = new Map();
|
||||
|
||||
// Returns one of:
|
||||
// { action: 'pass' } -> caller logs + emits
|
||||
// { action: 'dedup' } -> drop (exact repeat)
|
||||
// { action: 'shed-rate', logStart, observed, budget } -> drop (over per-device budget)
|
||||
// { action: 'shed-valve' } -> drop (global critical-lag valve)
|
||||
function check(deviceId, contentId, status, band = 'normal', now = Date.now()) {
|
||||
let s = state.get(deviceId);
|
||||
if (!s) { s = { winStart: now, count: 0, shedNotified: false, dup: new Map() }; state.set(deviceId, s); }
|
||||
|
||||
// Roll the fixed rate window.
|
||||
if (now - s.winStart >= config.contentAckRateWindowMs) {
|
||||
s.winStart = now;
|
||||
s.count = 0;
|
||||
s.shedNotified = false;
|
||||
// Bound the dedup map: drop entries older than the dedup window.
|
||||
for (const [k, t] of s.dup) if (now - t >= config.contentAckDedupMs) s.dup.delete(k);
|
||||
}
|
||||
|
||||
// 1) Dedup — exact (content, status) repeat within the dedup window. Does NOT
|
||||
// consume rate budget (it's a legit repeat we simply suppress).
|
||||
const key = `${contentId}|${status}`;
|
||||
if (now - (s.dup.get(key) || 0) < config.contentAckDedupMs) return { action: 'dedup' };
|
||||
s.dup.set(key, now);
|
||||
|
||||
// 2) Per-device rate budget — always applies, counts all non-duplicate acks.
|
||||
s.count++;
|
||||
if (s.count > config.contentAckMaxPerWindow) {
|
||||
const logStart = !s.shedNotified; // log ONCE per device per window when shedding starts
|
||||
s.shedNotified = true;
|
||||
return { action: 'shed-rate', logStart, observed: s.count, budget: config.contentAckMaxPerWindow };
|
||||
}
|
||||
|
||||
// 3) Global valve — extra shedding only under critical lag; a within-budget device
|
||||
// in a non-critical band is never touched here.
|
||||
if (band === 'critical') return { action: 'shed-valve' };
|
||||
|
||||
return { action: 'pass' };
|
||||
}
|
||||
|
||||
function reset() { state.clear(); } // tests
|
||||
module.exports = { check, reset };
|
||||
|
|
@ -79,6 +79,14 @@ function sample() {
|
|||
const tag = band !== prev ? ` (was ${prev})` : '';
|
||||
console.log(`[loop-lag] band=${band}${tag} mean=${snap.mean_ms}ms p99=${snap.p99_ms}ms max=${snap.max_ms}ms`);
|
||||
}
|
||||
|
||||
// #143 global pressure valve — log ONLY the band edge (open/close), not per shed
|
||||
// message. When critical, deviceSocket sheds non-essential acks (it reads getBand()).
|
||||
if (band === 'critical' && prev !== 'critical') {
|
||||
console.warn(`[shed] global valve OPEN — loop-lag critical (p99=${snap.p99_ms}ms); shedding non-essential device messages (content-acks). reconnects + dashboard still processed.`);
|
||||
} else if (prev === 'critical' && band !== 'critical') {
|
||||
console.log(`[shed] global valve CLOSED — loop-lag recovered (band=${band}, p99=${snap.p99_ms}ms)`);
|
||||
}
|
||||
}
|
||||
|
||||
function pruneLag() {
|
||||
|
|
|
|||
87
server/test/content-ack-flood.test.js
Normal file
87
server/test/content-ack-flood.test.js
Normal file
|
|
@ -0,0 +1,87 @@
|
|||
'use strict';
|
||||
|
||||
// #143 integration — a device cycling DIFFERENT content ids at high rate (the case
|
||||
// dedup misses) is rate-limited; an under-budget device passes every ack. Observed
|
||||
// via the server log: passing acks log `Device <id> content <cid>: ready`; over-
|
||||
// budget drops are silent except ONE `[content-ack] shedding device <id>` line.
|
||||
// Normal band (default lag thresholds), unique PORT 3985.
|
||||
|
||||
const { test, before, after } = require('node:test');
|
||||
const assert = require('node:assert/strict');
|
||||
const { spawn } = require('node:child_process');
|
||||
const path = require('node:path');
|
||||
const os = require('node:os');
|
||||
const fs = require('node:fs');
|
||||
const crypto = require('node:crypto');
|
||||
const ioClient = require('socket.io-client');
|
||||
|
||||
const PORT = 3985;
|
||||
const BASE = `http://127.0.0.1:${PORT}`;
|
||||
const DATA_DIR = path.join(os.tmpdir(), 'st-flood-' + crypto.randomBytes(4).toString('hex'));
|
||||
const LOG = path.join(os.tmpdir(), 'st-flood-' + crypto.randomBytes(4).toString('hex') + '.log');
|
||||
let proc;
|
||||
const sleep = (ms) => new Promise(r => setTimeout(r, ms));
|
||||
|
||||
before(async () => {
|
||||
const logFd = fs.openSync(LOG, 'w');
|
||||
proc = spawn('node', ['server.js'], {
|
||||
cwd: path.join(__dirname, '..'),
|
||||
env: {
|
||||
...process.env, DATA_DIR, SELF_HOSTED: 'true', PORT: String(PORT), NODE_ENV: 'test',
|
||||
CONTENT_ACK_MAX_PER_WINDOW: '5', CONTENT_ACK_RATE_WINDOW_MS: '10000', CONTENT_ACK_DEDUP_MS: '50',
|
||||
},
|
||||
stdio: ['ignore', logFd, logFd],
|
||||
});
|
||||
let up = false;
|
||||
for (let i = 0; i < 80; i++) { try { const r = await fetch(BASE + '/api/status'); if (r.ok) { up = true; break; } } catch { /* */ } await sleep(250); }
|
||||
if (!up) throw new Error('server did not boot:\n' + fs.readFileSync(LOG, 'utf8').slice(-2000));
|
||||
});
|
||||
after(() => { try { proc.kill('SIGKILL'); } catch { /* */ } });
|
||||
|
||||
function provision() {
|
||||
const code = String(crypto.randomInt(100000, 1000000));
|
||||
return new Promise((resolve) => {
|
||||
const sock = ioClient(`${BASE}/device`, { transports: ['websocket'], reconnection: false, forceNew: true });
|
||||
sock.on('connect', () => sock.emit('device:register', { pairing_code: code }));
|
||||
sock.on('device:registered', (d) => { try { sock.close(); } catch { /* */ } resolve({ id: d.device_id, token: d.device_token }); });
|
||||
setTimeout(() => resolve(null), 4000);
|
||||
});
|
||||
}
|
||||
function openRegistered(dev) {
|
||||
return new Promise((resolve, reject) => {
|
||||
const sock = ioClient(`${BASE}/device`, { transports: ['websocket'], reconnection: false, forceNew: true });
|
||||
sock.on('connect', () => sock.emit('device:register', { device_id: dev.id, device_token: dev.token, device_info: { app_version: 'test' } }));
|
||||
sock.on('device:registered', () => resolve(sock));
|
||||
sock.on('device:auth-error', () => reject(new Error('auth-error')));
|
||||
setTimeout(() => reject(new Error('register timeout')), 4000);
|
||||
});
|
||||
}
|
||||
const linesFor = (id, needle) => fs.readFileSync(LOG, 'utf8').split('\n').filter(l => l.includes(id) && l.includes(needle)).length;
|
||||
|
||||
test('#143: cycling 4 different content ids at high rate is rate-limited (dedup misses it)', async () => {
|
||||
const dev = await provision();
|
||||
assert.ok(dev, 'provisioned');
|
||||
const sock = await openRegistered(dev);
|
||||
const ids = ['a', 'b', 'c', 'd'];
|
||||
// 15 acks, ~100ms apart -> each id repeats every ~400ms (> 50ms dedup) so dedup
|
||||
// never fires; budget is 5/10s, so only 5 pass and the rest are shed.
|
||||
for (let i = 0; i < 15; i++) { sock.emit('device:content-ack', { device_id: dev.id, content_id: ids[i % 4], status: 'ready' }); await sleep(100); }
|
||||
await sleep(400);
|
||||
try { sock.close(); } catch { /* */ }
|
||||
|
||||
const passed = linesFor(dev.id, 'content '); // `Device <id> content <cid>: ready`
|
||||
const shedStart = linesFor(dev.id, 'shedding device');
|
||||
assert.equal(passed, 5, `exactly the budget (5) passed/logged, got ${passed}`);
|
||||
assert.ok(shedStart >= 1, 'a single shed-start line was logged when flood control engaged');
|
||||
assert.ok(shedStart === 1, `shed-start logged ONCE per window (no per-drop flood), got ${shedStart}`);
|
||||
});
|
||||
|
||||
test('a device under budget has every ack processed', async () => {
|
||||
const dev = await provision();
|
||||
const sock = await openRegistered(dev);
|
||||
for (const id of ['p', 'q', 'r', 's']) { sock.emit('device:content-ack', { device_id: dev.id, content_id: id, status: 'ready' }); await sleep(60); }
|
||||
await sleep(300);
|
||||
try { sock.close(); } catch { /* */ }
|
||||
assert.equal(linesFor(dev.id, 'content '), 4, 'all 4 under-budget acks processed');
|
||||
assert.equal(linesFor(dev.id, 'shedding device'), 0, 'no shedding for an under-budget device');
|
||||
});
|
||||
91
server/test/content-ack-limiter.test.js
Normal file
91
server/test/content-ack-limiter.test.js
Normal file
|
|
@ -0,0 +1,91 @@
|
|||
'use strict';
|
||||
|
||||
// #143 — deterministic unit tests for the folded content-ack limiter (dedup +
|
||||
// per-device rate budget + global critical-lag valve). Injected `now`/`band`, no
|
||||
// sockets. DATA_DIR set before require so config's jwt-secret write goes to a temp
|
||||
// dir (not the repo). Rate params pinned via env for clarity.
|
||||
|
||||
const os = require('node:os');
|
||||
const path = require('node:path');
|
||||
const crypto = require('node:crypto');
|
||||
process.env.DATA_DIR = path.join(os.tmpdir(), 'st-ack143-' + crypto.randomBytes(4).toString('hex'));
|
||||
process.env.CONTENT_ACK_MAX_PER_WINDOW = '5';
|
||||
process.env.CONTENT_ACK_RATE_WINDOW_MS = '10000';
|
||||
process.env.CONTENT_ACK_DEDUP_MS = '2000';
|
||||
|
||||
const { test, beforeEach } = require('node:test');
|
||||
const assert = require('node:assert/strict');
|
||||
const limiter = require('../lib/content-ack-limiter');
|
||||
|
||||
const T0 = 1_000_000;
|
||||
beforeEach(() => limiter.reset());
|
||||
|
||||
function tally(results) {
|
||||
const t = { pass: 0, dedup: 0, 'shed-rate': 0, 'shed-valve': 0, logStarts: 0 };
|
||||
for (const r of results) { t[r.action]++; if (r.logStart) t.logStarts++; }
|
||||
return t;
|
||||
}
|
||||
|
||||
test('#143 REGRESSION: cycling 2-4 DIFFERENT content ids (evading dedup) IS rate-limited', () => {
|
||||
// 4 ids, spaced 600ms so each id repeats every 2400ms (> 2000ms dedup) -> dedup
|
||||
// never fires (the exact case it misses), so the RATE budget must cap them.
|
||||
const ids = ['a', 'b', 'c', 'd'];
|
||||
const out = [];
|
||||
for (let i = 0; i < 12; i++) out.push(limiter.check('dev', ids[i % 4], 'ready', 'normal', T0 + i * 600));
|
||||
const t = tally(out);
|
||||
assert.equal(t.dedup, 0, 'dedup must NOT mask the cycling flood (proves rate limiter is what caps)');
|
||||
assert.equal(t.pass, 5, 'first MAX(5) pass');
|
||||
assert.equal(t['shed-rate'], 7, 'the remaining 7 are rate-shed');
|
||||
assert.equal(t.logStarts, 1, 'shedding logged exactly once per device per window');
|
||||
});
|
||||
|
||||
test('unique ids (dedup never matches): under budget passes, over budget sheds', () => {
|
||||
const out = [];
|
||||
for (let i = 0; i < 8; i++) out.push(limiter.check('dev', 'u' + i, 'ready', 'normal', T0 + i * 10));
|
||||
const t = tally(out);
|
||||
assert.equal(t.pass, 5);
|
||||
assert.equal(t['shed-rate'], 3);
|
||||
});
|
||||
|
||||
test('a device exactly at budget passes every ack', () => {
|
||||
for (let i = 0; i < 5; i++) {
|
||||
const v = limiter.check('dev', 'u' + i, 'ready', 'normal', T0 + i * 10);
|
||||
assert.equal(v.action, 'pass', `ack ${i + 1} (<=budget) passes`);
|
||||
}
|
||||
});
|
||||
|
||||
test('dedup still works AND does not consume rate budget (no regression)', () => {
|
||||
assert.equal(limiter.check('dev', 'a', 'ready', 'normal', T0).action, 'pass'); // count 1
|
||||
assert.equal(limiter.check('dev', 'a', 'ready', 'normal', T0 + 100).action, 'dedup'); // repeat -> dedup
|
||||
assert.equal(limiter.check('dev', 'a', 'ready', 'normal', T0 + 200).action, 'dedup'); // repeat -> dedup
|
||||
// budget should still have 4 left (the 2 dedups didn't count): 4 distinct pass, 5th sheds
|
||||
for (const id of ['b', 'c', 'd', 'e']) assert.equal(limiter.check('dev', id, 'ready', 'normal', T0 + 300).action, 'pass');
|
||||
assert.equal(limiter.check('dev', 'f', 'ready', 'normal', T0 + 300).action, 'shed-rate', 'budget consumed only by non-duplicates');
|
||||
});
|
||||
|
||||
test('global valve: CRITICAL sheds an in-budget device; NORMAL leaves it untouched', () => {
|
||||
assert.equal(limiter.check('A', 'x', 'ready', 'critical', T0).action, 'shed-valve', 'critical sheds even under budget');
|
||||
assert.equal(limiter.check('B', 'x', 'ready', 'normal', T0).action, 'pass', 'normal-band healthy device is unaffected by the valve');
|
||||
// a healthy device under normal band is never valve-touched across many acks
|
||||
for (let i = 0; i < 5; i++) assert.equal(limiter.check('C', 'u' + i, 'ready', 'normal', T0 + i).action, 'pass');
|
||||
});
|
||||
|
||||
test('over-budget rate shedding takes precedence over the valve', () => {
|
||||
let v;
|
||||
for (let i = 0; i < 6; i++) v = limiter.check('dev', 'u' + i, 'ready', 'critical', T0 + i);
|
||||
assert.equal(v.action, 'shed-rate', 'a flooding device reports rate shedding, not just valve');
|
||||
});
|
||||
|
||||
test('window reset: count + shed-notified reset, so a new window logs shedding again', () => {
|
||||
for (let i = 0; i < 7; i++) limiter.check('dev', 'u' + i, 'ready', 'normal', T0 + i * 10); // sheds, logs once
|
||||
// next window (>10s later): fresh budget
|
||||
for (let i = 0; i < 5; i++) assert.equal(limiter.check('dev', 'w' + i, 'ready', 'normal', T0 + 11000 + i).action, 'pass');
|
||||
const v = limiter.check('dev', 'w9', 'ready', 'normal', T0 + 11000 + 100);
|
||||
assert.equal(v.action, 'shed-rate');
|
||||
assert.equal(v.logStart, true, 'shedding is logged once in the NEW window too');
|
||||
});
|
||||
|
||||
test('per-device isolation: one device flooding does not shed another', () => {
|
||||
for (let i = 0; i < 10; i++) limiter.check('STORM', 'u' + i, 'ready', 'normal', T0 + i); // STORM sheds
|
||||
assert.equal(limiter.check('NEIGHBOR', 'x', 'ready', 'normal', T0 + 11).action, 'pass');
|
||||
});
|
||||
88
server/test/content-ack-valve.test.js
Normal file
88
server/test/content-ack-valve.test.js
Normal file
|
|
@ -0,0 +1,88 @@
|
|||
'use strict';
|
||||
|
||||
// #143 Step 2 integration — the global loop-lag pressure valve. Lag thresholds are
|
||||
// forced to 0 so the band is CRITICAL from the first sample; rate budget is set high
|
||||
// so we isolate the VALVE (not the per-device rate limit). Under critical: content-
|
||||
// acks are shed (no log/emit), while a reconnect AND an HTTP/dashboard request still
|
||||
// process, and the valve edge is logged once. Unique PORT 3986.
|
||||
|
||||
const { test, before, after } = require('node:test');
|
||||
const assert = require('node:assert/strict');
|
||||
const { spawn } = require('node:child_process');
|
||||
const path = require('node:path');
|
||||
const os = require('node:os');
|
||||
const fs = require('node:fs');
|
||||
const crypto = require('node:crypto');
|
||||
const ioClient = require('socket.io-client');
|
||||
|
||||
const PORT = 3986;
|
||||
const BASE = `http://127.0.0.1:${PORT}`;
|
||||
const DATA_DIR = path.join(os.tmpdir(), 'st-valve-' + crypto.randomBytes(4).toString('hex'));
|
||||
const LOG = path.join(os.tmpdir(), 'st-valve-' + crypto.randomBytes(4).toString('hex') + '.log');
|
||||
let proc;
|
||||
const sleep = (ms) => new Promise(r => setTimeout(r, ms));
|
||||
|
||||
before(async () => {
|
||||
const logFd = fs.openSync(LOG, 'w');
|
||||
proc = spawn('node', ['server.js'], {
|
||||
cwd: path.join(__dirname, '..'),
|
||||
env: {
|
||||
...process.env, DATA_DIR, SELF_HOSTED: 'true', PORT: String(PORT), NODE_ENV: 'test',
|
||||
LAG_CRITICAL_MS: '1', LAG_ELEVATED_MS: '1', LAG_SAMPLE_INTERVAL_MS: '150', // tiny thresholds (NOT 0 — config uses `|| default`, 0 is falsy) -> band critical immediately
|
||||
CONTENT_ACK_MAX_PER_WINDOW: '1000', CONTENT_ACK_RATE_WINDOW_MS: '10000', // high, so the VALVE is what sheds
|
||||
},
|
||||
stdio: ['ignore', logFd, logFd],
|
||||
});
|
||||
let up = false;
|
||||
for (let i = 0; i < 80; i++) { try { const r = await fetch(BASE + '/api/status'); if (r.ok) { up = true; break; } } catch { /* */ } await sleep(250); }
|
||||
if (!up) throw new Error('server did not boot:\n' + fs.readFileSync(LOG, 'utf8').slice(-2000));
|
||||
await sleep(600); // let the valve open (>=1 sample at 150ms in the critical band)
|
||||
});
|
||||
after(() => { try { proc.kill('SIGKILL'); } catch { /* */ } });
|
||||
|
||||
function provision() {
|
||||
const code = String(crypto.randomInt(100000, 1000000));
|
||||
return new Promise((resolve) => {
|
||||
const sock = ioClient(`${BASE}/device`, { transports: ['websocket'], reconnection: false, forceNew: true });
|
||||
sock.on('connect', () => sock.emit('device:register', { pairing_code: code }));
|
||||
sock.on('device:registered', (d) => { try { sock.close(); } catch { /* */ } resolve({ id: d.device_id, token: d.device_token }); });
|
||||
setTimeout(() => resolve(null), 4000);
|
||||
});
|
||||
}
|
||||
function openRegistered(dev) {
|
||||
return new Promise((resolve) => {
|
||||
const sock = ioClient(`${BASE}/device`, { transports: ['websocket'], reconnection: false, forceNew: true });
|
||||
let done = false; const finish = (v) => { if (!done) { done = true; resolve(v); } };
|
||||
sock.on('connect', () => sock.emit('device:register', { device_id: dev.id, device_token: dev.token, device_info: { app_version: 'test' } }));
|
||||
sock.on('device:registered', () => finish({ sock, registered: true }));
|
||||
sock.on('device:auth-error', () => finish({ sock, registered: false }));
|
||||
setTimeout(() => finish({ sock, registered: false }), 4000);
|
||||
});
|
||||
}
|
||||
|
||||
test('valve OPEN under critical: content-acks are shed, reconnect + HTTP still process', async () => {
|
||||
const log0 = fs.readFileSync(LOG, 'utf8');
|
||||
assert.ok(/\[shed\] global valve OPEN/.test(log0), 'valve edge logged once on entering critical');
|
||||
|
||||
// 1) content-acks under critical -> shed (no log/emit)
|
||||
const dev = await provision();
|
||||
const { sock, registered } = await openRegistered(dev);
|
||||
assert.ok(registered, 'a device can still register/reconnect under critical (reconnects always processed)');
|
||||
for (let i = 0; i < 6; i++) { sock.emit('device:content-ack', { device_id: dev.id, content_id: 'v' + i, status: 'ready' }); await sleep(40); }
|
||||
await sleep(300);
|
||||
try { sock.close(); } catch { /* */ }
|
||||
const contentLines = fs.readFileSync(LOG, 'utf8').split('\n').filter(l => l.includes(dev.id) && l.includes('content ')).length;
|
||||
assert.equal(contentLines, 0, 'all content-acks shed by the valve under critical (none logged/emitted)');
|
||||
|
||||
// 2) a fresh reconnect still registers (reconnects are never shed)
|
||||
const dev2 = await provision();
|
||||
const r2 = await openRegistered(dev2);
|
||||
assert.ok(r2.registered, 'reconnect processed under critical');
|
||||
try { r2.sock.close(); } catch { /* */ }
|
||||
|
||||
// 3) HTTP / dashboard path still serves under critical
|
||||
const res = await fetch(BASE + '/api/status');
|
||||
assert.equal(res.status, 200, 'HTTP/dashboard requests always processed');
|
||||
const body = await res.json();
|
||||
assert.equal(body.loop_lag.band, 'critical', 'sanity: band really is critical');
|
||||
});
|
||||
|
|
@ -7,6 +7,8 @@ const config = require('../config');
|
|||
const heartbeat = require('../services/heartbeat');
|
||||
const commandQueue = require('../lib/command-queue');
|
||||
const reconnectThrottle = require('../lib/reconnect-throttle');
|
||||
const contentAckLimiter = require('../lib/content-ack-limiter');
|
||||
const loopLag = require('../services/loop-lag');
|
||||
|
||||
// Debounce window for marking a device offline on socket disconnect. Brief
|
||||
// flap (Wi-Fi blip, Engine.IO ping miss, server-side eviction-then-reconnect)
|
||||
|
|
@ -29,11 +31,9 @@ const OFFLINE_DEBOUNCE_MS = 5000;
|
|||
const lastPlayLogAt = new Map();
|
||||
const PLAY_LOG_MIN_GAP_MS = 2000;
|
||||
|
||||
// #142 content-ack dedup. An older app can spam "content <id>: ready" for the same
|
||||
// item; each was logged + emitted individually (secondary load). Suppress identical
|
||||
// (device_id, content_id, status) reports within config.contentAckDedupMs. A status
|
||||
// CHANGE has a different key and passes immediately. In-memory; resets on restart.
|
||||
const lastContentAck = new Map();
|
||||
// #142 dedup + #143 per-device rate budget + global loop-lag valve for content-acks
|
||||
// all live in one control: lib/content-ack-limiter.js (required above as
|
||||
// contentAckLimiter). Kept out of this file so there is a single limiter on the path.
|
||||
const { getUserPlan, getUserDeviceCount } = require('../middleware/subscription');
|
||||
// Phase 2.3: deviceRoom() resolves a device_id to its workspace room so
|
||||
// dashboardNs.emit can be scoped instead of broadcast platform-wide.
|
||||
|
|
@ -585,13 +585,18 @@ module.exports = function setupDeviceSocket(io) {
|
|||
if (!requireDeviceAuth()) return;
|
||||
const { device_id, content_id, status } = data;
|
||||
if (device_id !== currentDeviceId) return;
|
||||
// #142: drop repeats of the same (device, content, status) within the dedup
|
||||
// window. Only a change (new content/status) or a report after the window
|
||||
// logs+emits, so a device spamming the same "ready" can't add load.
|
||||
const ackKey = `${device_id}|${content_id}|${status}`;
|
||||
const nowAck = Date.now();
|
||||
if (nowAck - (lastContentAck.get(ackKey) || 0) < config.contentAckDedupMs) return;
|
||||
lastContentAck.set(ackKey, nowAck);
|
||||
// #142 dedup + #143 per-device rate budget + global critical-lag valve, in one
|
||||
// control. Anything but 'pass' is dropped BEFORE the log+emit (that per-ack work
|
||||
// is the cost we shed). Drops are SILENT except a single line per device per
|
||||
// window when rate-shedding STARTS (re-logging per drop would recreate the
|
||||
// flood). The valve's open/close is logged once at the band edge in loop-lag.
|
||||
const verdict = contentAckLimiter.check(device_id, content_id, status, loopLag.getBand());
|
||||
if (verdict.action !== 'pass') {
|
||||
if (verdict.action === 'shed-rate' && verdict.logStart) {
|
||||
console.warn(`[content-ack] shedding device ${device_id}: ${verdict.observed}/${verdict.budget} per ${config.contentAckRateWindowMs}ms — flood control engaged`);
|
||||
}
|
||||
return;
|
||||
}
|
||||
console.log(`Device ${device_id} content ${content_id}: ${status}`);
|
||||
emitToDeviceWorkspace(dashboardNs, device_id, 'dashboard:content-ack', { device_id, content_id, status });
|
||||
});
|
||||
|
|
|
|||
Loading…
Reference in a new issue