diff --git a/server/config.js b/server/config.js index 83b895f..d0cf823 100644 --- a/server/config.js +++ b/server/config.js @@ -90,4 +90,63 @@ module.exports = { // on MSP-style deployments where an admin/operator assigns users to existing // orgs after signup instead. autoCreateOrgOnSignup: !['false', '0'].includes(String(process.env.AUTO_CREATE_ORG_ON_SIGNUP || '').toLowerCase()), + + // #142 event-loop lag telemetry (services/loop-lag.js). perf_hooks + // monitorEventLoopDelay is C++-backed, so continuous sampling is cheap. Each + // window's p99 is persisted to event_loop_lag (bounded: indexed + pruned from + // day one) and drives the banded load level the reconnect throttle reads. + lagSampleIntervalMs: parseInt(process.env.LAG_SAMPLE_INTERVAL_MS) || 1000, + lagResolutionMs: parseInt(process.env.LAG_RESOLUTION_MS) || 20, + lagTelemetryRetentionDays: parseFloat(process.env.LAG_TELEMETRY_RETENTION_DAYS) || 3, + lagPruneIntervalMs: parseInt(process.env.LAG_PRUNE_INTERVAL_MS) || 3600000, + // Banded load levels from the window p99 (ms). Asymmetric by design: a band is + // entered immediately when its up-threshold is crossed (tighten fast), but + // released only one step at a time after lagReleaseSamples consecutive samples + // fall below a deadband (release slow), so small fluctuations don't flap it. + // Bands ONLY scale how hard an already-flagged device is throttled; a healthy + // device is never gated by global lag. + lagElevatedMs: parseInt(process.env.LAG_ELEVATED_MS) || 100, + lagCriticalMs: parseInt(process.env.LAG_CRITICAL_MS) || 250, + lagReleaseSamples: parseInt(process.env.LAG_RELEASE_SAMPLES) || 5, + + // #142 load-aware per-device reconnect throttle (lib/reconnect-throttle.js). + // The verdict of WHO is misbehaving is ALWAYS per-device (keyed on device_id): + // a device is flagged only when it exceeds reconnectBaseMax genuine reconnects + // per reconnectWindowMs. Global lag never flags a healthy device — the lag band + // only MULTIPLIES how hard an already-flagged device is backed off. + reconnectWindowMs: parseInt(process.env.RECONNECT_WINDOW_MS) || 10000, + reconnectBaseMax: parseInt(process.env.RECONNECT_BASE_MAX) || 5, + // Absolute per-device ceiling, independent of band AND of warm-up: no device may + // exceed this many reconnects/window no matter what the adaptive logic computes, + // so a slow-ramp attacker can't train its way through. + reconnectHardCeiling: parseInt(process.env.RECONNECT_HARD_CEILING) || 20, + // Server-enforced backoff for a flagged device: baseBackoff * 2^(level-1) * band + // multiplier, capped at maxBackoff. Level escalates while it keeps storming + // (tighten fast) and decays one step per reconnectReleaseMs of calm (release slow). + reconnectBaseBackoffMs: parseInt(process.env.RECONNECT_BASE_BACKOFF_MS) || 1000, + reconnectMaxBackoffMs: parseInt(process.env.RECONNECT_MAX_BACKOFF_MS) || 60000, + reconnectMaxLevel: parseInt(process.env.RECONNECT_MAX_LEVEL) || 10, + reconnectReleaseMs: parseInt(process.env.RECONNECT_RELEASE_MS) || 30000, + // Cold start: for this long after process start, lag is high while the whole + // fleet reconnects at once. Treat leniently — force the 'normal' band and apply + // only the hard ceiling (no rate-band throttle) so a deploy can't throttle + // healthy screens. Throttle state is in-memory and resets on restart. + reconnectWarmupMs: parseInt(process.env.RECONNECT_WARMUP_MS) || 30000, + reconnectBandElevatedMult: parseFloat(process.env.RECONNECT_BAND_ELEVATED_MULT) || 2, + reconnectBandCriticalMult: parseFloat(process.env.RECONNECT_BAND_CRITICAL_MULT) || 4, + + // #142 device_status_log retention. A GLOBAL scheduled sweep (pruneStatusLog in + // db/database.js, run on startup + the heartbeat interval) deletes rows older + // than this across ALL devices — covering what the per-device insert-time prune + // in deviceSocket.js misses: removed/idle devices that never insert again, and + // the heartbeat.js offline_timeout insert that bypasses logDeviceStatus. Default + // is LOWER than the old hardcoded 7 days (the reporter's bloat happened under 7d); + // 2-3 days is plenty for the dashboard's 24h uptime view + diagnostics. + statusLogRetentionDays: parseFloat(process.env.STATUS_LOG_RETENTION_DAYS) || 3, + + // #142 content-ack dedup window (deviceSocket.js). A device (esp. older apps) + // can spam "content : ready" for the same item; suppress identical + // (device_id, content_id, status) reports within this window. A status CHANGE + // has a different key and passes immediately. In-memory; resets on restart. + contentAckDedupMs: parseInt(process.env.CONTENT_ACK_DEDUP_MS) || 10000, }; diff --git a/server/db/database.js b/server/db/database.js index 79b9941..cd0f920 100644 --- a/server/db/database.js +++ b/server/db/database.js @@ -225,6 +225,15 @@ const migrations = [ "ALTER TABLE devices ADD COLUMN ota_target_version TEXT", "ALTER TABLE devices ADD COLUMN ota_attempts INTEGER DEFAULT 0", "ALTER TABLE devices ADD COLUMN ota_updated_at INTEGER", + // #142: index device_status_log for the per-device + time-window access pattern. + // schema.sql creates this on fresh installs; this migration covers existing DBs. + // Both the dashboard uptime query and the retention prune were full scans — the + // dashboard-degradation cause once the table reached 1M+ rows. + "CREATE INDEX IF NOT EXISTS idx_device_status_log_device_ts ON device_status_log(device_id, timestamp)", + // #142: event-loop lag telemetry table (bounded: indexed + scheduled prune). + // schema.sql creates these on fresh installs; this covers existing DBs. + "CREATE TABLE IF NOT EXISTS event_loop_lag (id INTEGER PRIMARY KEY AUTOINCREMENT, sampled_at INTEGER NOT NULL DEFAULT (strftime('%s','now')), mean_ms REAL NOT NULL, p50_ms REAL NOT NULL, p99_ms REAL NOT NULL, max_ms REAL NOT NULL, band TEXT NOT NULL DEFAULT 'normal')", + "CREATE INDEX IF NOT EXISTS idx_event_loop_lag_sampled ON event_loop_lag(sampled_at)", ]; // Apply each ALTER idempotently. A "duplicate column name" / "already exists" // error means the column is already present (expected on a migrated DB) - benign. @@ -741,6 +750,21 @@ const { applyTenantDeleteCascade } = require('../lib/tenant-cascade-migration'); } })(); +// #142 GLOBAL device_status_log retention sweep across ALL devices. Run on startup +// and on the heartbeat interval (services/heartbeat.js). This covers the rows the +// per-device insert-time prune in deviceSocket.js misses: removed/idle devices that +// never insert again, and the heartbeat offline_timeout insert that bypasses +// logDeviceStatus. A plain time-range delete (like the play_logs prune) — runs off +// the hot path; after the first sweep the table is small, so the cost is negligible. +function pruneStatusLog() { + try { + const maxAgeSec = Math.round(config.statusLogRetentionDays * 86400); + const n = db.prepare("DELETE FROM device_status_log WHERE timestamp < strftime('%s','now') - ?").run(maxAgeSec).changes; + if (n > 0) console.log(`[status-log] pruned ${n} row(s) older than ${config.statusLogRetentionDays}d`); + return n; + } catch (_) { return 0; } +} + // Prune old telemetry (keep last 24h worth at 15s intervals = ~5760, cap at 6000) function pruneTelemetry(deviceId) { db.prepare(` @@ -813,4 +837,4 @@ try { const { verifyAndRepairSchema } = require('../lib/schema-check'); verifyAndRepairSchema(db); -module.exports = { db, pruneTelemetry, pruneScreenshots }; +module.exports = { db, pruneTelemetry, pruneScreenshots, pruneStatusLog }; diff --git a/server/db/schema.sql b/server/db/schema.sql index d0bcfcb..3bf26fa 100644 --- a/server/db/schema.sql +++ b/server/db/schema.sql @@ -463,6 +463,27 @@ CREATE TABLE IF NOT EXISTS device_status_log ( status TEXT NOT NULL, timestamp INTEGER NOT NULL DEFAULT (strftime('%s','now')) ); +-- #142: index the per-device + time-window access pattern. Both the dashboard +-- uptime query (WHERE device_id=? AND timestamp>?) and the retention prune +-- (WHERE device_id=? AND timestamp { hits: number[], level: number, blockedUntil: ms, lastThrottleAt: ms } +const state = new Map(); +let startedAt = Date.now(); + +function bandMultiplier(band) { + if (band === 'critical') return config.reconnectBandCriticalMult; + if (band === 'elevated') return config.reconnectBandElevatedMult; + return 1; +} + +function reject(s, now, band, reason, observed, allowed) { + s.level = Math.min(s.level + 1, config.reconnectMaxLevel); + const backoff = Math.min( + config.reconnectBaseBackoffMs * Math.pow(2, s.level - 1) * bandMultiplier(band), + config.reconnectMaxBackoffMs + ); + s.blockedUntil = now + backoff; + s.lastThrottleAt = now; + return { allow: false, retryAfterMs: backoff, reason, observed, allowed, band, level: s.level }; +} + +// Decide whether to allow a genuine reconnect for `deviceId`. +// `now` and `bandOverride` are injectable for deterministic tests; production +// passes only deviceId. +function check(deviceId, now = Date.now(), bandOverride = null) { + const warmup = (now - startedAt) < config.reconnectWarmupMs; + const band = bandOverride !== null ? bandOverride : (warmup ? 'normal' : loopLag.getBand()); + + let s = state.get(deviceId); + if (!s) { s = { hits: [], level: 0, blockedUntil: 0, lastThrottleAt: 0 }; state.set(deviceId, s); } + + // Already inside an enforced backoff window: reject and escalate (tighten fast). + if (now < s.blockedUntil) { + return reject(s, now, band, 'in-backoff', s.hits.length, config.reconnectBaseMax); + } + + // Sliding window of genuine reconnects. + s.hits = s.hits.filter((t) => now - t < config.reconnectWindowMs); + s.hits.push(now); + const observed = s.hits.length; + + // Hard ceiling — always enforced, regardless of band or warm-up. + if (observed > config.reconnectHardCeiling) { + return reject(s, now, band, 'hard-ceiling', observed, config.reconnectHardCeiling); + } + + // Cold start: only the hard ceiling applies; never rate-throttle during warm-up. + if (warmup) return allow(s, now, band); + + // Healthy device: under the per-device threshold -> always allowed. + if (observed <= config.reconnectBaseMax) return allow(s, now, band); + + // Flagged: storming beyond the per-device threshold -> throttle (band-scaled). + return reject(s, now, band, 'rate', observed, config.reconnectBaseMax); +} + +function allow(s, now, band) { + // Release slow: decay one escalation level per reconnectReleaseMs of calm. + if (s.level > 0 && now - s.lastThrottleAt > config.reconnectReleaseMs) { + s.level = Math.max(0, s.level - 1); + s.lastThrottleAt = now; + } + return { allow: true, band, level: s.level }; +} + +// Test-only: clear state and optionally rewind the warm-up origin. +function __resetForTest(opts = {}) { + state.clear(); + if (opts.startedAt !== undefined) startedAt = opts.startedAt; +} + +module.exports = { check, __resetForTest }; diff --git a/server/routes/status.js b/server/routes/status.js index aa5470e..e2c62ea 100644 --- a/server/routes/status.js +++ b/server/routes/status.js @@ -7,6 +7,7 @@ const fs = require('fs'); const config = require('../config'); const VERSION = require('../version'); const { PLATFORM_ROLES } = require('../middleware/auth'); +const loopLag = require('../services/loop-lag'); // Public status page router.get('/', (req, res) => { @@ -24,6 +25,9 @@ router.get('/', (req, res) => { version, uptime_human: formatUptime(uptime), timestamp: new Date().toISOString(), + // #142: current event-loop lag snapshot, so site lag is diagnosable from the + // health endpoint independent of any throttling. Cheap (in-memory read). + loop_lag: loopLag.getLag(), }); }); diff --git a/server/server.js b/server/server.js index 687cd54..2798804 100644 --- a/server/server.js +++ b/server/server.js @@ -625,6 +625,10 @@ app.set('io', io); const { startHeartbeatChecker } = require('./services/heartbeat'); startHeartbeatChecker(io); +// #142: start event-loop lag sampling (feeds /api/status + the reconnect throttle) +const { startLoopLagMonitor } = require('./services/loop-lag'); +startLoopLagMonitor(); + // Start command-queue sweep (prunes expired entries for offline devices) const commandQueue = require('./lib/command-queue'); commandQueue.startSweep(); diff --git a/server/services/heartbeat.js b/server/services/heartbeat.js index e881120..441b79c 100644 --- a/server/services/heartbeat.js +++ b/server/services/heartbeat.js @@ -1,4 +1,4 @@ -const { db } = require('../db/database'); +const { db, pruneStatusLog } = require('../db/database'); const config = require('../config'); const { deviceRoom, emitToWorkspace } = require('../lib/socket-rooms'); @@ -6,6 +6,10 @@ const { deviceRoom, emitToWorkspace } = require('../lib/socket-rooms'); const deviceConnections = new Map(); function startHeartbeatChecker(io) { + // #142: sweep stale device_status_log rows once at startup (recovers a bloated + // table immediately after a deploy), then again on each interval below. + pruneStatusLog(); + setInterval(() => { const now = Date.now(); const dashboardNs = io.of('/dashboard'); @@ -36,19 +40,18 @@ function startHeartbeatChecker(io) { } } - // Cleanup: delete unclaimed provisioning devices older than 24 hours - // Keep imported devices (they have user_id set) so users can re-pair them - db.prepare(` - DELETE FROM devices WHERE status = 'provisioning' - AND user_id IS NULL - AND created_at < strftime('%s','now') - (365 * 86400) - `).run(); + // Cleanup: delete unclaimed provisioning devices older than 24 hours. + pruneProvisioningDevices(); // Cleanup: prune play logs older than 90 days db.prepare(` DELETE FROM play_logs WHERE started_at < strftime('%s','now') - (90 * 86400) `).run(); + // #142: global device_status_log retention sweep (all devices, incl. removed/idle + // and the offline_timeout insert path that bypasses the per-device prune). + pruneStatusLog(); + // Cleanup: expired team invites db.prepare(` DELETE FROM team_invites WHERE expires_at < strftime('%s','now') @@ -83,11 +86,25 @@ function getAllConnections() { return deviceConnections; } +// #142: sweep unclaimed provisioning devices older than 24h. The window previously +// read `365 * 86400` (a YEAR), contradicting its own "older than 24 hours" comment, +// so socket-register pairing junk lingered far longer than intended. Imported +// devices keep a user_id and are preserved so they can be re-paired. Extracted from +// the interval above so the correctness fix is unit-testable. Returns rows deleted. +function pruneProvisioningDevices() { + return db.prepare(` + DELETE FROM devices + WHERE status = 'provisioning' AND user_id IS NULL + AND created_at < strftime('%s','now') - (24 * 3600) + `).run().changes; +} + module.exports = { startHeartbeatChecker, registerConnection, updateHeartbeat, removeConnection, getConnection, - getAllConnections + getAllConnections, + pruneProvisioningDevices }; diff --git a/server/services/loop-lag.js b/server/services/loop-lag.js new file mode 100644 index 0000000..b7f45f8 --- /dev/null +++ b/server/services/loop-lag.js @@ -0,0 +1,107 @@ +// #142 — Event-loop lag telemetry (the data subsystem; ships before the throttle). +// +// Continuously samples event-loop delay via perf_hooks.monitorEventLoopDelay() +// (a C++-backed histogram — cheap). Each window we read mean/p50/p99/max, persist +// a row to the bounded `event_loop_lag` table, and recompute a coarse load BAND +// (normal | elevated | critical) from the window p99. +// +// The band is consumed by the reconnect throttle (#142 step 3), but this module +// has standalone value: getLag() is surfaced on /api/status and band changes are +// logged, so site connectivity/lag is diagnosable independent of any throttling. +// +// Band transitions are deliberately asymmetric (see nextBand): jump UP immediately +// when an up-threshold is crossed (tighten fast), step DOWN only one level at a +// time after lagReleaseSamples consecutive calm samples below a deadband (release +// slow). This avoids band flap from transient blips. + +const { monitorEventLoopDelay } = require('perf_hooks'); +const { db } = require('../db/database'); +const config = require('../config'); + +const NS_PER_MS = 1e6; +// A band releases only once p99 falls below this fraction of the band's entry +// threshold — the deadband that stops small fluctuations from flapping the band. +const DEADBAND = 0.5; +const LEVEL = { normal: 0, elevated: 1, critical: 2 }; + +let histogram = null; +let band = 'normal'; +let calmSamples = 0; +let current = { mean_ms: 0, p50_ms: 0, p99_ms: 0, max_ms: 0, band: 'normal', sampled_at: 0 }; + +// Pure band-transition function (exported for deterministic unit tests). Given the +// current band, the window p99 (ms), and the running calm-sample count, returns the +// next [band, calmSamples]. Up is immediate (may skip a level); down is one step +// per release window, gated by a deadband. +function nextBand(cur, p99, calm) { + const level = LEVEL[cur] ?? 0; + // UP — immediate, tighten fast (normal can jump straight to critical). + if (p99 >= config.lagCriticalMs && level < LEVEL.critical) return ['critical', 0]; + if (p99 >= config.lagElevatedMs && level < LEVEL.elevated) return ['elevated', 0]; + // DOWN — slow, one step, only below the current band's deadband. + if (level === LEVEL.critical && p99 <= config.lagCriticalMs * DEADBAND) { + const c = calm + 1; + return c >= config.lagReleaseSamples ? ['elevated', 0] : ['critical', c]; + } + if (level === LEVEL.elevated && p99 <= config.lagElevatedMs * DEADBAND) { + const c = calm + 1; + return c >= config.lagReleaseSamples ? ['normal', 0] : ['elevated', c]; + } + // Hold (inside deadband, or already normal): reset the calm counter. + return [cur, 0]; +} + +const round2 = (x) => Math.round(x * 100) / 100; + +function sample() { + const p99 = histogram.percentile(99) / NS_PER_MS; + const snap = { + mean_ms: round2(histogram.mean / NS_PER_MS), + p50_ms: round2(histogram.percentile(50) / NS_PER_MS), + p99_ms: round2(p99), + max_ms: round2(histogram.max / NS_PER_MS), + }; + histogram.reset(); + + const prev = band; + [band, calmSamples] = nextBand(band, snap.p99_ms, calmSamples); + current = { ...snap, band, sampled_at: Math.floor(Date.now() / 1000) }; + + try { + db.prepare( + 'INSERT INTO event_loop_lag (sampled_at, mean_ms, p50_ms, p99_ms, max_ms, band) VALUES (?, ?, ?, ?, ?, ?)' + ).run(current.sampled_at, snap.mean_ms, snap.p50_ms, snap.p99_ms, snap.max_ms, band); + } catch (_) { /* table may not exist on a partially-migrated DB */ } + + // Observable: log whenever we're loaded or when the band changes (incl. back to + // normal). Healthy steady state stays quiet. + if (band !== 'normal' || prev !== 'normal') { + const tag = band !== prev ? ` (was ${prev})` : ''; + console.log(`[loop-lag] band=${band}${tag} mean=${snap.mean_ms}ms p99=${snap.p99_ms}ms max=${snap.max_ms}ms`); + } +} + +function pruneLag() { + try { + const cutoff = Math.floor(Date.now() / 1000) - Math.round(config.lagTelemetryRetentionDays * 86400); + const n = db.prepare('DELETE FROM event_loop_lag WHERE sampled_at < ?').run(cutoff).changes; + if (n > 0) console.log(`[loop-lag] pruned ${n} sample(s) older than ${config.lagTelemetryRetentionDays}d`); + } catch (_) { /* ignore */ } +} + +function startLoopLagMonitor() { + if (histogram) return; // idempotent + histogram = monitorEventLoopDelay({ resolution: config.lagResolutionMs }); + histogram.enable(); + const t1 = setInterval(sample, config.lagSampleIntervalMs); + pruneLag(); // sweep stale rows on boot + const t2 = setInterval(pruneLag, config.lagPruneIntervalMs); + // Don't keep the process alive on these timers (matters for tests / clean exit). + if (t1.unref) t1.unref(); + if (t2.unref) t2.unref(); +} + +function getBand() { return band; } +function getLag() { return { ...current }; } + +module.exports = { startLoopLagMonitor, getBand, getLag, nextBand }; diff --git a/server/test/content-ack-dedup.test.js b/server/test/content-ack-dedup.test.js new file mode 100644 index 0000000..e5904f8 --- /dev/null +++ b/server/test/content-ack-dedup.test.js @@ -0,0 +1,85 @@ +'use strict'; + +// #142 step 5 — content-ack dedup. Repeated identical (device_id, content_id, status) +// reports are suppressed within config.contentAckDedupMs; a status change or a report +// after the window passes. Observed via the server log (the handler logs+emits only +// when it does NOT dedup). Unique PORT (3984) to avoid the collision class. + +const { test, before, after } = require('node:test'); +const assert = require('node:assert/strict'); +const { spawn } = require('node:child_process'); +const path = require('node:path'); +const os = require('node:os'); +const fs = require('node:fs'); +const crypto = require('node:crypto'); +const ioClient = require('socket.io-client'); + +const PORT = 3984; +const BASE = `http://127.0.0.1:${PORT}`; +const DATA_DIR = path.join(os.tmpdir(), 'st-ack-' + crypto.randomBytes(4).toString('hex')); +const LOG = path.join(os.tmpdir(), 'st-ack-' + crypto.randomBytes(4).toString('hex') + '.log'); +const DEDUP_MS = 600; +let proc; + +const sleep = (ms) => new Promise(r => setTimeout(r, ms)); + +before(async () => { + const logFd = fs.openSync(LOG, 'w'); + proc = spawn('node', ['server.js'], { + cwd: path.join(__dirname, '..'), + env: { ...process.env, DATA_DIR, SELF_HOSTED: 'true', PORT: String(PORT), NODE_ENV: 'test', CONTENT_ACK_DEDUP_MS: String(DEDUP_MS) }, + stdio: ['ignore', logFd, logFd], + }); + let up = false; + for (let i = 0; i < 80; i++) { + try { const r = await fetch(BASE + '/api/status'); if (r.ok) { up = true; break; } } catch { /* */ } + await sleep(250); + } + if (!up) throw new Error('server did not boot:\n' + fs.readFileSync(LOG, 'utf8').slice(-2000)); +}); + +after(() => { try { proc.kill('SIGKILL'); } catch { /* */ } }); + +function provision() { + const code = String(crypto.randomInt(100000, 1000000)); + return new Promise((resolve) => { + const sock = ioClient(`${BASE}/device`, { transports: ['websocket'], reconnection: false, forceNew: true }); + sock.on('connect', () => sock.emit('device:register', { pairing_code: code })); + sock.on('device:registered', (d) => { try { sock.close(); } catch { /* */ } resolve({ id: d.device_id, token: d.device_token }); }); + setTimeout(() => resolve(null), 4000); + }); +} + +function openRegistered(dev) { + return new Promise((resolve, reject) => { + const sock = ioClient(`${BASE}/device`, { transports: ['websocket'], reconnection: false, forceNew: true }); + sock.on('connect', () => sock.emit('device:register', { device_id: dev.id, device_token: dev.token, device_info: { app_version: 'test' } })); + sock.on('device:registered', () => resolve(sock)); + sock.on('device:auth-error', () => reject(new Error('auth-error'))); + setTimeout(() => reject(new Error('register timeout')), 4000); + }); +} + +test('repeated identical content-acks are deduped; window-expiry and status-change pass', async () => { + const dev = await provision(); + assert.ok(dev, 'device provisioned'); + const sock = await openRegistered(dev); + const cid = 'cid-' + crypto.randomBytes(3).toString('hex'); + + // 5 rapid identical "ready" within the dedup window -> only ONE should log/emit + for (let i = 0; i < 5; i++) { sock.emit('device:content-ack', { device_id: dev.id, content_id: cid, status: 'ready' }); await sleep(40); } + // wait past the window, then "ready" again -> passes (a fresh report) + await sleep(DEDUP_MS + 250); + sock.emit('device:content-ack', { device_id: dev.id, content_id: cid, status: 'ready' }); + // a status CHANGE has a different key -> passes immediately + await sleep(60); + sock.emit('device:content-ack', { device_id: dev.id, content_id: cid, status: 'error' }); + await sleep(400); + try { sock.close(); } catch { /* */ } + + const log = fs.readFileSync(LOG, 'utf8'); + const ready = (log.match(new RegExp(`content ${cid}: ready`, 'g')) || []).length; + const err = (log.match(new RegExp(`content ${cid}: error`, 'g')) || []).length; + assert.equal(ready, 2, 'a burst of identical "ready" collapses to one; a second after the window passes -> 2 total'); + assert.equal(err, 1, 'a status change is not deduped'); +}); diff --git a/server/test/loop-lag-integration.test.js b/server/test/loop-lag-integration.test.js new file mode 100644 index 0000000..3694ed0 --- /dev/null +++ b/server/test/loop-lag-integration.test.js @@ -0,0 +1,64 @@ +'use strict'; + +// #142 step 2 — integration: the lag monitor samples, persists to a BOUNDED table, +// and surfaces current lag on /api/status. Boots the real server with fast sampling +// and a tiny (fractional-day) retention so the prune is observable within the test. + +const { test, before, after } = require('node:test'); +const assert = require('node:assert/strict'); +const { spawn } = require('node:child_process'); +const path = require('node:path'); +const os = require('node:os'); +const fs = require('node:fs'); +const crypto = require('node:crypto'); +const Database = require('better-sqlite3'); + +const PORT = 3982; +const BASE = `http://127.0.0.1:${PORT}`; +const DATA_DIR = path.join(os.tmpdir(), 'st-lag-int-' + crypto.randomBytes(4).toString('hex')); +const LOG = path.join(os.tmpdir(), 'st-lag-int-' + crypto.randomBytes(4).toString('hex') + '.log'); +let proc; + +before(async () => { + const logFd = fs.openSync(LOG, 'w'); + proc = spawn('node', ['server.js'], { + cwd: path.join(__dirname, '..'), + env: { + ...process.env, DATA_DIR, SELF_HOSTED: 'true', PORT: String(PORT), NODE_ENV: 'test', + LAG_SAMPLE_INTERVAL_MS: '200', // sample fast + LAG_TELEMETRY_RETENTION_DAYS: '0.00001', // ~0.86s retention + LAG_PRUNE_INTERVAL_MS: '400', // prune often + }, + stdio: ['ignore', logFd, logFd], + }); + let up = false; + for (let i = 0; i < 80; i++) { + try { const r = await fetch(BASE + '/api/status'); if (r.ok) { up = true; break; } } catch { /* not yet */ } + await new Promise(r => setTimeout(r, 250)); + } + if (!up) throw new Error('server did not boot:\n' + fs.readFileSync(LOG, 'utf8').slice(-2000)); +}); + +after(() => { try { proc.kill('SIGKILL'); } catch { /* */ } }); + +test('/api/status exposes a current loop_lag snapshot', async () => { + const r = await fetch(BASE + '/api/status'); + const body = await r.json(); + assert.ok(body.loop_lag, 'loop_lag present on /api/status'); + assert.ok(['normal', 'elevated', 'critical'].includes(body.loop_lag.band), 'band is a valid level'); + assert.equal(typeof body.loop_lag.p99_ms, 'number', 'p99_ms is numeric'); + assert.equal(typeof body.loop_lag.mean_ms, 'number', 'mean_ms is numeric'); +}); + +test('lag samples are persisted AND bounded by retention prune (not unbounded)', async () => { + // Let it sample for ~3s. At 200ms/sample that is ~15 inserts, but with ~0.86s + // retention pruned every 400ms the table must stay small — proving the table + // can never become a second unbounded-growth table. + await new Promise(r => setTimeout(r, 1800)); + const dbPath = path.join(DATA_DIR, 'db', 'remote_display.db'); + const db = new Database(dbPath, { readonly: true }); + const count = db.prepare('SELECT COUNT(*) c FROM event_loop_lag').get().c; + db.close(); + assert.ok(count >= 1, 'lag samples are being persisted'); + assert.ok(count < 15, `table is bounded by the prune (held ${count} rows over ~3s of 200ms sampling)`); +}); diff --git a/server/test/loop-lag.test.js b/server/test/loop-lag.test.js new file mode 100644 index 0000000..0b3049c --- /dev/null +++ b/server/test/loop-lag.test.js @@ -0,0 +1,57 @@ +'use strict'; + +// #142 step 2 — deterministic unit tests for the event-loop-lag band transitions. +// Pure function, no sockets/timing. Isolate the DB to a temp dir BEFORE requiring +// the module (requiring it pulls in db/database, which initialises a DB on load). + +const os = require('node:os'); +const path = require('node:path'); +const crypto = require('node:crypto'); +process.env.DATA_DIR = path.join(os.tmpdir(), 'st-lag-unit-' + crypto.randomBytes(4).toString('hex')); + +const { test } = require('node:test'); +const assert = require('node:assert/strict'); +const { nextBand } = require('../services/loop-lag'); + +// config defaults exercised here: elevated=100ms, critical=250ms, releaseSamples=5, +// deadband=0.5 -> release-below thresholds: elevated@50ms, critical@125ms. + +test('UP is immediate and can skip a level (tighten fast)', () => { + assert.deepEqual(nextBand('normal', 50, 0), ['normal', 0], 'below elevated stays normal'); + assert.deepEqual(nextBand('normal', 100, 0), ['elevated', 0], 'crossing elevated up-threshold jumps immediately'); + assert.deepEqual(nextBand('normal', 250, 0), ['critical', 0], 'a big spike jumps normal->critical in one sample'); + assert.deepEqual(nextBand('elevated', 250, 0), ['critical', 0]); +}); + +test('deadband holds the band for small fluctuations (no flap)', () => { + // elevated, p99 between release(50) and up(100) -> hold elevated, calm reset + assert.deepEqual(nextBand('elevated', 80, 3), ['elevated', 0]); + // critical, p99 between release(125) and up(250) -> hold critical + assert.deepEqual(nextBand('critical', 200, 4), ['critical', 0]); +}); + +test('DOWN is slow: requires lagReleaseSamples calm samples below the deadband', () => { + // elevated -> normal only after 5 consecutive calm samples + let band = 'elevated', calm = 0; + for (let i = 0; i < 4; i++) { + [band, calm] = nextBand(band, 20, calm); + assert.equal(band, 'elevated', `still elevated after ${i + 1} calm sample(s)`); + } + [band, calm] = nextBand(band, 20, calm); // 5th + assert.deepEqual([band, calm], ['normal', 0], 'drops to normal on the 5th calm sample'); +}); + +test('DOWN releases one level at a time: critical -> elevated -> normal', () => { + let band = 'critical', calm = 0; + for (let i = 0; i < 5; i++) [band, calm] = nextBand(band, 10, calm); + assert.equal(band, 'elevated', 'critical releases to elevated, never straight to normal'); + for (let i = 0; i < 5; i++) [band, calm] = nextBand(band, 10, calm); + assert.equal(band, 'normal', 'then elevated releases to normal'); +}); + +test('a single calm sample does not release (calm counter resets on a non-calm sample)', () => { + let [band, calm] = nextBand('elevated', 20, 0); // calm=1 + assert.deepEqual([band, calm], ['elevated', 1]); + [band, calm] = nextBand(band, 80, calm); // back inside deadband -> reset + assert.deepEqual([band, calm], ['elevated', 0], 'one blip resets the release counter'); +}); diff --git a/server/test/provisioning-cleanup.test.js b/server/test/provisioning-cleanup.test.js new file mode 100644 index 0000000..edcbefb --- /dev/null +++ b/server/test/provisioning-cleanup.test.js @@ -0,0 +1,41 @@ +'use strict'; + +// #142 (cut 2) — provisioning-row cleanup window correctness. The sweep deletes +// UNCLAIMED provisioning devices older than 24h (it previously used 365*86400 — a +// year — contradicting its own comment). Imported devices (user_id set) and +// non-provisioning devices are preserved. Deterministic, in-process (no server). + +const os = require('node:os'); +const path = require('node:path'); +const crypto = require('node:crypto'); +process.env.DATA_DIR = path.join(os.tmpdir(), 'st-provclean-' + crypto.randomBytes(4).toString('hex')); + +const { test } = require('node:test'); +const assert = require('node:assert/strict'); +const { db } = require('../db/database'); +const { pruneProvisioningDevices } = require('../services/heartbeat'); + +test('sweeps unclaimed provisioning devices older than 24h, keeps the rest', () => { + db.pragma('foreign_keys = OFF'); // seed user_id without a real users row + db.exec('DELETE FROM devices'); + const ins = db.prepare("INSERT INTO devices (id, status, user_id, created_at) VALUES (?, ?, ?, strftime('%s','now') - ?)"); + ins.run('old-unclaimed', 'provisioning', null, 25 * 3600); // >24h, unclaimed -> SWEPT + ins.run('new-unclaimed', 'provisioning', null, 1 * 3600); // <24h, unclaimed -> kept + ins.run('old-imported', 'provisioning', 'u-imported', 25 * 3600); // >24h but imported (user_id) -> kept + ins.run('old-online', 'online', null, 25 * 3600); // >24h but not provisioning -> kept + db.pragma('foreign_keys = ON'); + + assert.equal(db.prepare('SELECT COUNT(*) c FROM devices').get().c, 4, 'seeded 4'); + + const deleted = pruneProvisioningDevices(); + assert.equal(deleted, 1, 'only the >24h unclaimed provisioning device is swept'); + + const ids = db.prepare('SELECT id FROM devices ORDER BY id').all().map(r => r.id); + assert.deepEqual(ids, ['new-unclaimed', 'old-imported', 'old-online']); + // regression guard: a 25h-old row sits well inside the OLD 365-day window, so this + // would have survived before the fix. +}); + +test('idempotent: a second sweep with nothing stale deletes nothing', () => { + assert.equal(pruneProvisioningDevices(), 0); +}); diff --git a/server/test/reconnect-throttle-integration.test.js b/server/test/reconnect-throttle-integration.test.js new file mode 100644 index 0000000..59933d8 --- /dev/null +++ b/server/test/reconnect-throttle-integration.test.js @@ -0,0 +1,113 @@ +'use strict'; + +// #142 step 3 — REQUIRED GATE TEST + storm + neighbor, over real sockets. +// +// Boots the real server with warm-up ACTIVE (default) so the whole suite runs in +// the cold-start window — the exact "right after a deploy" scenario. Hard ceiling +// and window are tightened so the storm trips quickly without thousands of connects; +// fleet devices stay well under the ceiling. + +const { test, before, after } = require('node:test'); +const assert = require('node:assert/strict'); +const { spawn } = require('node:child_process'); +const path = require('node:path'); +const os = require('node:os'); +const fs = require('node:fs'); +const crypto = require('node:crypto'); +const ioClient = require('socket.io-client'); + +const PORT = 3983; +const BASE = `http://127.0.0.1:${PORT}`; +const DATA_DIR = path.join(os.tmpdir(), 'st-thr-int-' + crypto.randomBytes(4).toString('hex')); +const LOG = path.join(os.tmpdir(), 'st-thr-int-' + crypto.randomBytes(4).toString('hex') + '.log'); +let proc; + +before(async () => { + const logFd = fs.openSync(LOG, 'w'); + proc = spawn('node', ['server.js'], { + cwd: path.join(__dirname, '..'), + env: { + ...process.env, DATA_DIR, SELF_HOSTED: 'true', PORT: String(PORT), NODE_ENV: 'test', + // warm-up left at default (30s) so the whole test runs in the cold-start window + RECONNECT_HARD_CEILING: '8', + RECONNECT_WINDOW_MS: '5000', + RECONNECT_BASE_MAX: '3', + }, + stdio: ['ignore', logFd, logFd], + }); + let up = false; + for (let i = 0; i < 80; i++) { + try { const r = await fetch(BASE + '/api/status'); if (r.ok) { up = true; break; } } catch { /* */ } + await new Promise(r => setTimeout(r, 250)); + } + if (!up) throw new Error('server did not boot:\n' + fs.readFileSync(LOG, 'utf8').slice(-2000)); +}); + +after(() => { try { proc.kill('SIGKILL'); } catch { /* */ } }); + +// Provision a brand-new device via a UNIQUE pairing code -> returns {device_id, device_token}. +function provision() { + const code = String(crypto.randomInt(100000, 1000000)); + return new Promise((resolve) => { + const sock = ioClient(`${BASE}/device`, { transports: ['websocket'], reconnection: false, forceNew: true }); + sock.on('connect', () => sock.emit('device:register', { pairing_code: code })); + sock.on('device:registered', (d) => { try { sock.close(); } catch { /* */ } resolve({ id: d.device_id, token: d.device_token }); }); + setTimeout(() => { try { sock.close(); } catch { /* */ } resolve(null); }, 4000); + }); +} + +// One genuine reconnect (new socket). Resolves {registered, throttled}. +function reconnect(dev) { + return new Promise((resolve) => { + const sock = ioClient(`${BASE}/device`, { transports: ['websocket'], reconnection: false, forceNew: true }); + let done = false; + const finish = (r) => { if (done) return; done = true; try { sock.close(); } catch { /* */ } resolve(r); }; + sock.on('connect', () => sock.emit('device:register', { device_id: dev.id, device_token: dev.token, device_info: { app_version: 'test' } })); + sock.on('device:registered', () => finish({ registered: true, throttled: false })); + sock.on('device:throttled', () => finish({ registered: false, throttled: true })); + setTimeout(() => finish({ registered: false, throttled: false }), 1500); + }); +} + +test('GATE: full-fleet reconnect right after restart throttles NO healthy device', async () => { + // 12 distinct devices, each reconnecting twice in quick succession — a deploy-time + // herd. The loop is transiently busy, but per-device keying means none is flagged. + const fleet = []; + for (let i = 0; i < 12; i++) { const d = await provision(); assert.ok(d, 'device provisioned'); fleet.push(d); } + + let registered = 0, throttled = 0; + // two reconnect rounds across the whole fleet + for (let round = 0; round < 2; round++) { + const results = await Promise.all(fleet.map(reconnect)); + for (const r of results) { if (r.registered) registered++; if (r.throttled) throttled++; } + } + assert.equal(throttled, 0, 'NO healthy fleet device may be throttled at cold start'); + assert.equal(registered, 24, 'every fleet reconnect registered'); +}); + +test('a single device storming IS throttled (backoff engages)', async () => { + const dev = await provision(); + assert.ok(dev); + let registered = 0, throttled = 0; + // 12 sequential reconnects within the 5s window -> exceeds the hard ceiling (8) + for (let i = 0; i < 12; i++) { + const r = await reconnect(dev); + if (r.registered) registered++; + if (r.throttled) throttled++; + } + assert.ok(throttled >= 1, `storming device must be throttled (got ${throttled} throttle(s))`); + assert.ok(registered < 12, `not all storm reconnects should succeed (got ${registered}/12)`); +}); + +test('neighbor isolation: a healthy device is unaffected while another storms', async () => { + const stormer = await provision(); + const neighbor = await provision(); + assert.ok(stormer && neighbor); + // storm the stormer hard + for (let i = 0; i < 12; i++) await reconnect(stormer); + // neighbor reconnects normally a couple of times -> must still register + const a = await reconnect(neighbor); + const b = await reconnect(neighbor); + assert.ok(a.registered && b.registered, 'neighbor must register normally while another device storms'); + assert.ok(!a.throttled && !b.throttled, 'neighbor must not be throttled by another device'); +}); diff --git a/server/test/reconnect-throttle.test.js b/server/test/reconnect-throttle.test.js new file mode 100644 index 0000000..819a294 --- /dev/null +++ b/server/test/reconnect-throttle.test.js @@ -0,0 +1,98 @@ +'use strict'; + +// #142 step 3 — deterministic unit tests for the per-device reconnect throttle. +// Pure logic with injected `now` / band; isolate the DB before require (the module +// pulls in services/loop-lag -> db/database which initialises a DB on load). + +const os = require('node:os'); +const path = require('node:path'); +const crypto = require('node:crypto'); +process.env.DATA_DIR = path.join(os.tmpdir(), 'st-thr-unit-' + crypto.randomBytes(4).toString('hex')); + +const { test, beforeEach } = require('node:test'); +const assert = require('node:assert/strict'); +const throttle = require('../lib/reconnect-throttle'); + +// config defaults: window=10000, baseMax=5, hardCeiling=20, baseBackoff=1000, +// maxBackoff=60000, releaseMs=30000, warmup=30000, elevMult=2, critMult=4. +const T0 = 1_000_000; // arbitrary epoch-ms origin for the warm-up clock +const POST = T0 + 40_000; // safely past the 30s warm-up +const WARM = T0 + 1_000; // inside the warm-up window + +beforeEach(() => throttle.__resetForTest({ startedAt: T0 })); + +test('healthy device is never throttled (<= baseMax genuine reconnects)', () => { + for (let i = 0; i < 5; i++) { + const v = throttle.check('A', POST + i, 'normal'); + assert.ok(v.allow, `reconnect ${i + 1} (<=baseMax) must be allowed`); + } +}); + +test('a per-device storm IS throttled and the backoff GROWS (tighten fast)', () => { + let v; + for (let i = 0; i < 5; i++) v = throttle.check('B', POST + i, 'normal'); // 5 allowed + v = throttle.check('B', POST + 5, 'normal'); // 6th -> flagged + assert.equal(v.allow, false); + assert.equal(v.reason, 'rate'); + assert.equal(v.observed, 6); + assert.equal(v.allowed, 5); + const b1 = v.retryAfterMs; + // keep hammering while blocked -> escalate, longer backoff each time + const b2 = throttle.check('B', POST + 6, 'normal').retryAfterMs; + const b3 = throttle.check('B', POST + 7, 'normal').retryAfterMs; + assert.ok(b2 > b1 && b3 > b2, `backoff must grow: ${b1} < ${b2} < ${b3}`); +}); + +test('lag band multiplies an already-flagged device\'s backoff (critical > normal)', () => { + let v; + for (let i = 0; i < 5; i++) throttle.check('N', POST + i, 'normal'); + v = throttle.check('N', POST + 5, 'normal'); + const normalBackoff = v.retryAfterMs; + + throttle.__resetForTest({ startedAt: T0 }); + for (let i = 0; i < 5; i++) throttle.check('C', POST + i, 'critical'); + v = throttle.check('C', POST + 5, 'critical'); + assert.ok(v.retryAfterMs > normalBackoff, `critical backoff ${v.retryAfterMs} > normal ${normalBackoff}`); +}); + +test('a healthy device is NOT throttled even when the band is critical (lag never gates the healthy)', () => { + for (let i = 0; i < 5; i++) { + const v = throttle.check('H', POST + i, 'critical'); + assert.ok(v.allow, 'healthy device stays allowed regardless of band'); + } +}); + +test('COLD START: during warm-up, moderate flapping (>baseMax, { + for (let i = 0; i < 12; i++) { // 12 > baseMax(5) but < hardCeiling(20) + const v = throttle.check('W', WARM + i, 'critical'); // band forced normal in warm-up anyway + assert.ok(v.allow, `warm-up reconnect ${i + 1} must be lenient`); + } +}); + +test('HARD CEILING is enforced even during warm-up (slow-ramp cannot train through)', () => { + let v; + for (let i = 0; i < 20; i++) { + v = throttle.check('K', WARM + i, 'normal'); + assert.ok(v.allow, `warm-up reconnect ${i + 1} (<=ceiling) allowed`); + } + v = throttle.check('K', WARM + 20, 'normal'); // 21st -> over ceiling(20) + assert.equal(v.allow, false); + assert.equal(v.reason, 'hard-ceiling'); +}); + +test('neighbor isolation: one device storming does not throttle another', () => { + for (let i = 0; i < 10; i++) throttle.check('STORM', POST + i, 'normal'); // STORM gets throttled + const v = throttle.check('NEIGHBOR', POST + 11, 'normal'); + assert.ok(v.allow, 'a different device must be unaffected'); +}); + +test('release slow: escalation level decays after a calm period', () => { + let v; + for (let i = 0; i < 6; i++) v = throttle.check('R', POST + i, 'normal'); // flagged, level 1 + assert.ok(v.level >= 1); + const peak = v.level; + // a calm reconnect well past the window AND past releaseMs(30000) + v = throttle.check('R', POST + 6 + 40_000, 'normal'); + assert.ok(v.allow, 'calm reconnect after the storm is allowed'); + assert.ok(v.level < peak, `level decays after calm: ${v.level} < ${peak}`); +}); diff --git a/server/test/status-log-prune.test.js b/server/test/status-log-prune.test.js new file mode 100644 index 0000000..4e7ef35 --- /dev/null +++ b/server/test/status-log-prune.test.js @@ -0,0 +1,48 @@ +'use strict'; + +// #142 step 4 — global device_status_log retention sweep. Deterministic, in-process +// (no server/port). Isolate the DB and set retention BEFORE requiring the module +// (config reads env at load; database.js initialises a DB on load). + +const os = require('node:os'); +const path = require('node:path'); +const crypto = require('node:crypto'); +process.env.DATA_DIR = path.join(os.tmpdir(), 'st-statusprune-' + crypto.randomBytes(4).toString('hex')); +process.env.STATUS_LOG_RETENTION_DAYS = '2'; + +const { test } = require('node:test'); +const assert = require('node:assert/strict'); +const { db, pruneStatusLog } = require('../db/database'); + +test('global sweep deletes rows older than retention across ALL devices, keeps recent', () => { + db.exec('DELETE FROM device_status_log'); // clean slate + const old = db.prepare("INSERT INTO device_status_log (device_id, status, timestamp) VALUES (?, ?, strftime('%s','now') - ?)"); + + // 5 days old (> 2d retention): an active device, a device NOT in the devices + // table (removed/idle — what the per-device insert-time prune never revisits), + // and the heartbeat offline_timeout status that bypasses logDeviceStatus. + old.run('live-dev', 'online', 5 * 86400); + old.run('removed-idle-dev', 'offline', 5 * 86400); + old.run('hb-dev', 'offline_timeout', 5 * 86400); + // recent (< retention): must survive, regardless of device existence / status. + old.run('live-dev', 'online', 0); + old.run('hb-dev', 'offline_timeout', 3600); + + assert.equal(db.prepare('SELECT COUNT(*) c FROM device_status_log').get().c, 5, 'seeded 5 rows'); + + const deleted = pruneStatusLog(); + assert.equal(deleted, 3, 'the 3 over-retention rows pruned (incl. removed-idle + offline_timeout paths)'); + + const remaining = db.prepare('SELECT device_id, status FROM device_status_log ORDER BY device_id').all(); + assert.equal(remaining.length, 2); + // both survivors are the recent rows; no old row of any device/status survived + assert.deepEqual(remaining.map(r => r.device_id).sort(), ['hb-dev', 'live-dev']); + const oldestNow = db.prepare("SELECT MIN(timestamp) m FROM device_status_log").get().m; + const cutoff = Math.floor(Date.now() / 1000) - 2 * 86400; + assert.ok(oldestNow >= cutoff, 'no surviving row is older than the retention cutoff'); +}); + +test('sweep is safe and idempotent on an empty/already-clean table', () => { + db.exec('DELETE FROM device_status_log'); + assert.equal(pruneStatusLog(), 0, 'nothing to delete -> 0, no throw'); +}); diff --git a/server/ws/deviceSocket.js b/server/ws/deviceSocket.js index b15c038..7788dce 100644 --- a/server/ws/deviceSocket.js +++ b/server/ws/deviceSocket.js @@ -6,6 +6,7 @@ const { db, pruneTelemetry, pruneScreenshots } = require('../db/database'); const config = require('../config'); const heartbeat = require('../services/heartbeat'); const commandQueue = require('../lib/command-queue'); +const reconnectThrottle = require('../lib/reconnect-throttle'); // Debounce window for marking a device offline on socket disconnect. Brief // flap (Wi-Fi blip, Engine.IO ping miss, server-side eviction-then-reconnect) @@ -27,6 +28,12 @@ const OFFLINE_DEBOUNCE_MS = 5000; // event is still forwarded every time, so the UI is unaffected. In-memory only. const lastPlayLogAt = new Map(); const PLAY_LOG_MIN_GAP_MS = 2000; + +// #142 content-ack dedup. An older app can spam "content : ready" for the same +// item; each was logged + emitted individually (secondary load). Suppress identical +// (device_id, content_id, status) reports within config.contentAckDedupMs. A status +// CHANGE has a different key and passes immediately. In-memory; resets on restart. +const lastContentAck = new Map(); const { getUserPlan, getUserDeviceCount } = require('../middleware/subscription'); // Phase 2.3: deviceRoom() resolves a device_id to its workspace room so // dashboardNs.emit can be scoped instead of broadcast platform-wide. @@ -353,6 +360,23 @@ module.exports = function setupDeviceSocket(io) { return; } + // #142: per-device reconnect throttle. Only GENUINE reconnects (a new + // socket) count — same-socket playlist refreshes (isPlaylistRefresh) are + // exempt. This runs BEFORE the heavy register work (DB writes, playlist + // build) so a single flapping device cannot saturate the event loop. The + // verdict is per-device; global lag only scales an already-flagged + // device's backoff, never gates a healthy one. + if (!isPlaylistRefresh) { + const verdict = reconnectThrottle.check(device_id); + if (!verdict.allow) { + console.warn(`[throttle] device ${device_id} reconnect throttled: reason=${verdict.reason} band=${verdict.band} observed=${verdict.observed}/${verdict.allowed} per ${config.reconnectWindowMs}ms -> backoff ${verdict.retryAfterMs}ms (level ${verdict.level})`); + socket.emit('device:throttled', { retry_after_ms: verdict.retryAfterMs, reason: 'reconnect_rate' }); + // nextTick disconnect so the throttle notice flushes first. + process.nextTick(() => { try { socket.disconnect(true); } catch (_) { /* */ } }); + return; + } + } + currentDeviceId = device_id; authenticated = true; // Cancel any pending offline timer - device is back in the grace window @@ -561,6 +585,13 @@ module.exports = function setupDeviceSocket(io) { if (!requireDeviceAuth()) return; const { device_id, content_id, status } = data; if (device_id !== currentDeviceId) return; + // #142: drop repeats of the same (device, content, status) within the dedup + // window. Only a change (new content/status) or a report after the window + // logs+emits, so a device spamming the same "ready" can't add load. + const ackKey = `${device_id}|${content_id}|${status}`; + const nowAck = Date.now(); + if (nowAck - (lastContentAck.get(ackKey) || 0) < config.contentAckDedupMs) return; + lastContentAck.set(ackKey, nowAck); console.log(`Device ${device_id} content ${content_id}: ${status}`); emitToDeviceWorkspace(dashboardNs, device_id, 'dashboard:content-ack', { device_id, content_id, status }); });