diff --git a/server/db/database.js b/server/db/database.js index 2b247e4..5845aca 100644 --- a/server/db/database.js +++ b/server/db/database.js @@ -197,6 +197,9 @@ const migrations = [ "CREATE TABLE IF NOT EXISTS api_token_targets (token_id TEXT NOT NULL REFERENCES api_tokens(id) ON DELETE CASCADE, playlist_id TEXT NOT NULL REFERENCES playlists(id) ON DELETE CASCADE, created_at INTEGER NOT NULL DEFAULT (strftime('%s','now')), PRIMARY KEY (token_id, playlist_id))", // #73: per-agency-token auto-publish (DEFAULT 0 = draft, the fail-safe). "ALTER TABLE api_tokens ADD COLUMN auto_publish INTEGER NOT NULL DEFAULT 0", + // #73: agency-upload notification queue (batched digest). + "CREATE TABLE IF NOT EXISTS agency_notifications (id INTEGER PRIMARY KEY AUTOINCREMENT, workspace_id TEXT NOT NULL, token_id TEXT NOT NULL, playlist_id TEXT NOT NULL, action TEXT NOT NULL, content_id TEXT, created_at INTEGER NOT NULL DEFAULT (strftime('%s','now')), sent_at INTEGER)", + "CREATE INDEX IF NOT EXISTS idx_agency_notifications_unsent ON agency_notifications(sent_at)", ]; // Apply each ALTER idempotently. A "duplicate column name" / "already exists" // error means the column is already present (expected on a migrated DB) - benign. diff --git a/server/db/schema.sql b/server/db/schema.sql index b67a107..542984f 100644 --- a/server/db/schema.sql +++ b/server/db/schema.sql @@ -551,6 +551,21 @@ CREATE TABLE IF NOT EXISTS api_token_targets ( ); CREATE INDEX IF NOT EXISTS idx_api_tokens_user ON api_tokens(user_id); +-- #73: agency-upload notification queue. The agency endpoint enqueues one row per item added +-- (only when email is configured); a 15-min flush job groups per token+playlist+action and +-- sends one digest per group, stamping sent_at ONLY after a successful send (failed -> retry). +CREATE TABLE IF NOT EXISTS agency_notifications ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + workspace_id TEXT NOT NULL, + token_id TEXT NOT NULL, + playlist_id TEXT NOT NULL, + action TEXT NOT NULL, -- 'draft' | 'published' + content_id TEXT, + created_at INTEGER NOT NULL DEFAULT (strftime('%s','now')), + sent_at INTEGER -- NULL = unsent +); +CREATE INDEX IF NOT EXISTS idx_agency_notifications_unsent ON agency_notifications(sent_at); + -- ===================== SCHEMA MIGRATIONS ===================== CREATE TABLE IF NOT EXISTS schema_migrations ( diff --git a/server/routes/agency.js b/server/routes/agency.js index 5ec26ec..8db6637 100644 --- a/server/routes/agency.js +++ b/server/routes/agency.js @@ -16,6 +16,7 @@ const { ingestUploadedFile } = require('../lib/content-ingest'); const { listDesignatedPlaylists } = require('../lib/agency-targets'); const { listLayoutGeometry } = require('../lib/agency-layouts'); const { publishPlaylist } = require('./playlists'); // #73: shared publish path for auto-publish +const { isConfigured } = require('../services/email'); // #73: gate digest enqueue on SMTP being set const TIME_RE = /^([01]\d|2[0-3]):[0-5]\d$/; const DATE_RE = /^\d{4}-\d{2}-\d{2}$/; @@ -108,6 +109,13 @@ router.post('/playlists/:playlistId/items', (req, res) => { db.prepare("UPDATE playlists SET status = 'draft', updated_at = strftime('%s','now') WHERE id = ?").run(req.params.playlistId); } + // #73: enqueue a digest notification ONLY when email is configured, so the queue can't + // balloon on installs without SMTP. action reflects what actually happened (draft vs live). + if (isConfigured()) { + db.prepare('INSERT INTO agency_notifications (workspace_id, token_id, playlist_id, action, content_id) VALUES (?,?,?,?,?)') + .run(req.workspaceId, req.apiToken.id, req.params.playlistId, published ? 'published' : 'draft', content_id); + } + res.status(201).json({ id: itemId, playlist_id: req.params.playlistId, content_id, duration_sec, start_date: sd, end_date: ed, published }); }); diff --git a/server/server.js b/server/server.js index b35645e..efaa1be 100644 --- a/server/server.js +++ b/server/server.js @@ -594,6 +594,10 @@ startAlertService(io); const { startActivationNudge } = require('./services/activationNudge'); startActivationNudge(); +// #73: agency-upload digest flush (batched draft/published notifications to admins + owner) +const { startAgencyDigest } = require('./services/agency-digest'); +startAgencyDigest(); + // Handle provisioning via WebSocket notification const { db } = require('./db/database'); const originalProvisionRoute = require('./routes/provisioning'); diff --git a/server/services/agency-digest.js b/server/services/agency-digest.js new file mode 100644 index 0000000..7f5658c --- /dev/null +++ b/server/services/agency-digest.js @@ -0,0 +1,84 @@ +'use strict'; + +// #73: batched digest of agency uploads. The agency endpoint enqueues a row per item added +// (ONLY when email is configured). This job flushes every 15 min: groups unsent rows per +// token+playlist+action, sends one email per group to the workspace owner/admins + the +// playlist owner (deduped), and stamps sent_at ONLY after a successful send. Two robustness +// rules: (1) never let the queue balloon when SMTP is off; (2) a failed send retries next +// cycle instead of silently dropping. + +const { db: defaultDb } = require('../db/database'); +const defaultEmail = require('./email'); + +const FLUSH_MS = 15 * 60 * 1000; // the digest window + +// Workspace owner/admins (via the org) + the playlist owner. UNION dedupes by email. +function resolveRecipients(db, workspaceId, playlistId) { + return db.prepare(` + SELECT u.email FROM organization_members om + JOIN workspaces w ON w.organization_id = om.organization_id + JOIN users u ON u.id = om.user_id + WHERE w.id = ? AND om.role IN ('org_owner', 'org_admin') AND u.email IS NOT NULL + UNION + SELECT u.email FROM playlists p + JOIN users u ON u.id = p.user_id + WHERE p.id = ? AND u.email IS NOT NULL + `).all(workspaceId, playlistId); +} + +function composeDigest(db, g) { + const agency = db.prepare('SELECT name FROM api_tokens WHERE id = ?').get(g.token_id)?.name || 'An agency'; + const playlist = db.prepare('SELECT name FROM playlists WHERE id = ?').get(g.playlist_id)?.name || 'a playlist'; + const n = g.n; + if (g.action === 'draft') { + return { + subject: `${agency} added ${n} item${n === 1 ? '' : 's'} to "${playlist}" — awaiting your approval`, + text: `${agency} added ${n} item${n === 1 ? '' : 's'} to the playlist "${playlist}".\n\nThey are saved as drafts and will NOT appear on screens until you publish the playlist.`, + }; + } + return { + subject: `${agency} updated "${playlist}"`, + text: `${agency} added ${n} item${n === 1 ? '' : 's'} to the playlist "${playlist}", now live (this token is set to auto-publish).`, + }; +} + +// Core flush - testable: pass a db and an email impl ({ isConfigured, sendEmail }). +async function flushAgencyDigests(db = defaultDb, email = defaultEmail) { + if (!email.isConfigured()) { + // SMTP off -> drain-and-discard so the queue can't grow unbounded on self-hosters + // who never set up email. (The endpoint also skips enqueue when off; this is the backstop.) + db.prepare('DELETE FROM agency_notifications WHERE sent_at IS NULL').run(); + return; + } + const groups = db.prepare(` + SELECT workspace_id, token_id, playlist_id, action, COUNT(*) AS n, GROUP_CONCAT(id) AS ids + FROM agency_notifications WHERE sent_at IS NULL + GROUP BY token_id, playlist_id, action + `).all(); + + for (const g of groups) { + try { + const recipients = resolveRecipients(db, g.workspace_id, g.playlist_id); + if (recipients.length) { + const { subject, text } = composeDigest(db, g); + for (const r of recipients) { + await email.sendEmail({ to: r.email, subject, text }); // throw -> caught below -> NOT stamped -> retried + } + } + // Stamp sent_at ONLY after every send for this group succeeded (or there were no + // recipients). A throw above skips this -> the rows stay unsent for the next cycle. + const now = Math.floor(Date.now() / 1000); + const stamp = db.prepare('UPDATE agency_notifications SET sent_at = ? WHERE id = ?'); + db.transaction(() => { for (const id of g.ids.split(',')) stamp.run(now, id); })(); + } catch (e) { + console.warn('agency digest: send failed, will retry next cycle:', e.message); + } + } +} + +function startAgencyDigest() { + setInterval(() => { flushAgencyDigests().catch(() => {}); }, FLUSH_MS); + console.log('Agency digest service started'); +} + +module.exports = { startAgencyDigest, flushAgencyDigests, resolveRecipients, composeDigest }; diff --git a/server/test/agency-digest.test.js b/server/test/agency-digest.test.js new file mode 100644 index 0000000..fc05734 --- /dev/null +++ b/server/test/agency-digest.test.js @@ -0,0 +1,75 @@ +'use strict'; + +// #73 email digest robustness. Proves the two rules the design hinges on: (1) the queue +// never balloons when SMTP is off (drain-and-discard); (2) sent_at is stamped ONLY after a +// successful send, so a failure retries next cycle instead of silently dropping. Plus +// recipient resolution (org owner/admins + playlist owner, deduped) and digest grouping. + +const { test } = require('node:test'); +const assert = require('node:assert/strict'); +const Database = require('better-sqlite3'); +const { flushAgencyDigests, resolveRecipients } = require('../services/agency-digest'); + +function freshDb() { + const db = new Database(':memory:'); + db.exec(` + CREATE TABLE agency_notifications (id INTEGER PRIMARY KEY AUTOINCREMENT, workspace_id TEXT, token_id TEXT, playlist_id TEXT, action TEXT, content_id TEXT, created_at INTEGER, sent_at INTEGER); + CREATE TABLE organization_members (organization_id TEXT, user_id TEXT, role TEXT); + CREATE TABLE workspaces (id TEXT, organization_id TEXT); + CREATE TABLE users (id TEXT, email TEXT); + CREATE TABLE playlists (id TEXT, user_id TEXT, name TEXT); + CREATE TABLE api_tokens (id TEXT, name TEXT); + INSERT INTO workspaces VALUES ('ws1','org1'); + INSERT INTO users VALUES ('uOwner','owner@x'), ('uAdmin','admin@x'), ('uViewer','viewer@x'), ('uPlOwner','plowner@x'); + INSERT INTO organization_members VALUES ('org1','uOwner','org_owner'), ('org1','uAdmin','org_admin'), ('org1','uViewer','member'); + INSERT INTO playlists VALUES ('pl1','uPlOwner','Lobby'); + INSERT INTO api_tokens VALUES ('tok1','Acme Agency'); + `); + return db; +} +function enqueue(db, n, action = 'draft') { + const ins = db.prepare("INSERT INTO agency_notifications (workspace_id, token_id, playlist_id, action) VALUES ('ws1','tok1','pl1',?)"); + for (let i = 0; i < n; i++) ins.run(action); +} +const cfg = (sendEmail) => ({ isConfigured: () => true, sendEmail }); +const sink = () => { const sent = []; return { sent, sendEmail: async (m) => { sent.push(m); } }; }; + +test('#73 digest recipients: org owner + admins + playlist owner, deduped (NOT the viewer)', () => { + const emails = resolveRecipients(freshDb(), 'ws1', 'pl1').map(r => r.email).sort(); + assert.deepEqual(emails, ['admin@x', 'owner@x', 'plowner@x']); +}); + +test('#73 digest: 30 uploads -> ONE email per recipient (not 30), all rows stamped sent', async () => { + const db = freshDb(); + enqueue(db, 30, 'draft'); + const { sent, sendEmail } = sink(); + await flushAgencyDigests(db, cfg(sendEmail)); + assert.equal(sent.length, 3, '1 group x 3 recipients = 3 emails, not 30 per recipient'); + assert.match(sent[0].subject, /Acme Agency added 30 items to "Lobby"/); + assert.equal(db.prepare('SELECT COUNT(*) c FROM agency_notifications WHERE sent_at IS NULL').get().c, 0); +}); + +test('#73 digest: a failed send leaves rows UNSENT for retry (never silently dropped)', async () => { + const db = freshDb(); + enqueue(db, 5, 'draft'); + await flushAgencyDigests(db, cfg(async () => { throw new Error('smtp down'); })); + assert.equal(db.prepare('SELECT COUNT(*) c FROM agency_notifications WHERE sent_at IS NULL').get().c, 5, 'still unsent -> retried next cycle'); +}); + +test('#73 digest: SMTP off -> queue drained-and-discarded (never balloons)', async () => { + const db = freshDb(); + enqueue(db, 10, 'draft'); + await flushAgencyDigests(db, { isConfigured: () => false, sendEmail: async () => { throw new Error('must not send'); } }); + assert.equal(db.prepare('SELECT COUNT(*) c FROM agency_notifications').get().c, 0, 'drained when email is off'); +}); + +test('#73 digest: draft vs published produce different subjects, grouped per action', async () => { + const db = freshDb(); + enqueue(db, 2, 'draft'); + enqueue(db, 3, 'published'); + const { sent, sendEmail } = sink(); + await flushAgencyDigests(db, cfg(sendEmail)); + const subjects = sent.map(s => s.subject); + assert.ok(subjects.some(s => /awaiting your approval/.test(s)), 'draft digest mentions approval'); + assert.ok(subjects.some(s => /updated "Lobby"/.test(s)), 'published digest says updated'); +});