// distributor/index.js
// 2025-06-21 00:37:55 -06:00
//
// 712 lines
// 21 KiB
// JavaScript

require('dotenv').config({ override: true });
const config = require("./config.json");
const express = require('express');
const expressWs = require('express-ws');
const { client, xml } = require("@xmpp/client");
const colors = require('colors');
const html = require('html-entities');
const blacklist = require("./blacklist.json")
const wfos = require("./wfos.json")
const events = require("./events.json")
const os = require('os');
const fs = require('fs');
const path = require('path');
const markdown = require('markdown-it')();
const ejs = require('ejs');
const { connect } = require('http2');
const serveIndex = require('serve-index');
const hostname = process.env.HOST_OVERRIDE || os.hostname();
// HTTP application; expressWs patches it so `app.ws(...)` routes can be registered.
const app = express();
expressWs(app);

// Static assets are served from ./public before any other middleware runs.
app.use(express.static('public'));

// Body parsers for JSON and URL-encoded form payloads.
app.use(express.json());
app.use(express.urlencoded({ extended: true }));

// EJS templates are looked up in ./views next to this file.
app.set('view engine', 'ejs');
app.set('views', path.join(__dirname, 'views'));

// Stamp every response that reaches this point with the node's UUID and hostname.
app.use(function addIdentHeaders(req, res, next) {
    res.header("Node-UUID", curUUID);
    res.header("Node-Hostname", hostname);
    next();
});
// GET / : render md/DOCS.md as HTML inside the `docViewer` template.
app.get("/", (req, res) => {
    const docsPath = path.join(__dirname, 'md', 'DOCS.md');

    const onDocsRead = (readError, source) => {
        if (!readError) {
            res.render('docViewer', { title: 'Websocket Docs', htmlContent: markdown.render(source) });
            return;
        }
        // The docs file could not be read: log it and answer with a generic 500.
        console.error('Error reading markdown file:', readError);
        res.status(500).send('Internal Server Error');
    };

    fs.readFile(docsPath, 'utf8', onDocsRead);
});
// Browse and download the per-message JSON logs under ./event-logs.
// serveIndex renders the directory listings; express.static serves the files themselves.
// NOTE(review): the original `view` value was missing (`view: ,` is a syntax error).
// 'details' is assumed here; serve-index also accepts 'tiles' (its default) - confirm.
app.use('/event-logs',
    serveIndex(path.join(__dirname, 'event-logs'), { icons: true, view: 'details', hidden: true }),
    express.static(path.join(__dirname, 'event-logs'))
);
// Most recent XMPP status string; written by the "status" event handler.
var socketStatus;

// GET /health : JSON snapshot of this node's state, always answered with HTTP 200.
app.get('/health', (req, res) => {
    const snapshot = {
        status: socketStatus,
        connections: wsConnections.length,
        uptime: process.uptime(),
        messages,
        errCount,
        startup,
        startTime: startTimestap,
        uuid: curUUID,
    };
    res.status(200).json(snapshot);
});
// Every connected WebSocket client as { ws, req, subs }, where `subs` holds the room
// names the client subscribed to ("*" means every room). Stored on `global` because
// the rest of the file reads it through the bare `wsConnections` name.
global.wsConnections = [];
// Room names (the part of the JID before "@") collected from the XMPP room list.
var roomList = [];

// IEM WebSocket
// Protocol: the client sends JSON objects with a `type` of "subscribe", "unsubscribe",
// "get-subscriptions", "room-list" or "ping"; each request is answered with a JSON
// message carrying an HTTP-like `code`.
app.ws('/iem', (ws, req) => {
    // NOTE(review): this prints every request header, which may include cookies or tokens.
    console.log(req.headers)
    const clientIp = process.env.REALIP ? req.headers[process.env.REALIP.toLowerCase()] || req.ip : req.ip;
    console.log(`connection from ${clientIp}`);

    // Send an "internal-response" message to this client.
    const respond = (code, data) => {
        ws.send(JSON.stringify({
            "type": "internal-response",
            "code": code,
            "data": data
        }));
    };

    if (!req.headers['user-agent']) {
        respond(400, { "error": "User-Agent header is required." });
        console.log(`${colors.red("[ERROR]")} User-Agent header is required from ${clientIp}`);
        ws.close();
        return;
    }

    // Greeting: node identity, counters and the known rooms / WFOs.
    ws.send(JSON.stringify({
        "type": "connection-info",
        "status": socketStatus,
        "uuid": curUUID,
        "host": hostname,
        "connections": wsConnections.length,
        "uptime": process.uptime(),
        "messages": messages,
        "errCount": errCount,
        "startup": startup,
        "startTime": startTimestap,
        "roomList": roomList,
        "wfos": Object.keys(wfos).map((key) => {
            return {
                code: key,
                location: wfos[key].location,
                room: wfos[key].room
            }
        }),
        iem
    }));

    // Keep a direct reference to this client's record. The previous code stored an
    // array index in a shared implicit global, which pointed at the wrong client (or at
    // nothing) as soon as another client connected or disconnected.
    const connection = { ws, req, subs: [] };
    wsConnections.push(connection);

    ws.on('close', () => {
        console.log(`disconnected from ${clientIp}`);
        wsConnections = wsConnections.filter((conn) => conn.ws !== ws);
    });

    // Parse and execute one client request.
    const handleRequest = (msg) => {
        let data;
        try {
            data = JSON.parse(msg);
        } catch (error) {
            respond(400, { "error": "Invalid JSON format." });
            console.log(`${colors.red("[ERROR]")} Invalid JSON format from ${clientIp}: ${msg}`);
            return;
        }
        // JSON such as `null` or `42` parses fine but is not a request object.
        if (data === null || typeof data !== "object" || !data.type) {
            respond(400, { "error": "Invalid request." });
            return;
        }
        switch (data.type) {
            case "subscribe": {
                const subscriptionTarget = data.channel || getWFOroom(data.wfo) || "*";
                if (subscriptionTarget !== "*" && !roomList.includes(subscriptionTarget)) {
                    respond(404, { "error": "Invalid channel." });
                    break;
                }
                // Skip duplicates so repeated subscribe requests cannot grow the list forever.
                if (!connection.subs.includes(subscriptionTarget)) {
                    connection.subs.push(subscriptionTarget);
                }
                respond(200, {
                    "message": subscriptionTarget === "*" ? `Subscribed to all channels.` : `Subscribed to ${subscriptionTarget}`
                });
                break;
            }
            case "unsubscribe": {
                const unsubscribeTarget = data.channel || getWFOroom(data.wfo) || "*";
                if (unsubscribeTarget === "*") {
                    connection.subs = connection.subs.filter((sub) => sub !== "*");
                    respond(200, { "message": `Unsubscribed from all channels.` });
                    break;
                }
                // Validate against the room list, like "subscribe" does. The old check
                // looked the channel up as a WFO code and threw when only `wfo` was sent.
                if (!roomList.includes(unsubscribeTarget) && !connection.subs.includes(unsubscribeTarget)) {
                    respond(404, { "error": "Invalid channel." });
                    break;
                }
                connection.subs = connection.subs.filter((sub) => sub !== unsubscribeTarget);
                respond(200, { "message": `Unsubscribed from ${unsubscribeTarget}` });
                break;
            }
            case "get-subscriptions":
                respond(200, { "subscriptions": connection.subs });
                break;
            case "room-list":
                if (roomList.length > 0) {
                    ws.send(JSON.stringify({
                        "type": "room-list",
                        "code": 200,
                        "count": roomList.length,
                        "data": roomList
                    }));
                } else {
                    ws.send(JSON.stringify({
                        "type": "room-list",
                        "code": 503,
                        "count": roomList.length,
                        "data": {
                            "error": "Room list is currently empty. Please try again later."
                        }
                    }));
                }
                break;
            case "ping":
                respond(200, { "message": "pong" });
                break;
            default:
                respond(400, { "error": "Invalid request." });
                break;
        }
    };

    ws.on('message', (msg) => {
        // The try/catch has to live inside the handler: wrapping the `ws.on(...)` call
        // only guards listener registration, not the requests handled later.
        try {
            handleRequest(msg);
        } catch (error) {
            console.error(error);
            try {
                respond(500, { "error": "Internal server error." });
            } catch (sendError) {
                // The socket may already be closed; nothing more can be reported to the client.
                console.error(sendError);
            }
        }
    });
});
// Random funcs
/**
 * Capitalise the first character of every whitespace-delimited word and lower-case
 * the rest of it. A word starts at a word character and runs to the next whitespace.
 * @param {string} str - Text to convert.
 * @returns {string} The converted text.
 */
function toTitleCase(str) {
    const capitalise = (word) => word.charAt(0).toUpperCase() + word.slice(1).toLowerCase();
    return str.replace(/\w\S*/g, (word) => capitalise(word));
}
/**
 * Split a dash-separated product id into its first four fields.
 * @param {string} product_id - Id in the form "timestamp-station-wmo-pil".
 * @returns {{timestamp: *, originalTimestamp: string, station: string, wmo: string, pil: string}}
 *   `timestamp` is whatever convertDate() returns for the first field; fields missing
 *   from the input are undefined.
 */
const parseProductID = function (product_id) {
    const fields = product_id.split("-");
    const originalTimestamp = fields[0];
    return {
        timestamp: convertDate(originalTimestamp),
        originalTimestamp,
        station: fields[1],
        wmo: fields[2],
        pil: fields[3]
    };
};
/**
 * Convert a "YYYYMMddHHmm" string (hours and minutes in UTC) into a Date.
 * The input carries no seconds, so the current clock seconds are used instead.
 * @param {string} date - Timestamp such as "202405080131".
 * @returns {Date} The matching UTC instant (an Invalid Date for non-numeric input).
 */
const convertDate = function (date) {
    const ranges = [[0, 4], [4, 6], [6, 8], [8, 10], [10, 12]];
    const [year, month, day, hours, mins] = ranges.map(([from, to]) => date.substring(from, to));
    const secs = new Date().getSeconds();
    // Date.UTC expects a zero-based month.
    return new Date(Date.UTC(year, month - 1, day, hours, mins, secs));
};
/**
 * Find the first http(s) URL in a string and strip it out.
 * @param {string} string - Text to scan.
 * @returns {{string: string, url: (string|null)}} The text with the first URL removed
 *   plus that URL, or the unchanged text and `url: null` when there is none.
 */
const getFirstURL = function (string) {
    // Only the first match is needed, so no `g` flag. The result stays in a local:
    // the previous version assigned to an undeclared `url`, leaking a global.
    const match = string.match(/https?:\/\/[^\s]+/);
    if (!match) return { string, url: null };
    const url = match[0];
    return { string: string.replace(url, ""), url };
};
/**
 * Resolve a WFO code to its chat room name.
 * @param {*} code - WFO code (case-insensitive).
 * @returns {(string|null)} The `room` configured for the code in `wfos`; the
 *   lower-cased code itself when there is no such entry or it has no string `room`;
 *   null when `code` is not a string.
 */
const getWFOroom = function (code) {
    if (typeof code !== 'string') return null;
    const key = code.toLowerCase();
    const entry = wfos[key];
    const hasRoom = entry && typeof entry.room === 'string';
    return hasRoom ? entry.room : key;
};
/**
 * Look up the `wfos` entry for a WFO code.
 * @param {*} code - WFO code (case-insensitive).
 * @returns {(Object|null)} The entry, or null when the code is unknown or not a string.
 */
const getWFO = function (code) {
    // Callers pass client-supplied values, so a missing or non-string code must not throw
    // (getWFOroom applies the same guard).
    if (typeof code !== 'string') {
        return null;
    }
    const key = code.toLowerCase();
    // Own-property check so names like "constructor" do not resolve to Object.prototype members.
    if (Object.prototype.hasOwnProperty.call(wfos, key) && wfos[key]) {
        return wfos[key];
    }
    return null;
};
/**
 * Find the `wfos` entry whose `room` equals the given room name.
 * @param {string} room - Room name (case-insensitive).
 * @returns {Object} The first matching entry, or a stand-in `{ location, room }` built
 *   from the lower-cased room name when no entry matches.
 */
function getWFOByRoom(room) {
    const wanted = room.toLowerCase();
    const matchingCode = Object.keys(wfos).find((code) => wfos[code].room === wanted);
    if (matchingCode !== undefined) {
        return wfos[matchingCode];
    }
    return { location: wanted, room: wanted };
}
/**
 * Build a random string from the selected character classes.
 * @param {Object} options - Flags `upper`, `lower`, `number`, `special`; every truthy
 *   flag adds that class to the pool, in the order the flags appear in the object.
 * @param {number} length - Number of characters to produce.
 * @returns {string} The random string (empty when no class is selected).
 *   Characters come from Math.random(), so this is not suitable for secrets.
 */
const generateRandomString = function (options, length) {
    const characters = {
        upper: 'ABCDEFGHIJKLMNOPQRSTUVWXYZ',
        lower: 'abcdefghijklmnopqrstuvwxyz',
        number: '0123456789',
        special: '!@#$%^&*()_+'
    };

    // Collect the pool of allowed characters.
    let pool = '';
    for (const kind in options) {
        if (!options[kind]) continue;
        pool += characters[kind];
    }

    // Draw one character per position.
    const picked = [];
    for (let count = 0; count < length; count++) {
        const position = Math.floor(Math.random() * pool.length);
        picked.push(pool.charAt(position));
    }
    return picked.join('');
};
/**
 * Generate a UUID-shaped identifier: five dash-separated groups of 8-4-4-4-12 random
 * characters drawn from upper-case letters, lower-case letters and digits.
 * @returns {string} The identifier.
 */
const generateUUID = function () {
    const groupLengths = [8, 4, 4, 4, 12];
    return groupLengths
        .map((groupLength) => generateRandomString({ lower: true, upper: true, number: true }, groupLength))
        .join("-");
};
// Variable setup
// Module-wide state shared by the HTTP routes, the WebSocket endpoint and the XMPP handlers.
// Room descriptors (attributes of each room-list item) collected by the "stanza" handler;
// sent to WebSocket clients in the "connection-info" message.
var iem = []
// While true, incoming groupchat messages are ignored. Set to true by start() and
// switched off one second after the XMPP "online" event.
var startup = true;
// Creation time of this module instance; reported as `startTime` by /health and "connection-info".
var startTimestap = new Date();
// Count of groupchat messages accepted for distribution (incremented in the "stanza" handler).
var messages = 0;
// Reported by /health and "connection-info"; no visible code in this file changes it.
var errCount = 0;
// Identifier of this node: sent in the "Node-UUID" header, in status payloads, and
// used as the nickname when joining XMPP rooms.
const curUUID = generateUUID();
// Start IEM XMPP Connection
// The resource ends in five random alphanumeric characters: a workaround for the
// server answering "Username already in use".
const xmppClientOptions = {
    service: "xmpp://conference.weather.im",
    domain: "weather.im",
    resource: `discord-weather-bot-${generateRandomString({ upper: true, lower: true, number: true }, 5)}`,
};
const xmpp = client(xmppClientOptions);
//debug(xmpp, true);
// Log XMPP errors. No restart is triggered from here (the stop-then-start code that
// used to live in this handler was disabled), despite what the log line says.
xmpp.on("error", (err) => {
    const label = colors.red("[ERROR]");
    console.log(`${label} XMPP Error: ${err}. Trying to reconnect...`);
});
// When the client goes offline: disconnect, stop, then start again. A failure of
// either shutdown step is logged and the restart is attempted anyway.
xmpp.on("offline", () => {
    console.log(`${colors.yellow("[WARN]")} XMPP offline, trying to reconnect...`);

    const restartAfterStopFailure = (err) => {
        console.log(`${colors.red("[ERROR]")} XMPP failed to stop: ${err}. Gonna go ahead and try to reconnect...`);
        start();
    };
    const restartAfterDisconnectFailure = (err) => {
        console.log(`${colors.red("[ERROR]")} XMPP failed to disconnect: ${err}. Gonna go ahead and try to reconnect...`);
        start();
    };
    // Deliberately not returned to the outer chain: stop failures are handled right here.
    const stopThenRestart = () => {
        xmpp.stop().then(() => {
            start();
        }).catch(restartAfterStopFailure);
    };

    xmpp.disconnect().then(stopThenRestart).catch(restartAfterDisconnectFailure);
});
// Declared by the original code but not referenced anywhere visible in this file;
// kept so that anything relying on these names keeps working.
var restartTimer = null;
var sent = [];

// Extra download attempts made after the first product-text fetch fails.
const PRODUCT_TEXT_MAX_RETRIES = 3;
// Pause between product-text download attempts, in milliseconds.
const PRODUCT_TEXT_RETRY_DELAY_MS = 100;
// `readyState` value of an open WebSocket (the other broadcast loops in this file test for 1 too).
const WS_OPEN = 1;

/**
 * Build a JSON.stringify replacer that substitutes "WARN: CIRC" for any object it has
 * already visited. This breaks reference cycles; note that it also flags objects that
 * merely appear twice in the value being serialised.
 */
const getCircularReplacer = () => {
    const seen = new WeakSet();
    return (key, value) => {
        if (typeof value === "object" && value !== null) {
            if (seen.has(value)) {
                return "WARN: CIRC";
            }
            seen.add(value);
        }
        return value;
    };
};

/**
 * Report an unknown event code to the Discord log channel, when a Discord client is
 * available. This file does not define `discord`, so the call used to raise a
 * ReferenceError; it is now skipped unless such a global exists.
 */
const reportUnknownEventType = (eventCode) => {
    if (typeof discord === "undefined") return;
    try {
        const logChannel = discord.guilds.cache.get(config.discord.mainGuild).channels.cache.get(config.discord.logChannel);
        Promise.resolve(logChannel.send({
            embeds: [
                {
                    title: "Unknown Event Type",
                    description: `Unknown event type: ${eventCode}. Please check the logs for more details.`,
                    color: 0xff0000
                }
            ]
        })).catch((err) => {
            console.log(`${colors.red("[ERROR]")} Failed to report unknown event type: ${err}`);
        });
    } catch (err) {
        console.log(`${colors.red("[ERROR]")} Failed to report unknown event type: ${err}`);
    }
};

/**
 * Handle a room-list (disco#items) result: record every non-blacklisted room and join it.
 */
const handleRoomList = (query) => {
    query.getChildren("item").forEach((item) => {
        // Check if the JID is on the blacklist, if so, ignore it
        if (blacklist.includes(item.attrs.jid)) return;
        const roomName = item.attrs.jid.split("@")[0];
        // get proper name from wfos
        item.attrs.properName = getWFOByRoom(roomName).location;
        // The room list is requested again on every "online" event, so only record
        // rooms that are not known yet; otherwise both lists grow with duplicates.
        if (!roomList.includes(roomName)) {
            roomList.push(roomName);
            iem.push(item.attrs);
        }
        console.log(`${colors.cyan("[INFO]")} Found room: ${item.attrs.jid}`);
        // Join the room (again after a reconnect), using this node's UUID as nickname.
        Promise.resolve(xmpp.send(xml("presence", { to: `${item.attrs.jid}/${curUUID}` }, xml("item", { role: "visitor" })))).catch((err) => {
            console.log(`${colors.red("[ERROR]")} Failed to join ${item.attrs.jid}: ${err}`);
        });
    });
};

/**
 * Download the full product text from the IEM API, retrying a few times.
 * @returns {Promise<{ok: boolean, text: (string|undefined)}>} `ok: false` once every attempt has failed.
 */
const fetchProductText = async (productId) => {
    for (let attempt = 0; attempt <= PRODUCT_TEXT_MAX_RETRIES; attempt++) {
        try {
            const res = await fetch(`https://mesonet.agron.iastate.edu/api/1/nwstext/${encodeURIComponent(productId)}`);
            // fetch() resolves on HTTP errors too; do not forward an error page as product text.
            if (!res.ok) throw new Error(`HTTP ${res.status}`);
            return { ok: true, text: await res.text() };
        } catch (err) {
            if (attempt >= PRODUCT_TEXT_MAX_RETRIES) {
                console.log(`${colors.red("[ERROR]")} Failed to fetch product text, giving up... ${err}`)
                break;
            }
            console.log(`${colors.red("[ERROR]")} Failed to fetch product text, retrying... ${err}`)
            await new Promise((resolve) => setTimeout(resolve, PRODUCT_TEXT_RETRY_DELAY_MS));
        }
    }
    return { ok: false };
};

/**
 * Send an already-serialised message to every open client subscribed to `channel` or to "*".
 */
const broadcastToSubscribers = (channel, serialized) => {
    wsConnections.forEach((connection) => {
        if (connection.ws.readyState !== WS_OPEN) return; // send() throws on a socket that is not open
        if (connection.subs.includes(channel) || connection.subs.includes("*")) {
            connection.ws.send(serialized);
        }
    });
};

/**
 * Write one event record as pretty-printed JSON, creating the directory when needed.
 * Failures are logged and otherwise ignored so logging can never block distribution.
 */
const writeEventLog = async (logPath, record) => {
    try {
        await fs.promises.mkdir(path.dirname(logPath), { recursive: true });
        await fs.promises.writeFile(logPath, JSON.stringify(record, getCircularReplacer(), 2), 'utf8');
    } catch (err) {
        console.log(`${colors.red("[ERROR]")} Failed to write event log ${logPath}: ${err}`);
    }
};

/**
 * Fetch the product text, broadcast the "iem-message" and write the event log.
 * Everything it needs is passed in, so concurrent products cannot affect each other
 * (the previous version kept the channel and retry state in shared implicit globals).
 */
const deliverProduct = async ({ stanza, fromChannel, evt, product_id, product_id_raw, body, bodyData, image, logPath }) => {
    const fetched = await fetchProductText(product_id_raw);
    // NOTE(review): the two payload shapes are kept exactly as clients receive them today.
    // Without product text, `body` is the plain string and `product_data` is absent.
    const sentData = {
        "type": "iem-message",
        "data": {
            "channel": getWFOByRoom(fromChannel),
            "event": evt,
            "body": fetched.ok ? bodyData : bodyData.string,
            "timestamp": product_id.timestamp,
            "wmo": product_id.wmo,
            "pil": product_id.pil,
            "station": product_id.station,
            ...(fetched.ok ? { "product_data": product_id } : {}),
            "raw": product_id_raw,
            "rawBody": body,
            "image": image,
            "productText": fetched.ok ? (fetched.text || null) : null
        }
    };
    const serialized = JSON.stringify(sentData);
    try {
        broadcastToSubscribers(fromChannel, serialized);
    } catch (err) {
        console.log(`${colors.red("[ERROR]")} Failed to broadcast ${product_id_raw}: ${err}`);
    }
    // Log the full event next to exactly what was sent. `sentData` is re-parsed from the
    // wire format so the replacer does not collapse it into "WARN: CIRC" markers.
    await writeEventLog(logPath, {
        evt, stanza, product_id, product_id_raw, bodyData, body, sentData: JSON.parse(serialized)
    });
};

/**
 * Handle one groupchat message: validate it, classify it and hand it over for delivery.
 */
const handleGroupchatMessage = (stanza) => {
    // Stops spam from getting old messages
    if (startup) return;
    // Get channel name
    const fromChannel = stanza.attrs.from.split("@")[0];
    // Ignores
    const x = stanza.getChild("x");
    if (!x) return; // No PID, ignore it
    const product_id_raw = x.attrs.product_id;
    if (!product_id_raw) return;
    const product_id = parseProductID(product_id_raw);
    if (typeof product_id.pil !== "string" || Number.isNaN(product_id.timestamp.getTime())) {
        console.log(`${colors.yellow("[WARN]")} Ignoring malformed product id: ${product_id_raw}`);
        return;
    }
    // Get body of message
    const body = html.decode(stanza.getChildText("body"));
    const bodyData = getFirstURL(body);
    // The first three characters of the PIL select the event definition.
    const eventCode = product_id.pil.substring(0, 3);
    const knownEvent = events[eventCode];
    if (!knownEvent) {
        console.log(`${colors.red("[ERROR]")} Unknown event type: ${eventCode}. Fix me`);
        reportUnknownEventType(eventCode);
    }
    // Work on a copy so the shared `events` table is not modified.
    const evt = { ...(knownEvent || { name: "Unknown", priority: 3 }), code: eventCode };
    // Check timestamp, if not within 3 minutes, ignore it
    const now = new Date();
    const diff = (now - product_id.timestamp) / 1000 / 60;
    if (diff > 3) return;
    // NOTE(review): the unknown-event fallback has no `text` field, so it prints "undefined" here.
    console.log(`${colors.cyan("[INFO]")} ${getWFOByRoom(fromChannel).location} - ${product_id_raw} - ${evt.text} - ${product_id.timestamp}`);
    messages++;
    // Event log location: event-logs/<year>/<month>/<day>/<productid>-<timestamp>-<channel>.json
    const year = now.getFullYear();
    const month = String(now.getMonth() + 1).padStart(2, '0');
    const day = String(now.getDate()).padStart(2, '0');
    const logDir = path.join(__dirname, "event-logs", year.toString(), month, day);
    // The name is built from network data: keep only filename-safe characters so it
    // cannot escape the log directory (":" already became "_" before).
    const logFilename = `${product_id_raw}-${product_id.timestamp.toISOString()}-${fromChannel}.json`.replace(/[^A-Za-z0-9._-]/g, "_");
    const logPath = path.join(logDir, logFilename);
    deliverProduct({
        stanza,
        fromChannel,
        evt,
        product_id,
        product_id_raw,
        body,
        bodyData,
        image: x.attrs.twitter_media || null,
        logPath
    }).catch((err) => {
        console.log(`${colors.red("[ERROR]")} Failed to deliver ${product_id_raw}: ${err}`);
    });
};

// Dispatch incoming XMPP stanzas: room-list results and groupchat messages.
xmpp.on("stanza", (stanza) => {
    // Handle Room List
    if (stanza.is("iq") && stanza.attrs.type === "result") {
        const query = stanza.getChild("query");
        if (query && query.attrs.xmlns === "http://jabber.org/protocol/disco#items") {
            handleRoomList(query);
        }
    }
    // Get new messages and log them, ignore old messages
    if (stanza.is("message") && stanza.attrs.type === "groupchat") {
        try {
            handleGroupchatMessage(stanza);
        } catch (err) {
            // One bad message must not take the whole stanza stream down.
            console.log(`${colors.red("[ERROR]")} Failed to handle message: ${err}`);
        }
    }
});
// Remember the latest XMPP status and push it to every open WebSocket client.
xmpp.on("status", (status) => {
    socketStatus = status;
    console.log(`${colors.cyan("[INFO]")} XMPP Status is ${status}`);

    const OPEN = 1; // readyState of an open socket
    const notice = JSON.stringify({
        "type": "xmpp-status",
        "status": status
    });
    for (const { ws } of wsConnections) {
        if (ws.readyState !== OPEN) continue;
        ws.send(notice);
    }
});
// Tell every open WebSocket client that the XMPP link is being re-established.
xmpp.reconnect.on("reconnecting", () => {
    console.log(`${colors.yellow("[WARN]")} XMPP Reconnecting...`);

    const OPEN = 1; // readyState of an open socket
    const notice = JSON.stringify({
        "type": "xmpp-reconnect",
        "status": "reconnecting"
    });
    for (const { ws } of wsConnections) {
        if (ws.readyState !== OPEN) continue;
        ws.send(notice);
    }
});
// Tell every open WebSocket client that the XMPP link is back.
xmpp.reconnect.on("reconnected", () => {
    console.log(`${colors.green("[INFO]")} XMPP Reconnected`);

    const OPEN = 1; // readyState of an open socket
    const notice = JSON.stringify({
        "type": "xmpp-reconnect",
        "status": "reconnected"
    });
    for (const { ws } of wsConnections) {
        if (ws.readyState !== OPEN) continue;
        ws.send(notice);
    }
});
// Runs whenever the XMPP client emits "online": ask the conference service for its
// room list and, shortly afterwards, start accepting groupchat messages.
// Rooms are joined by the "stanza" handler as the room-list result arrives; each join
// is a presence stanza addressed to "<room JID>/<node UUID>" carrying an item element
// with role "visitor".
xmpp.on("online", async (address) => {
    console.log(`${colors.cyan("[INFO]")} Connected to XMPP server as ${address.toString()}`);
    // Groupchat messages are ignored while `startup` is true; the flag is cleared one
    // second after coming online.
    setTimeout(() => {
        startup = false;
        console.log(`${colors.cyan("[INFO]")} Startup complete, listening for messages...`);
    }, 1000);
    // Request room list
    try {
        // Awaited so a failed send is logged here instead of becoming an unhandled rejection.
        await xmpp.send(xml("iq", { type: "get", to: "conference.weather.im", id: "rooms" }, xml("query", { xmlns: "http://jabber.org/protocol/disco#items" })));
    } catch (err) {
        console.log(`${colors.red("[ERROR]")} Failed to request XMPP room list: ${err}`);
    }
});
// When the XMPP connection closes: disconnect, stop, then start again.
// As in the "offline" handler, a failing shutdown step is logged and the restart is
// attempted anyway; previously such a failure surfaced as an unhandled promise rejection.
xmpp.on("close", () => {
    console.log(`${colors.yellow("[WARN]")} XMPP connection closed, trying to reconnect...`);
    xmpp.disconnect().then(() => {
        xmpp.stop().then(() => {
            start();
        }).catch((err) => {
            console.log(`${colors.red("[ERROR]")} XMPP failed to stop: ${err}. Gonna go ahead and try to reconnect...`);
            start();
        });
    }).catch((err) => {
        console.log(`${colors.red("[ERROR]")} XMPP failed to disconnect: ${err}. Gonna go ahead and try to reconnect...`);
        start();
    });
});
/**
 * (Re)start the XMPP connection.
 * Raises the `startup` flag so groupchat messages are ignored until the "online"
 * handler clears it, stops the client first when it is not offline, then starts it.
 * A failed start is retried after five seconds.
 * @returns {Promise<void>} Never rejects; failures are logged.
 */
const start = async () => {
    startup = true;
    if (xmpp.status !== "offline") {
        console.log(`${colors.yellow("[WARN]")} XMPP is already running, stopping it...`);
        try {
            // Wait for the stop to finish: the client refuses to start while it is not
            // offline, so starting in parallel only failed and fell back to the 5 s retry.
            await xmpp.stop();
            console.log(`${colors.cyan("[INFO]")} XMPP stopped, starting it again...`);
        } catch (err) {
            console.log(`${colors.red("[ERROR]")} Failed to stop XMPP: ${err}. Trying to start anyway...`);
        }
    }
    try {
        await xmpp.start();
    } catch (err) {
        console.log(`${colors.red("[ERROR]")} XMPP failed to start: ${err}.`);
        setTimeout(start, 5000);
    }
};
// Start Express Server
// Port comes from SERVER_PORT, falling back to 3000.
const PORT = process.env.SERVER_PORT || 3000;
// The XMPP connection is started only once the HTTP server is accepting connections.
app.listen(PORT, function onServerListening() {
    console.log(`Server is listening on ${PORT}`);
    start();
});