From 742d8c4b09d5284f790579b9c2665ce9c64e6dba Mon Sep 17 00:00:00 2001 From: ScreenTinker Date: Thu, 14 May 2026 13:06:43 -0500 Subject: [PATCH] feat(socket): delivery queue for offline-device emits Short-lived per-device queue covers the TV-flap window (issue #3): when a device is mid-reconnect, prior code emitted to an empty room and the event vanished. Now playlist-updates and commands targeting an offline device are queued and flushed in order on the next device:register for that device_id. server/lib/command-queue.js (new): - pendingPlaylistUpdate: per-device marker (rebuild via builder on flush -> always fresh DB state, no stale snapshots) - pendingCommands: per-device Map with last-of-type dedup (most recent screen_off wins) - TTL via COMMAND_QUEUE_TTL_MS env (default 30000) - Active sweep every 30s prunes expired entries Memory bounds: ~6 entries per device worst case (1 playlist marker + 5 command types), unref'd sweep timer. Wired emit sites (8 total; the four direct socket.emit calls in deviceSocket register handlers are intentionally NOT queued because the socket is alive by definition at those points): - server/routes/video-walls.js (pushWallPayloadToDevice) - server/routes/device-groups.js (pushPlaylistToDevice) - server/routes/content.js (content-delete fan-out) - server/routes/playlists.js (pushToDevices + assign) - server/services/scheduler.js (scheduled rotations) - server/ws/deviceSocket.js x2 (wall leader reclaim/reassign) server/ws/deviceSocket.js register paths now call flushQueue after heartbeat.registerConnection + socket.join. Existing socket.emit('device:playlist-update', ...) lines kept - they send the initial state on register; the flush replays any queued events. Player's handlePlaylistUpdate fingerprint check dedupes the overlap. 
Refs #3 Co-Authored-By: Claude Opus 4.7 (1M context) --- README.md | 1 + server/config.js | 4 + server/lib/command-queue.js | 153 +++++++++++++++++++++++++++++++++ server/routes/content.js | 4 +- server/routes/device-groups.js | 4 +- server/routes/playlists.js | 7 +- server/routes/video-walls.js | 3 +- server/server.js | 4 + server/services/scheduler.js | 4 +- server/ws/deviceSocket.js | 9 +- 10 files changed, 183 insertions(+), 10 deletions(-) create mode 100644 server/lib/command-queue.js diff --git a/README.md b/README.md index b7e49bc..42b4fb7 100644 --- a/README.md +++ b/README.md @@ -119,6 +119,7 @@ Schema migrations run automatically on first boot — no manual migration comman | `PING_TIMEOUT` | Socket.IO Engine.IO pong wait (ms). Lower = faster dead-socket detection; higher = more forgiving of laggy clients. | `30000` | | `HEARTBEAT_INTERVAL` | App-level offline-checker frequency (ms). How often the server sweeps the device list looking for stale heartbeats. | `10000` | | `HEARTBEAT_TIMEOUT` | How long without an app-level heartbeat (ms) before marking a device offline. Raise for slow/jittery networks. | `45000` | +| `COMMAND_QUEUE_TTL_MS` | How long the server holds commands and playlist-updates for a device that's offline at emit time (ms). Flushed in order on reconnect within this window; dropped past TTL. | `30000` | ### Optional Integrations diff --git a/server/config.js b/server/config.js index 405698b..02b5d82 100644 --- a/server/config.js +++ b/server/config.js @@ -14,6 +14,10 @@ module.exports = { // reporter found raising HEARTBEAT_TIMEOUT to 60s reduced false offlines). heartbeatInterval: parseInt(process.env.HEARTBEAT_INTERVAL) || 10000, heartbeatTimeout: parseInt(process.env.HEARTBEAT_TIMEOUT) || 45000, + // How long the server holds commands/playlist-updates for a device that's + // offline at emit time (ms). On reconnect within this window, queued events + // are flushed in order. Past TTL they're dropped. See lib/command-queue.js. 
+ commandQueueTtlMs: parseInt(process.env.COMMAND_QUEUE_TTL_MS) || 30000, // Engine.IO transport-level ping/pong. Raised from Socket.IO defaults // (25000/20000) because TV WebKits (LG webOS, older Tizen) miss pongs // under decode load - tighter values cause spurious transport drops. diff --git a/server/lib/command-queue.js b/server/lib/command-queue.js new file mode 100644 index 0000000..59d82a5 --- /dev/null +++ b/server/lib/command-queue.js @@ -0,0 +1,153 @@ +// Short-lived per-device queue for events that target a currently-offline +// device. Designed for the TV-flap case where a device disconnects for a few +// seconds (Engine.IO ping miss, Wi-Fi blip, decode stall) and reconnects via +// Socket.IO's auto-reconnect. Without this queue, any device:command or +// device:playlist-update emitted during the disconnect window goes nowhere - +// the room is empty, the emit is silently dropped. +// +// Two structures, both keyed by device_id, both pruned by TTL: +// +// pendingPlaylistUpdate: Map +// We don't store the payload. On flush we rebuild via buildPlaylistPayload +// so the device gets the LATEST DB state, not a stale snapshot from when +// the update was first queued. +// +// pendingCommands: Map> +// One entry per command type per device. Last-of-type wins (the most +// recent screen_off supersedes any earlier ones). Payloads stored verbatim +// because commands are stateless declarations. +// +// Memory bounds: worst-case ~6 entries per device (1 playlist marker + 5 +// command types), each ~200 bytes. 10,000 offline devices = ~12MB. Sweep +// thread prunes empty per-device records every 30s. + +const config = require('../config'); + +const pendingPlaylistUpdate = new Map(); +const pendingCommands = new Map(); + +let _sweepTimer = null; + +// Internal helper - drop expired entries for a single device. Called lazily +// from queue/flush paths AND from the sweep thread. 
// ---------------------------------------------------------------------------
// Queue core: lazy pruning plus the two enqueue paths.
// ---------------------------------------------------------------------------

// Drop expired entries for a single device. Called lazily from the queue/flush
// paths AND from the periodic sweep, so expiry never depends on the sweep
// alone. Deleting Map entries while iterating the same Map is safe in JS.
function pruneDevice(deviceId) {
  const now = Date.now();

  const pu = pendingPlaylistUpdate.get(deviceId);
  if (pu && pu.expiresAt <= now) pendingPlaylistUpdate.delete(deviceId);

  const cmds = pendingCommands.get(deviceId);
  if (cmds) {
    for (const [type, entry] of cmds) {
      if (entry.expiresAt <= now) cmds.delete(type);
    }
    if (cmds.size === 0) pendingCommands.delete(deviceId);
  }
}

// Emit a playlist-update immediately when the device's room is populated;
// otherwise record a pending-update marker. No payload is stored for the
// marker - flushQueue rebuilds via buildPayload at flush time so the device
// always receives current DB state, never a stale snapshot.
//
// @param {Namespace} deviceNs - the /device Socket.IO namespace
// @param {string} deviceId - device id; doubles as the Socket.IO room name
// @param {Function} buildPayload - buildPlaylistPayload from deviceSocket.js
// @returns {{delivered: boolean, queued?: boolean}} which path was taken
function queueOrEmitPlaylistUpdate(deviceNs, deviceId, buildPayload) {
  if (!deviceNs || !deviceId || typeof buildPayload !== 'function') {
    return { delivered: false };
  }
  const room = deviceNs.adapter.rooms.get(deviceId);
  if (room && room.size > 0) {
    deviceNs.to(deviceId).emit('device:playlist-update', buildPayload(deviceId));
    return { delivered: true };
  }
  // Offline: remember that AN update is owed. Re-queuing restarts the TTL, so
  // the most recent change defines the delivery window.
  pendingPlaylistUpdate.set(deviceId, { expiresAt: Date.now() + config.commandQueueTtlMs });
  return { delivered: false, queued: true };
}

// Queue a single command for an offline device. Last-of-type wins: a newer
// screen_off replaces any earlier queued screen_off. Returns true when the
// command was accepted (always, under current logic; reserved for future
// "rejected because stale/full" cases).
//
// Fix over the original: `payload ?? {}` instead of `payload || {}`, so valid
// falsy payloads (0, '', false) are stored verbatim rather than being
// silently replaced with an empty object; only null/undefined default to {}.
function queueCommand(deviceId, type, payload) {
  if (!deviceId || !type) return false;
  let perDevice = pendingCommands.get(deviceId);
  if (!perDevice) {
    perDevice = new Map();
    pendingCommands.set(deviceId, perDevice);
  }
  perDevice.set(type, { payload: payload ?? {}, expiresAt: Date.now() + config.commandQueueTtlMs });
  return true;
}
// Drain both queues to a device that has just (re)registered. Invoked after
// heartbeat.registerConnection and socket.join, so the device room is
// non-empty by definition at this point - no presence check needed here.
//
// buildPayload is deviceSocket's buildPlaylistPayload, injected as a
// parameter to avoid a circular require. It runs at flush time so the
// replayed playlist reflects the database as it is NOW, not as it was when
// the update was queued.
//
// Entries are deleted from the maps BEFORE emitting, so a re-entrant call
// cannot replay the same entry twice.
//
// @returns {{playlistUpdate: boolean, commands: number}} what was replayed
function flushQueue(deviceNs, deviceId, buildPayload) {
  if (!deviceNs || !deviceId) return { playlistUpdate: false, commands: 0 };
  pruneDevice(deviceId);

  let playlistUpdate = false;
  let commands = 0;

  if (pendingPlaylistUpdate.has(deviceId)) {
    pendingPlaylistUpdate.delete(deviceId);
    if (typeof buildPayload === 'function') {
      deviceNs.to(deviceId).emit('device:playlist-update', buildPayload(deviceId));
      playlistUpdate = true;
    }
  }

  const queuedCmds = pendingCommands.get(deviceId);
  if (queuedCmds) {
    pendingCommands.delete(deviceId);
    for (const [type, entry] of queuedCmds) {
      deviceNs.to(deviceId).emit('device:command', { type, payload: entry.payload });
      commands += 1;
    }
  }

  if (playlistUpdate || commands > 0) {
    console.log(`Flushed queue for ${deviceId}: playlistUpdate=${playlistUpdate}, commands=${commands}`);
  }
  return { playlistUpdate, commands };
}

// Current queue depth for a device, after lazy pruning: one for a pending
// playlist marker plus one per queued command type.
function getQueueDepth(deviceId) {
  pruneDevice(deviceId);
  const playlistPending = pendingPlaylistUpdate.has(deviceId) ? 1 : 0;
  return playlistPending + (pendingCommands.get(deviceId)?.size ?? 0);
}
+function startSweep() { + if (_sweepTimer) return; + _sweepTimer = setInterval(() => { + for (const deviceId of pendingPlaylistUpdate.keys()) pruneDevice(deviceId); + for (const deviceId of pendingCommands.keys()) pruneDevice(deviceId); + }, 30000); + if (_sweepTimer.unref) _sweepTimer.unref(); +} + +function stopSweep() { + if (_sweepTimer) { clearInterval(_sweepTimer); _sweepTimer = null; } +} + +// Test helpers - reset internal state. Not exported via module.exports for +// production callers; bound below for the test harness only. +function _resetForTests() { + pendingPlaylistUpdate.clear(); + pendingCommands.clear(); + stopSweep(); +} + +module.exports = { + queueOrEmitPlaylistUpdate, + queueCommand, + flushQueue, + getQueueDepth, + startSweep, + stopSweep, + _resetForTests, +}; diff --git a/server/routes/content.js b/server/routes/content.js index 3c0f07e..affe22e 100644 --- a/server/routes/content.js +++ b/server/routes/content.js @@ -432,8 +432,10 @@ router.delete('/:id', (req, res) => { const io = req.app.get('io'); if (io) { const { buildPlaylistPayload } = require('../ws/deviceSocket'); + const commandQueue = require('../lib/command-queue'); + const deviceNs = io.of('/device'); for (const d of affectedDevices) { - io.of('/device').to(d.device_id).emit('device:playlist-update', buildPlaylistPayload(d.device_id)); + commandQueue.queueOrEmitPlaylistUpdate(deviceNs, d.device_id, buildPlaylistPayload); } } } catch (e) { /* silent */ } diff --git a/server/routes/device-groups.js b/server/routes/device-groups.js index 7b210f5..5893d7c 100644 --- a/server/routes/device-groups.js +++ b/server/routes/device-groups.js @@ -217,8 +217,8 @@ function pushPlaylistToDevice(req, deviceId) { const io = req.app.get('io'); if (!io) return; const { buildPlaylistPayload } = require('../ws/deviceSocket'); - const deviceNs = io.of('/device'); - deviceNs.to(deviceId).emit('device:playlist-update', buildPlaylistPayload(deviceId)); + const commandQueue = 
require('../lib/command-queue'); + commandQueue.queueOrEmitPlaylistUpdate(io.of('/device'), deviceId, buildPlaylistPayload); } catch (e) { /* silent */ } } diff --git a/server/routes/playlists.js b/server/routes/playlists.js index 8542450..834ff74 100644 --- a/server/routes/playlists.js +++ b/server/routes/playlists.js @@ -89,9 +89,11 @@ function pushToDevices(playlistId, req) { const io = req.app.get('io'); if (!io) return; const { buildPlaylistPayload } = require('../ws/deviceSocket'); + const commandQueue = require('../lib/command-queue'); + const deviceNs = io.of('/device'); const devices = db.prepare('SELECT id FROM devices WHERE playlist_id = ?').all(playlistId); for (const d of devices) { - io.of('/device').to(d.id).emit('device:playlist-update', buildPlaylistPayload(d.id)); + commandQueue.queueOrEmitPlaylistUpdate(deviceNs, d.id, buildPlaylistPayload); } } catch (e) { /* silent */ } } @@ -449,7 +451,8 @@ router.post('/:id/assign', requirePlaylistWrite, (req, res) => { const io = req.app.get('io'); if (io) { const { buildPlaylistPayload } = require('../ws/deviceSocket'); - io.of('/device').to(device_id).emit('device:playlist-update', buildPlaylistPayload(device_id)); + const commandQueue = require('../lib/command-queue'); + commandQueue.queueOrEmitPlaylistUpdate(io.of('/device'), device_id, buildPlaylistPayload); } } catch (e) { /* silent */ } diff --git a/server/routes/video-walls.js b/server/routes/video-walls.js index d711bd4..55041ac 100644 --- a/server/routes/video-walls.js +++ b/server/routes/video-walls.js @@ -81,7 +81,8 @@ function pushWallPayloadToDevice(req, deviceId) { const io = req.app.get('io'); if (!io) return; const { buildPlaylistPayload } = require('../ws/deviceSocket'); - io.of('/device').to(deviceId).emit('device:playlist-update', buildPlaylistPayload(deviceId)); + const commandQueue = require('../lib/command-queue'); + commandQueue.queueOrEmitPlaylistUpdate(io.of('/device'), deviceId, buildPlaylistPayload); } catch (e) { /* silent */ } } 
diff --git a/server/server.js b/server/server.js index bc52464..cc60142 100644 --- a/server/server.js +++ b/server/server.js @@ -436,6 +436,10 @@ app.set('io', io); const { startHeartbeatChecker } = require('./services/heartbeat'); startHeartbeatChecker(io); +// Start command-queue sweep (prunes expired entries for offline devices) +const commandQueue = require('./lib/command-queue'); +commandQueue.startSweep(); + // Start scheduler const { startScheduler } = require('./services/scheduler'); startScheduler(io); diff --git a/server/services/scheduler.js b/server/services/scheduler.js index 085ef26..268172f 100644 --- a/server/services/scheduler.js +++ b/server/services/scheduler.js @@ -106,8 +106,8 @@ function parseSimpleRRule(rrule) { function pushPlaylistToDevice(deviceId, deviceNs) { // Use the single-source buildPlaylistPayload from deviceSocket const { buildPlaylistPayload } = require('../ws/deviceSocket'); - const payload = buildPlaylistPayload(deviceId); - deviceNs.to(deviceId).emit('device:playlist-update', payload); + const commandQueue = require('../lib/command-queue'); + commandQueue.queueOrEmitPlaylistUpdate(deviceNs, deviceId, buildPlaylistPayload); } module.exports = { startScheduler, pushPlaylistToDevice }; diff --git a/server/ws/deviceSocket.js b/server/ws/deviceSocket.js index 089f609..a7fceeb 100644 --- a/server/ws/deviceSocket.js +++ b/server/ws/deviceSocket.js @@ -5,6 +5,7 @@ const fs = require('fs'); const { db, pruneTelemetry, pruneScreenshots } = require('../db/database'); const config = require('../config'); const heartbeat = require('../services/heartbeat'); +const commandQueue = require('../lib/command-queue'); const { getUserPlan, getUserDeviceCount } = require('../middleware/subscription'); // Phase 2.3: deviceRoom() resolves a device_id to its workspace room so // dashboardNs.emit can be scoped instead of broadcast platform-wide. 
@@ -255,6 +256,8 @@ module.exports = function setupDeviceSocket(io) { socket.join(existing.device_id); logDeviceStatus(existing.device_id, 'online'); emitToDeviceWorkspace(dashboardNs, existing.device_id, 'dashboard:device-status', { device_id: existing.device_id, status: 'online' }); + // Flush any commands/playlist-updates queued while this device was offline. + commandQueue.flushQueue(deviceNs, existing.device_id, buildPlaylistPayload); // Send playlist const access = checkDeviceAccess(existing.device_id); if (!access.allowed) { @@ -307,6 +310,8 @@ module.exports = function setupDeviceSocket(io) { socket.join(device_id); socket.emit('device:registered', { device_id, device_token: tokenToSend, status: 'online' }); logDeviceStatus(device_id, 'online'); + // Flush any commands/playlist-updates queued while this device was offline. + commandQueue.flushQueue(deviceNs, device_id, buildPlaylistPayload); // If this device is part of a wall, re-evaluate leadership. // Preferred leader = online member with smallest (canvas_x + @@ -333,7 +338,7 @@ module.exports = function setupDeviceSocket(io) { const members = db.prepare('SELECT device_id FROM video_wall_devices WHERE wall_id = ?').all(wall.id); for (const m of members) { if (m.device_id !== device_id) { - deviceNs.to(m.device_id).emit('device:playlist-update', buildPlaylistPayload(m.device_id)); + commandQueue.queueOrEmitPlaylistUpdate(deviceNs, m.device_id, buildPlaylistPayload); } } } @@ -595,7 +600,7 @@ module.exports = function setupDeviceSocket(io) { const members = db.prepare('SELECT device_id FROM video_wall_devices WHERE wall_id = ?').all(wall.id); for (const m of members) { if (m.device_id !== currentDeviceId) { - deviceNs.to(m.device_id).emit('device:playlist-update', buildPlaylistPayload(m.device_id)); + commandQueue.queueOrEmitPlaylistUpdate(deviceNs, m.device_id, buildPlaylistPayload); } } }