diff --git a/sipcord-bridge/src/transport/sip/audio_thread.rs b/sipcord-bridge/src/transport/sip/audio_thread.rs index 88d3dbe..cea3f02 100644 --- a/sipcord-bridge/src/transport/sip/audio_thread.rs +++ b/sipcord-bridge/src/transport/sip/audio_thread.rs @@ -316,6 +316,36 @@ fn is_call_valid(call_id: CallId) -> bool { } } +/// Short, log-friendly description of a pending op — avoids dumping sample buffers. +fn describe_op(op: &PendingPjsuaOp) -> String { + match op { + PendingPjsuaOp::PlayDirect { call_id, samples } => { + format!("PlayDirect {{ call_id: {}, samples: {} }}", call_id, samples.len()) + } + PendingPjsuaOp::StartLoop { call_id, samples } => { + format!("StartLoop {{ call_id: {}, samples: {} }}", call_id, samples.len()) + } + PendingPjsuaOp::StartStreaming { call_id, path, hangup_on_complete } => { + format!( + "StartStreaming {{ call_id: {}, path: {}, hangup_on_complete: {} }}", + call_id, + path.display(), + hangup_on_complete, + ) + } + PendingPjsuaOp::StartTestTone { call_id } => { + format!("StartTestTone {{ call_id: {} }}", call_id) + } + PendingPjsuaOp::Hangup { call_id } => format!("Hangup {{ call_id: {} }}", call_id), + PendingPjsuaOp::ConnectFaxPort { call_id, fax_slot, call_conf_port, .. } => { + format!( + "ConnectFaxPort {{ call_id: {}, fax_slot: {:?}, call_conf_port: {:?} }}", + call_id, fax_slot, call_conf_port, + ) + } + } +} + fn process_pending_pjsua_ops() { use super::ffi::direct_player::play_audio_to_call_direct_internal; use super::ffi::streaming_player::start_streaming_to_call; @@ -334,7 +364,7 @@ fn process_pending_pjsua_ops() { if let Some(cid) = call_id && !is_call_valid(cid) { - tracing::warn!("Skipping stale op for dead call {}: {:?}", cid, op); + tracing::warn!("Skipping stale op for dead call {}: {}", cid, describe_op(&op)); // For ConnectFaxPort, signal failure so the caller doesn't hang if let PendingPjsuaOp::ConnectFaxPort { done_tx, .. } = op { let _ = done_tx.send(false); diff --git a/sipcord-bridge/src/transport/sip/ffi/init.rs b/sipcord-bridge/src/transport/sip/ffi/init.rs index 477ab2b..f5c2ef2 100644 --- a/sipcord-bridge/src/transport/sip/ffi/init.rs +++ b/sipcord-bridge/src/transport/sip/ffi/init.rs @@ -64,11 +64,14 @@ use anyhow::{Context, Result}; use ipnet::Ipv4Net; use parking_lot::Mutex; use pjsua::*; +use std::collections::BTreeMap; use std::ffi::CString; use std::mem::MaybeUninit; +use std::net::IpAddr; use std::os::raw::{c_char, c_int}; use std::ptr; use std::sync::atomic::Ordering; +use std::time::Instant; /// Known PJSIP error conditions detected from log messages. /// @@ -91,6 +94,9 @@ pub enum PjsipEvent { /// SIP SUBSCRIBE for an unsupported event package (e.g. presence, dialog) /// — pjsip responds 489 Bad Event, which is correct; just noisy at ERROR level BadEventSubscription, + /// Inbound packet failed SIP parsing (UDP garbage flood, port scans, etc.). + /// Throttled per-source-IP to avoid log spam. + MalformedPacket, /// Unclassified message — logged at pjsip's original level Unclassified, } @@ -116,6 +122,9 @@ impl PjsipEvent { } else if msg.contains("Unable to create server subscription") { // SIP clients SUBSCRIBE to presence/dialog after REGISTER — expected and harmless (Self::BadEventSubscription, Some(4)) + } else if msg.contains("PJSIP syntax error exception") { + // Garbage packets / floods — throttled separately, level handled there + (Self::MalformedPacket, None) } else { (Self::Unclassified, None) } @@ -131,11 +140,107 @@ impl PjsipEvent { Self::TransportError => "TRANSPORT_ERROR", Self::NoMatchingCodec => "NO_MATCHING_CODEC", Self::BadEventSubscription => "BAD_EVENT_SUBSCRIBE", + Self::MalformedPacket => "MALFORMED_PACKET", Self::Unclassified => "UNCLASSIFIED", } } } +/// Per-source-IP throttle state for malformed-packet floods. +struct MalformedThrottle { + last_logged: Instant, + suppressed: u64, +} + +/// Log first packet from a new IP immediately, then suppress and emit a summary +/// every MALFORMED_LOG_INTERVAL_SECS while the flood continues. +const MALFORMED_LOG_INTERVAL_SECS: u64 = 60; +/// Drop tracking state after this much idle time so a returning IP gets a fresh +/// "first packet" log line rather than silently joining an old throttle bucket. +const MALFORMED_ENTRY_IDLE_SECS: u64 = 300; + +static MALFORMED_PACKET_THROTTLE: Mutex> = + Mutex::new(BTreeMap::new()); + +/// Extract `"IP:PORT"` from a "Dropping NNN bytes packet from UDP IP:PORT : ..." message. +fn extract_packet_source(msg: &str) -> Option<&str> { + let idx = msg.find("from ")?; + let rest = &msg[idx + 5..]; + // skip transport word ("UDP" / "TCP" / ...) + let space = rest.find(' ')?; + let after_transport = &rest[space + 1..]; + let end = after_transport + .find(|c: char| c == ' ' || c == '\t') + .unwrap_or(after_transport.len()); + Some(&after_transport[..end]) +} + +fn parse_peer_ip(peer: &str) -> Option { + // IPv6 form is "[::1]:5060"; IPv4 is "1.2.3.4:5060" + let host = if let Some(rest) = peer.strip_prefix('[') { + rest.split_once(']').map(|(h, _)| h)? + } else { + peer.rsplit_once(':').map(|(h, _)| h).unwrap_or(peer) + }; + host.parse().ok() +} + +/// Handle a "PJSIP syntax error exception" log line with per-IP throttling. +fn handle_malformed_packet(msg: &str) { + let peer = extract_packet_source(msg); + let Some(ip) = peer.and_then(parse_peer_ip) else { + // Couldn't parse source — log unthrottled so we don't silently drop unknown shapes. + tracing::warn!(target: "pjsip", event = "MALFORMED_PACKET", "{}", msg); + return; + }; + + let now = Instant::now(); + let mut map = MALFORMED_PACKET_THROTTLE.lock(); + map.retain(|_, st| now.duration_since(st.last_logged).as_secs() < MALFORMED_ENTRY_IDLE_SECS); + + match map.get_mut(&ip) { + None => { + map.insert( + ip, + MalformedThrottle { + last_logged: now, + suppressed: 0, + }, + ); + drop(map); + tracing::warn!( + target: "pjsip", + event = "MALFORMED_PACKET", + peer = %ip, + "malformed SIP packet from {} (further logs throttled to 1/{}s)", + ip, + MALFORMED_LOG_INTERVAL_SECS, + ); + } + Some(state) => { + state.suppressed += 1; + let elapsed = now.duration_since(state.last_logged).as_secs(); + if elapsed >= MALFORMED_LOG_INTERVAL_SECS { + let suppressed = state.suppressed; + state.suppressed = 0; + state.last_logged = now; + drop(map); + tracing::warn!( + target: "pjsip", + event = "MALFORMED_PACKET", + peer = %ip, + suppressed = suppressed, + window_secs = elapsed, + "still receiving malformed SIP packets from {} ({} in last {}s)", + ip, + suppressed, + elapsed, + ); + } + } + } +} + /// Extract "IP:PORT" from a PJSIP SSL error message. /// /// PJSIP ssl_sock logs include `peer: IP:PORT` at the end of the message. @@ -175,6 +280,11 @@ unsafe extern "C" fn pjsip_log_callback(level: c_int, data: *const c_char, _len: return; } + if event == PjsipEvent::MalformedPacket { + handle_malformed_packet(msg); + return; + } + if event != PjsipEvent::Unclassified { let tag = event.as_str(); match effective_level {