mirror of
https://github.com/screentinker/screentinker.git
synced 2026-06-29 09:23:16 -06:00
#142's content-ack dedup is insufficient: a device cycling 2-4 content IDs makes every ack look unique so dedup never fires, while aggregate volume from ~30 devices saturates the event loop (the #142 reconnect throttle kept the server responsive, which is how this was even observable). Folded ONE control on the content-ack path (no competing limiters; reconnect-throttle.js untouched) in lib/content-ack-limiter.js: - Step 1 — per-device RATE budget: caps TOTAL non-duplicate acks per device per window regardless of differing content_id (the case dedup misses). Over budget = DROP silently (the per-ack log+emit is the cost); log ONCE per device per window when shedding starts. Keeps the #142 dedup (dedup'd repeats don't consume budget). Per-device, in-memory, resets on restart (modeled on lastPlayLogAt; does NOT reuse reconnect-throttle's ban-semantics bucket). Env (TUNING GUESSES, validate vs Bold's fleet): CONTENT_ACK_MAX_PER_WINDOW=20, CONTENT_ACK_RATE_WINDOW_MS=10000 (=2/s, above legit ~<=1/s, below the flood). - Step 2 — global pressure valve: reuses the #142 loop-lag band (+ its hysteresis, no second control loop). Under CRITICAL band, shed content-acks even for an in-budget device; reconnects + dashboard/HTTP are ALWAYS processed; a healthy device in a non-critical band is never touched by the valve. Valve open/close logged once at the band edge in services/loop-lag.js (not per shed message). Tests (unique ports 3985/3986, not the 3982/3983/3984 set): - unit: the #143 regression (cycling ids evading dedup IS rate-limited), under/over budget, dedup still works + doesn't consume budget, valve sheds in-budget under critical while normal is untouched, rate precedence, window reset, per-device isolation. - integration: socket flood is capped to budget with a single shed-start log; under-budget passes every ack; valve OPEN sheds content-acks while a reconnect + /api/status still succeed. Full suite green serial AND parallel (208 tests). 
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
88 lines
4.5 KiB
JavaScript
88 lines
4.5 KiB
JavaScript
'use strict';
|
|
|
|
// #143 integration — a device cycling DIFFERENT content ids at high rate (the case
|
|
// dedup misses) is rate-limited; an under-budget device passes every ack. Observed
|
|
// via the server log: passing acks log `Device <id> content <cid>: ready`; over-
|
|
// budget drops are silent except ONE `[content-ack] shedding device <id>` line.
|
|
// Normal band (default lag thresholds), unique PORT 3985.
|
|
|
|
const { test, before, after } = require('node:test');
|
|
const assert = require('node:assert/strict');
|
|
const { spawn } = require('node:child_process');
|
|
const path = require('node:path');
|
|
const os = require('node:os');
|
|
const fs = require('node:fs');
|
|
const crypto = require('node:crypto');
|
|
const ioClient = require('socket.io-client');
|
|
|
|
// Port used only by this test file.
const PORT = 3985;
// Base URL of the server under test (HTTP status endpoint and socket namespace).
const BASE = `http://127.0.0.1:${PORT}`;
// Per-run scratch locations in the OS temp dir; each gets its own random suffix.
const DATA_DIR = path.join(os.tmpdir(), `st-flood-${crypto.randomBytes(4).toString('hex')}`);
const LOG = path.join(os.tmpdir(), `st-flood-${crypto.randomBytes(4).toString('hex')}.log`);
// Handle of the spawned server process; assigned in the `before` hook.
let proc;
// Promise-based delay helper.
const sleep = (ms) => new Promise((resolve) => {
  setTimeout(resolve, ms);
});
|
|
|
|
// Boots a private server instance for this file. Its stdout/stderr are written to
// LOG so the tests can count log lines, and the content-ack budget is set to
// 5 acks per 10 000 ms window with a 50 ms dedup window.
before(async () => {
  const logFd = fs.openSync(LOG, 'w');
  try {
    proc = spawn('node', ['server.js'], {
      cwd: path.join(__dirname, '..'),
      env: {
        ...process.env, DATA_DIR, SELF_HOSTED: 'true', PORT: String(PORT), NODE_ENV: 'test',
        CONTENT_ACK_MAX_PER_WINDOW: '5', CONTENT_ACK_RATE_WINDOW_MS: '10000', CONTENT_ACK_DEDUP_MS: '50',
      },
      stdio: ['ignore', logFd, logFd],
    });
  } finally {
    // Once spawn() has returned the child has inherited the descriptor, so the
    // parent's copy is released here instead of staying open for the whole run.
    fs.closeSync(logFd);
  }

  // Poll /api/status up to 80 times, 250 ms apart, until the server answers OK.
  let up = false;
  for (let attempt = 0; attempt < 80 && !up; attempt += 1) {
    try {
      const res = await fetch(`${BASE}/api/status`);
      up = res.ok;
    } catch { /* server not listening yet */ }
    if (!up) await sleep(250);
  }

  if (!up) {
    // Do not leave a half-started server behind when the hook fails.
    try { proc.kill('SIGKILL'); } catch { /* already gone */ }
    throw new Error('server did not boot:\n' + fs.readFileSync(LOG, 'utf8').slice(-2000));
  }
});
|
|
// Tears down the spawned server. Any error from kill() (for example when the
// process was never started) is ignored so teardown itself cannot fail.
after(() => {
  try {
    proc.kill('SIGKILL');
  } catch {
    /* best effort */
  }
});
|
|
|
|
/**
 * Registers a brand-new device using a random 6-digit pairing code.
 *
 * Opens a short-lived socket on the `/device` namespace, emits `device:register`
 * and waits for `device:registered`.
 *
 * @returns {Promise<{id: *, token: *}|null>} The `device_id` / `device_token`
 *   reported by the server, or `null` if no `device:registered` arrived within 4 s.
 */
function provision() {
  const code = String(crypto.randomInt(100000, 1000000));
  return new Promise((resolve) => {
    const sock = ioClient(`${BASE}/device`, { transports: ['websocket'], reconnection: false, forceNew: true });
    const closeQuietly = () => { try { sock.close(); } catch { /* */ } };

    // On timeout, close the socket too so a stalled attempt does not leak a connection.
    const timer = setTimeout(() => {
      closeQuietly();
      resolve(null);
    }, 4000);

    sock.on('connect', () => sock.emit('device:register', { pairing_code: code }));
    sock.on('device:registered', (d) => {
      // Cancel the fallback timer so it does not keep the event loop alive for 4 s.
      clearTimeout(timer);
      closeQuietly();
      resolve({ id: d.device_id, token: d.device_token });
    });
  });
}
|
|
/**
 * Opens a socket on the `/device` namespace and re-registers an already
 * provisioned device with its id and token.
 *
 * @param {{id: *, token: *}} dev Credentials as returned by `provision()`.
 * @returns {Promise<object>} Resolves with the still-open socket once
 *   `device:registered` is received; the caller is responsible for closing it.
 * @throws {Error} Rejects with 'auth-error' on `device:auth-error`, or with
 *   'register timeout' after 4 s. In both cases the socket is closed first.
 */
function openRegistered(dev) {
  return new Promise((resolve, reject) => {
    const sock = ioClient(`${BASE}/device`, { transports: ['websocket'], reconnection: false, forceNew: true });
    // Only the first outcome counts; later events must not close a socket that
    // has already been handed to the caller.
    let settled = false;

    function fail(err) {
      if (settled) return;
      settled = true;
      clearTimeout(timer);
      // The caller never receives the socket on failure, so release it here.
      try { sock.close(); } catch { /* */ }
      reject(err);
    }

    const timer = setTimeout(() => fail(new Error('register timeout')), 4000);

    sock.on('connect', () => sock.emit('device:register', { device_id: dev.id, device_token: dev.token, device_info: { app_version: 'test' } }));
    sock.on('device:registered', () => {
      if (settled) return;
      settled = true;
      // Cancel the timeout so it does not keep the event loop alive after success.
      clearTimeout(timer);
      resolve(sock);
    });
    sock.on('device:auth-error', () => fail(new Error('auth-error')));
  });
}
|
|
// Counts the lines of the server log (LOG) that contain both `id` and `needle`.
const linesFor = (id, needle) => {
  const logLines = fs.readFileSync(LOG, 'utf8').split('\n');
  let hits = 0;
  for (const line of logLines) {
    if (line.includes(id) && line.includes(needle)) hits += 1;
  }
  return hits;
};
|
|
|
|
test('#143: cycling 4 different content ids at high rate is rate-limited (dedup misses it)', async () => {
  const device = await provision();
  assert.ok(device, 'provisioned');
  const socket = await openRegistered(device);
  const contentIds = ['a', 'b', 'c', 'd'];

  // Send 15 acks roughly 100ms apart, rotating through the four ids. Any given id
  // therefore recurs about every 400ms, well past the 50ms dedup window, so dedup
  // cannot catch it. With a budget of 5 per 10s only 5 get through; the rest are shed.
  for (let n = 0; n < 15; n += 1) {
    const contentId = contentIds[n % contentIds.length];
    socket.emit('device:content-ack', { device_id: device.id, content_id: contentId, status: 'ready' });
    await sleep(100);
  }
  await sleep(400);
  try {
    socket.close();
  } catch { /* */ }

  // Accepted acks appear in the log as `Device <id> content <cid>: ready`.
  const passed = linesFor(device.id, 'content ');
  const shedStart = linesFor(device.id, 'shedding device');
  assert.equal(passed, 5, `exactly the budget (5) passed/logged, got ${passed}`);
  // Two checks so a failure says whether shedding never started or was logged repeatedly.
  assert.ok(shedStart >= 1, 'a single shed-start line was logged when flood control engaged');
  assert.ok(shedStart === 1, `shed-start logged ONCE per window (no per-drop flood), got ${shedStart}`);
});
|
|
|
|
test('a device under budget has every ack processed', async () => {
  const dev = await provision();
  // provision() resolves null on timeout; fail here with a clear message rather
  // than letting `dev.id` throw inside a socket event handler.
  assert.ok(dev, 'provisioned');
  const sock = await openRegistered(dev);

  // Four acks with distinct ids, 60ms apart: below the budget of 5 per window.
  for (const id of ['p', 'q', 'r', 's']) {
    sock.emit('device:content-ack', { device_id: dev.id, content_id: id, status: 'ready' });
    await sleep(60);
  }
  await sleep(300);
  try { sock.close(); } catch { /* */ }

  assert.equal(linesFor(dev.id, 'content '), 4, 'all 4 under-budget acks processed');
  assert.equal(linesFor(dev.id, 'shedding device'), 0, 'no shedding for an under-budget device');
});
|