mirror of
https://github.com/screentinker/screentinker.git
synced 2026-06-17 03:32:32 -06:00
feat(api): batched email digest for agency uploads (#73)
Reuses the existing scheduler + sendEmail infra (no new scheduler). The agency endpoint enqueues one agency_notifications row per item added; a 15-min flush groups unsent rows per token+playlist+action and sends ONE digest per group to the workspace owner/admins + the playlist owner (deduped via UNION). Draft -> "added N items, awaiting approval"; published -> "updated <playlist>". Two robustness rules, both tested: - Queue never balloons when SMTP is off: the endpoint skips enqueue when !isConfigured(), and the flush drains-and-discards unsent rows as a backstop. - sent_at is stamped ONLY after a successful send, so a failed send retries next cycle instead of silently dropping. Wired into boot via startAgencyDigest(). 147 suite green. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
986d94a778
commit
c55ca60b56
|
|
@ -197,6 +197,9 @@ const migrations = [
|
||||||
"CREATE TABLE IF NOT EXISTS api_token_targets (token_id TEXT NOT NULL REFERENCES api_tokens(id) ON DELETE CASCADE, playlist_id TEXT NOT NULL REFERENCES playlists(id) ON DELETE CASCADE, created_at INTEGER NOT NULL DEFAULT (strftime('%s','now')), PRIMARY KEY (token_id, playlist_id))",
|
"CREATE TABLE IF NOT EXISTS api_token_targets (token_id TEXT NOT NULL REFERENCES api_tokens(id) ON DELETE CASCADE, playlist_id TEXT NOT NULL REFERENCES playlists(id) ON DELETE CASCADE, created_at INTEGER NOT NULL DEFAULT (strftime('%s','now')), PRIMARY KEY (token_id, playlist_id))",
|
||||||
// #73: per-agency-token auto-publish (DEFAULT 0 = draft, the fail-safe).
|
// #73: per-agency-token auto-publish (DEFAULT 0 = draft, the fail-safe).
|
||||||
"ALTER TABLE api_tokens ADD COLUMN auto_publish INTEGER NOT NULL DEFAULT 0",
|
"ALTER TABLE api_tokens ADD COLUMN auto_publish INTEGER NOT NULL DEFAULT 0",
|
||||||
|
// #73: agency-upload notification queue (batched digest).
|
||||||
|
"CREATE TABLE IF NOT EXISTS agency_notifications (id INTEGER PRIMARY KEY AUTOINCREMENT, workspace_id TEXT NOT NULL, token_id TEXT NOT NULL, playlist_id TEXT NOT NULL, action TEXT NOT NULL, content_id TEXT, created_at INTEGER NOT NULL DEFAULT (strftime('%s','now')), sent_at INTEGER)",
|
||||||
|
"CREATE INDEX IF NOT EXISTS idx_agency_notifications_unsent ON agency_notifications(sent_at)",
|
||||||
];
|
];
|
||||||
// Apply each ALTER idempotently. A "duplicate column name" / "already exists"
|
// Apply each ALTER idempotently. A "duplicate column name" / "already exists"
|
||||||
// error means the column is already present (expected on a migrated DB) - benign.
|
// error means the column is already present (expected on a migrated DB) - benign.
|
||||||
|
|
|
||||||
|
|
@ -551,6 +551,21 @@ CREATE TABLE IF NOT EXISTS api_token_targets (
|
||||||
);
|
);
|
||||||
CREATE INDEX IF NOT EXISTS idx_api_tokens_user ON api_tokens(user_id);
|
CREATE INDEX IF NOT EXISTS idx_api_tokens_user ON api_tokens(user_id);
|
||||||
|
|
||||||
|
-- #73: agency-upload notification queue. The agency endpoint enqueues one row per item added
|
||||||
|
-- (only when email is configured); a 15-min flush job groups per token+playlist+action and
|
||||||
|
-- sends one digest per group, stamping sent_at ONLY after a successful send (failed -> retry).
|
||||||
|
CREATE TABLE IF NOT EXISTS agency_notifications (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
workspace_id TEXT NOT NULL,
|
||||||
|
token_id TEXT NOT NULL,
|
||||||
|
playlist_id TEXT NOT NULL,
|
||||||
|
action TEXT NOT NULL, -- 'draft' | 'published'
|
||||||
|
content_id TEXT,
|
||||||
|
created_at INTEGER NOT NULL DEFAULT (strftime('%s','now')),
|
||||||
|
sent_at INTEGER -- NULL = unsent
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_agency_notifications_unsent ON agency_notifications(sent_at);
|
||||||
|
|
||||||
-- ===================== SCHEMA MIGRATIONS =====================
|
-- ===================== SCHEMA MIGRATIONS =====================
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS schema_migrations (
|
CREATE TABLE IF NOT EXISTS schema_migrations (
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,7 @@ const { ingestUploadedFile } = require('../lib/content-ingest');
|
||||||
const { listDesignatedPlaylists } = require('../lib/agency-targets');
|
const { listDesignatedPlaylists } = require('../lib/agency-targets');
|
||||||
const { listLayoutGeometry } = require('../lib/agency-layouts');
|
const { listLayoutGeometry } = require('../lib/agency-layouts');
|
||||||
const { publishPlaylist } = require('./playlists'); // #73: shared publish path for auto-publish
|
const { publishPlaylist } = require('./playlists'); // #73: shared publish path for auto-publish
|
||||||
|
const { isConfigured } = require('../services/email'); // #73: gate digest enqueue on SMTP being set
|
||||||
|
|
||||||
const TIME_RE = /^([01]\d|2[0-3]):[0-5]\d$/;
|
const TIME_RE = /^([01]\d|2[0-3]):[0-5]\d$/;
|
||||||
const DATE_RE = /^\d{4}-\d{2}-\d{2}$/;
|
const DATE_RE = /^\d{4}-\d{2}-\d{2}$/;
|
||||||
|
|
@ -108,6 +109,13 @@ router.post('/playlists/:playlistId/items', (req, res) => {
|
||||||
db.prepare("UPDATE playlists SET status = 'draft', updated_at = strftime('%s','now') WHERE id = ?").run(req.params.playlistId);
|
db.prepare("UPDATE playlists SET status = 'draft', updated_at = strftime('%s','now') WHERE id = ?").run(req.params.playlistId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// #73: enqueue a digest notification ONLY when email is configured, so the queue can't
|
||||||
|
// balloon on installs without SMTP. action reflects what actually happened (draft vs live).
|
||||||
|
if (isConfigured()) {
|
||||||
|
db.prepare('INSERT INTO agency_notifications (workspace_id, token_id, playlist_id, action, content_id) VALUES (?,?,?,?,?)')
|
||||||
|
.run(req.workspaceId, req.apiToken.id, req.params.playlistId, published ? 'published' : 'draft', content_id);
|
||||||
|
}
|
||||||
|
|
||||||
res.status(201).json({ id: itemId, playlist_id: req.params.playlistId, content_id, duration_sec, start_date: sd, end_date: ed, published });
|
res.status(201).json({ id: itemId, playlist_id: req.params.playlistId, content_id, duration_sec, start_date: sd, end_date: ed, published });
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -594,6 +594,10 @@ startAlertService(io);
|
||||||
const { startActivationNudge } = require('./services/activationNudge');
|
const { startActivationNudge } = require('./services/activationNudge');
|
||||||
startActivationNudge();
|
startActivationNudge();
|
||||||
|
|
||||||
|
// #73: agency-upload digest flush (batched draft/published notifications to admins + owner)
|
||||||
|
const { startAgencyDigest } = require('./services/agency-digest');
|
||||||
|
startAgencyDigest();
|
||||||
|
|
||||||
// Handle provisioning via WebSocket notification
|
// Handle provisioning via WebSocket notification
|
||||||
const { db } = require('./db/database');
|
const { db } = require('./db/database');
|
||||||
const originalProvisionRoute = require('./routes/provisioning');
|
const originalProvisionRoute = require('./routes/provisioning');
|
||||||
|
|
|
||||||
84
server/services/agency-digest.js
Normal file
84
server/services/agency-digest.js
Normal file
|
|
@ -0,0 +1,84 @@
|
||||||
|
'use strict';
|
||||||
|
|
||||||
|
// #73: batched digest of agency uploads. The agency endpoint enqueues a row per item added
|
||||||
|
// (ONLY when email is configured). This job flushes every 15 min: groups unsent rows per
|
||||||
|
// token+playlist+action, sends one email per group to the workspace owner/admins + the
|
||||||
|
// playlist owner (deduped), and stamps sent_at ONLY after a successful send. Two robustness
|
||||||
|
// rules: (1) never let the queue balloon when SMTP is off; (2) a failed send retries next
|
||||||
|
// cycle instead of silently dropping.
|
||||||
|
|
||||||
|
const { db: defaultDb } = require('../db/database');
|
||||||
|
const defaultEmail = require('./email');
|
||||||
|
|
||||||
|
const FLUSH_MS = 15 * 60 * 1000; // the digest window
|
||||||
|
|
||||||
|
// Workspace owner/admins (via the org) + the playlist owner. UNION dedupes by email.
|
||||||
|
function resolveRecipients(db, workspaceId, playlistId) {
|
||||||
|
return db.prepare(`
|
||||||
|
SELECT u.email FROM organization_members om
|
||||||
|
JOIN workspaces w ON w.organization_id = om.organization_id
|
||||||
|
JOIN users u ON u.id = om.user_id
|
||||||
|
WHERE w.id = ? AND om.role IN ('org_owner', 'org_admin') AND u.email IS NOT NULL
|
||||||
|
UNION
|
||||||
|
SELECT u.email FROM playlists p
|
||||||
|
JOIN users u ON u.id = p.user_id
|
||||||
|
WHERE p.id = ? AND u.email IS NOT NULL
|
||||||
|
`).all(workspaceId, playlistId);
|
||||||
|
}
|
||||||
|
|
||||||
|
function composeDigest(db, g) {
|
||||||
|
const agency = db.prepare('SELECT name FROM api_tokens WHERE id = ?').get(g.token_id)?.name || 'An agency';
|
||||||
|
const playlist = db.prepare('SELECT name FROM playlists WHERE id = ?').get(g.playlist_id)?.name || 'a playlist';
|
||||||
|
const n = g.n;
|
||||||
|
if (g.action === 'draft') {
|
||||||
|
return {
|
||||||
|
subject: `${agency} added ${n} item${n === 1 ? '' : 's'} to "${playlist}" — awaiting your approval`,
|
||||||
|
text: `${agency} added ${n} item${n === 1 ? '' : 's'} to the playlist "${playlist}".\n\nThey are saved as drafts and will NOT appear on screens until you publish the playlist.`,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
subject: `${agency} updated "${playlist}"`,
|
||||||
|
text: `${agency} added ${n} item${n === 1 ? '' : 's'} to the playlist "${playlist}", now live (this token is set to auto-publish).`,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// Core flush - testable: pass a db and an email impl ({ isConfigured, sendEmail }).
|
||||||
|
async function flushAgencyDigests(db = defaultDb, email = defaultEmail) {
|
||||||
|
if (!email.isConfigured()) {
|
||||||
|
// SMTP off -> drain-and-discard so the queue can't grow unbounded on self-hosters
|
||||||
|
// who never set up email. (The endpoint also skips enqueue when off; this is the backstop.)
|
||||||
|
db.prepare('DELETE FROM agency_notifications WHERE sent_at IS NULL').run();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const groups = db.prepare(`
|
||||||
|
SELECT workspace_id, token_id, playlist_id, action, COUNT(*) AS n, GROUP_CONCAT(id) AS ids
|
||||||
|
FROM agency_notifications WHERE sent_at IS NULL
|
||||||
|
GROUP BY token_id, playlist_id, action
|
||||||
|
`).all();
|
||||||
|
|
||||||
|
for (const g of groups) {
|
||||||
|
try {
|
||||||
|
const recipients = resolveRecipients(db, g.workspace_id, g.playlist_id);
|
||||||
|
if (recipients.length) {
|
||||||
|
const { subject, text } = composeDigest(db, g);
|
||||||
|
for (const r of recipients) {
|
||||||
|
await email.sendEmail({ to: r.email, subject, text }); // throw -> caught below -> NOT stamped -> retried
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Stamp sent_at ONLY after every send for this group succeeded (or there were no
|
||||||
|
// recipients). A throw above skips this -> the rows stay unsent for the next cycle.
|
||||||
|
const now = Math.floor(Date.now() / 1000);
|
||||||
|
const stamp = db.prepare('UPDATE agency_notifications SET sent_at = ? WHERE id = ?');
|
||||||
|
db.transaction(() => { for (const id of g.ids.split(',')) stamp.run(now, id); })();
|
||||||
|
} catch (e) {
|
||||||
|
console.warn('agency digest: send failed, will retry next cycle:', e.message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function startAgencyDigest() {
|
||||||
|
setInterval(() => { flushAgencyDigests().catch(() => {}); }, FLUSH_MS);
|
||||||
|
console.log('Agency digest service started');
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = { startAgencyDigest, flushAgencyDigests, resolveRecipients, composeDigest };
|
||||||
75
server/test/agency-digest.test.js
Normal file
75
server/test/agency-digest.test.js
Normal file
|
|
@ -0,0 +1,75 @@
|
||||||
|
'use strict';
|
||||||
|
|
||||||
|
// #73 email digest robustness. Proves the two rules the design hinges on: (1) the queue
|
||||||
|
// never balloons when SMTP is off (drain-and-discard); (2) sent_at is stamped ONLY after a
|
||||||
|
// successful send, so a failure retries next cycle instead of silently dropping. Plus
|
||||||
|
// recipient resolution (org owner/admins + playlist owner, deduped) and digest grouping.
|
||||||
|
|
||||||
|
const { test } = require('node:test');
|
||||||
|
const assert = require('node:assert/strict');
|
||||||
|
const Database = require('better-sqlite3');
|
||||||
|
const { flushAgencyDigests, resolveRecipients } = require('../services/agency-digest');
|
||||||
|
|
||||||
|
function freshDb() {
|
||||||
|
const db = new Database(':memory:');
|
||||||
|
db.exec(`
|
||||||
|
CREATE TABLE agency_notifications (id INTEGER PRIMARY KEY AUTOINCREMENT, workspace_id TEXT, token_id TEXT, playlist_id TEXT, action TEXT, content_id TEXT, created_at INTEGER, sent_at INTEGER);
|
||||||
|
CREATE TABLE organization_members (organization_id TEXT, user_id TEXT, role TEXT);
|
||||||
|
CREATE TABLE workspaces (id TEXT, organization_id TEXT);
|
||||||
|
CREATE TABLE users (id TEXT, email TEXT);
|
||||||
|
CREATE TABLE playlists (id TEXT, user_id TEXT, name TEXT);
|
||||||
|
CREATE TABLE api_tokens (id TEXT, name TEXT);
|
||||||
|
INSERT INTO workspaces VALUES ('ws1','org1');
|
||||||
|
INSERT INTO users VALUES ('uOwner','owner@x'), ('uAdmin','admin@x'), ('uViewer','viewer@x'), ('uPlOwner','plowner@x');
|
||||||
|
INSERT INTO organization_members VALUES ('org1','uOwner','org_owner'), ('org1','uAdmin','org_admin'), ('org1','uViewer','member');
|
||||||
|
INSERT INTO playlists VALUES ('pl1','uPlOwner','Lobby');
|
||||||
|
INSERT INTO api_tokens VALUES ('tok1','Acme Agency');
|
||||||
|
`);
|
||||||
|
return db;
|
||||||
|
}
|
||||||
|
function enqueue(db, n, action = 'draft') {
|
||||||
|
const ins = db.prepare("INSERT INTO agency_notifications (workspace_id, token_id, playlist_id, action) VALUES ('ws1','tok1','pl1',?)");
|
||||||
|
for (let i = 0; i < n; i++) ins.run(action);
|
||||||
|
}
|
||||||
|
const cfg = (sendEmail) => ({ isConfigured: () => true, sendEmail });
|
||||||
|
const sink = () => { const sent = []; return { sent, sendEmail: async (m) => { sent.push(m); } }; };
|
||||||
|
|
||||||
|
test('#73 digest recipients: org owner + admins + playlist owner, deduped (NOT the viewer)', () => {
|
||||||
|
const emails = resolveRecipients(freshDb(), 'ws1', 'pl1').map(r => r.email).sort();
|
||||||
|
assert.deepEqual(emails, ['admin@x', 'owner@x', 'plowner@x']);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('#73 digest: 30 uploads -> ONE email per recipient (not 30), all rows stamped sent', async () => {
|
||||||
|
const db = freshDb();
|
||||||
|
enqueue(db, 30, 'draft');
|
||||||
|
const { sent, sendEmail } = sink();
|
||||||
|
await flushAgencyDigests(db, cfg(sendEmail));
|
||||||
|
assert.equal(sent.length, 3, '1 group x 3 recipients = 3 emails, not 30 per recipient');
|
||||||
|
assert.match(sent[0].subject, /Acme Agency added 30 items to "Lobby"/);
|
||||||
|
assert.equal(db.prepare('SELECT COUNT(*) c FROM agency_notifications WHERE sent_at IS NULL').get().c, 0);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('#73 digest: a failed send leaves rows UNSENT for retry (never silently dropped)', async () => {
|
||||||
|
const db = freshDb();
|
||||||
|
enqueue(db, 5, 'draft');
|
||||||
|
await flushAgencyDigests(db, cfg(async () => { throw new Error('smtp down'); }));
|
||||||
|
assert.equal(db.prepare('SELECT COUNT(*) c FROM agency_notifications WHERE sent_at IS NULL').get().c, 5, 'still unsent -> retried next cycle');
|
||||||
|
});
|
||||||
|
|
||||||
|
test('#73 digest: SMTP off -> queue drained-and-discarded (never balloons)', async () => {
|
||||||
|
const db = freshDb();
|
||||||
|
enqueue(db, 10, 'draft');
|
||||||
|
await flushAgencyDigests(db, { isConfigured: () => false, sendEmail: async () => { throw new Error('must not send'); } });
|
||||||
|
assert.equal(db.prepare('SELECT COUNT(*) c FROM agency_notifications').get().c, 0, 'drained when email is off');
|
||||||
|
});
|
||||||
|
|
||||||
|
test('#73 digest: draft vs published produce different subjects, grouped per action', async () => {
|
||||||
|
const db = freshDb();
|
||||||
|
enqueue(db, 2, 'draft');
|
||||||
|
enqueue(db, 3, 'published');
|
||||||
|
const { sent, sendEmail } = sink();
|
||||||
|
await flushAgencyDigests(db, cfg(sendEmail));
|
||||||
|
const subjects = sent.map(s => s.subject);
|
||||||
|
assert.ok(subjects.some(s => /awaiting your approval/.test(s)), 'draft digest mentions approval');
|
||||||
|
assert.ok(subjects.some(s => /updated "Lobby"/.test(s)), 'published digest says updated');
|
||||||
|
});
|
||||||
Loading…
Reference in a new issue