require("dotenv").config(); const ws = require("ws") /* Old NTFY Body from old system ntfyBody = { "topic": `${config.ntfy.prefix}${fromChannel}`, "message": bodyData.string, "tags": [`Timestamp: ${product_id.timestamp}`, `Station: ${product_id.station}`, `WMO: ${product_id.wmo}`, `PIL: ${product_id.pil}`, `Channel: ${fromChannel}`], "priority": evt.priority, "actions": [{ "action": "view", "label": "Product", "url": bodyData.url }, { "action": "view", "label": "Product Text", "url": `https://mesonet.agron.iastate.edu/api/1/nwstext/${product_id_raw}` }] } // Message body from websocket { "type": "iem-message", "data": { "channel": { "location": "", "room": "" }, "event": { "name": "", "priority": , "code": "" }, "body": "", "timestamp": "", "wmo": "", "pil": "", "station": "", "raw": "", "rawBody": "", "image": "", // Optional "productText": "" // Optional } } */ let retryQueue = []; function handleIEMMessage(msg) { if (!msg.data) return; const data = msg.data; ntfyBody = { topic: `${data.channel.room}`, message: data.body.string, tags: [ `Timestamp: ${data.timestamp}`, `Station: ${data.station}`, `WMO: ${data.wmo}`, `PIL: ${data.pil}`, `Channel: ${data.channel.room}`, ], priority: data.event.priority, actions: [ { action: "view", label: "Product", url: data.body.url, }, { action: "view", label: "Product Text", url: `https://mesonet.agron.iastate.edu/api/1/nwstext/${data.raw}`, }, ], }; if (data.image) { ntfyBody.attach = data.image; } // Send to NTFY fetch(process.env.NTFY_HOST, { method: "POST", headers: { "Content-Type": "application/json", "Authorization": `Bearer ${process.env.NTFY_TOKEN}`, }, body: JSON.stringify(ntfyBody), }).then((res) => { if (!res.ok) { retryQueue.push(ntfyBody); console.error(`Error sending to NTFY: ${res.statusText}. Added to retry queue.`); } else { console.log(`Sent to NTFY topic ${ntfyBody.topic}`); } }).catch((error) => { retryQueue.push(ntfyBody); console.error("Error sending to NTFY:", error); }); }; function processRetryQueue() { if (retryQueue.length === 0) return; const body = retryQueue.shift(); fetch(process.env.NTFY_HOST, { method: "POST", headers: { "Content-Type": "application/json", "Authorization": `Bearer ${process.env.NTFY_TOKEN}`, }, body: JSON.stringify(body), }).then((res) => { if (!res.ok) { retryQueue.push(body); console.error(`Error retrying to NTFY: ${res.statusText}. Re-added to retry queue.`); } else { console.log(`Retried and sent to NTFY topic ${body.topic}`); } }).catch((error) => { retryQueue.push(body); console.error("Error retrying to NTFY:", error); }); }; // Process retry queue every second setInterval(processRetryQueue, 1000); // Connect to process.env.IEM_WS_SERVER const wsClient = new ws(process.env.IEM_WS_SERVER, { headers: { "User-Agent": "ntfy-pusher/1.0", }, }); wsClient.on("open", function open() { console.log("Connected to IEM WebSocket server"); wsClient.send(JSON.stringify({ type: "subscribe", channel: "*" })); }); wsClient.on("message", function incoming(data) { let msg; try { msg = JSON.parse(data); } catch (error) { console.error("Error parsing message:", error); } if (!msg) return; // Ignore invalid messages switch (msg.type) { case "iem-message": handleIEMMessage(msg); break; } });