This commit is contained in:
coral 2026-05-11 14:47:50 -07:00
parent 096970e509
commit 9554949b60
2 changed files with 141 additions and 1 deletions

View file

@ -316,6 +316,36 @@ fn is_call_valid(call_id: CallId) -> bool {
}
}
/// Short, log-friendly description of a pending op — avoids dumping sample buffers.
fn describe_op(op: &PendingPjsuaOp) -> String {
match op {
PendingPjsuaOp::PlayDirect { call_id, samples } => {
format!("PlayDirect {{ call_id: {}, samples: {} }}", call_id, samples.len())
}
PendingPjsuaOp::StartLoop { call_id, samples } => {
format!("StartLoop {{ call_id: {}, samples: {} }}", call_id, samples.len())
}
PendingPjsuaOp::StartStreaming { call_id, path, hangup_on_complete } => {
format!(
"StartStreaming {{ call_id: {}, path: {}, hangup_on_complete: {} }}",
call_id,
path.display(),
hangup_on_complete,
)
}
PendingPjsuaOp::StartTestTone { call_id } => {
format!("StartTestTone {{ call_id: {} }}", call_id)
}
PendingPjsuaOp::Hangup { call_id } => format!("Hangup {{ call_id: {} }}", call_id),
PendingPjsuaOp::ConnectFaxPort { call_id, fax_slot, call_conf_port, .. } => {
format!(
"ConnectFaxPort {{ call_id: {}, fax_slot: {:?}, call_conf_port: {:?} }}",
call_id, fax_slot, call_conf_port,
)
}
}
}
fn process_pending_pjsua_ops() {
use super::ffi::direct_player::play_audio_to_call_direct_internal;
use super::ffi::streaming_player::start_streaming_to_call;
@ -334,7 +364,7 @@ fn process_pending_pjsua_ops() {
if let Some(cid) = call_id
&& !is_call_valid(cid)
{
tracing::warn!("Skipping stale op for dead call {}: {:?}", cid, op);
tracing::warn!("Skipping stale op for dead call {}: {}", cid, describe_op(&op));
// For ConnectFaxPort, signal failure so the caller doesn't hang
if let PendingPjsuaOp::ConnectFaxPort { done_tx, .. } = op {
let _ = done_tx.send(false);

View file

@ -64,11 +64,14 @@ use anyhow::{Context, Result};
use ipnet::Ipv4Net;
use parking_lot::Mutex;
use pjsua::*;
use std::collections::BTreeMap;
use std::ffi::CString;
use std::mem::MaybeUninit;
use std::net::IpAddr;
use std::os::raw::{c_char, c_int};
use std::ptr;
use std::sync::atomic::Ordering;
use std::time::Instant;
/// Known PJSIP error conditions detected from log messages.
///
@ -91,6 +94,9 @@ pub enum PjsipEvent {
/// SIP SUBSCRIBE for an unsupported event package (e.g. presence, dialog)
/// — pjsip responds 489 Bad Event, which is correct; just noisy at ERROR level
BadEventSubscription,
/// Inbound packet failed SIP parsing (UDP garbage flood, port scans, etc.).
/// Throttled per-source-IP to avoid log spam.
MalformedPacket,
/// Unclassified message — logged at pjsip's original level
Unclassified,
}
@ -116,6 +122,9 @@ impl PjsipEvent {
} else if msg.contains("Unable to create server subscription") {
// SIP clients SUBSCRIBE to presence/dialog after REGISTER — expected and harmless
(Self::BadEventSubscription, Some(4))
} else if msg.contains("PJSIP syntax error exception") {
// Garbage packets / floods — throttled separately, level handled there
(Self::MalformedPacket, None)
} else {
(Self::Unclassified, None)
}
@ -131,11 +140,107 @@ impl PjsipEvent {
Self::TransportError => "TRANSPORT_ERROR",
Self::NoMatchingCodec => "NO_MATCHING_CODEC",
Self::BadEventSubscription => "BAD_EVENT_SUBSCRIBE",
Self::MalformedPacket => "MALFORMED_PACKET",
Self::Unclassified => "UNCLASSIFIED",
}
}
}
/// Per-source-IP throttle state for malformed-packet floods.
struct MalformedThrottle {
last_logged: Instant,
suppressed: u64,
}
/// Log first packet from a new IP immediately, then suppress and emit a summary
/// every MALFORMED_LOG_INTERVAL_SECS while the flood continues.
const MALFORMED_LOG_INTERVAL_SECS: u64 = 60;
/// Drop tracking state after this much idle time so a returning IP gets a fresh
/// "first packet" log line rather than silently joining an old throttle bucket.
const MALFORMED_ENTRY_IDLE_SECS: u64 = 300;
static MALFORMED_PACKET_THROTTLE: Mutex<BTreeMap<IpAddr, MalformedThrottle>> =
Mutex::new(BTreeMap::new());
/// Extract `"IP:PORT"` from a "Dropping NNN bytes packet from UDP IP:PORT : ..." message.
fn extract_packet_source(msg: &str) -> Option<&str> {
let idx = msg.find("from ")?;
let rest = &msg[idx + 5..];
// skip transport word ("UDP" / "TCP" / ...)
let space = rest.find(' ')?;
let after_transport = &rest[space + 1..];
let end = after_transport
.find(|c: char| c == ' ' || c == '\t')
.unwrap_or(after_transport.len());
Some(&after_transport[..end])
}
fn parse_peer_ip(peer: &str) -> Option<IpAddr> {
// IPv6 form is "[::1]:5060"; IPv4 is "1.2.3.4:5060"
let host = if let Some(rest) = peer.strip_prefix('[') {
rest.split_once(']').map(|(h, _)| h)?
} else {
peer.rsplit_once(':').map(|(h, _)| h).unwrap_or(peer)
};
host.parse().ok()
}
/// Handle a "PJSIP syntax error exception" log line with per-IP throttling.
fn handle_malformed_packet(msg: &str) {
let peer = extract_packet_source(msg);
let Some(ip) = peer.and_then(parse_peer_ip) else {
// Couldn't parse source — log unthrottled so we don't silently drop unknown shapes.
tracing::warn!(target: "pjsip", event = "MALFORMED_PACKET", "{}", msg);
return;
};
let now = Instant::now();
let mut map = MALFORMED_PACKET_THROTTLE.lock();
map.retain(|_, st| now.duration_since(st.last_logged).as_secs() < MALFORMED_ENTRY_IDLE_SECS);
match map.get_mut(&ip) {
None => {
map.insert(
ip,
MalformedThrottle {
last_logged: now,
suppressed: 0,
},
);
drop(map);
tracing::warn!(
target: "pjsip",
event = "MALFORMED_PACKET",
peer = %ip,
"malformed SIP packet from {} (further logs throttled to 1/{}s)",
ip,
MALFORMED_LOG_INTERVAL_SECS,
);
}
Some(state) => {
state.suppressed += 1;
let elapsed = now.duration_since(state.last_logged).as_secs();
if elapsed >= MALFORMED_LOG_INTERVAL_SECS {
let suppressed = state.suppressed;
state.suppressed = 0;
state.last_logged = now;
drop(map);
tracing::warn!(
target: "pjsip",
event = "MALFORMED_PACKET",
peer = %ip,
suppressed = suppressed,
window_secs = elapsed,
"still receiving malformed SIP packets from {} ({} in last {}s)",
ip,
suppressed,
elapsed,
);
}
}
}
}
/// Extract "IP:PORT" from a PJSIP SSL error message.
///
/// PJSIP ssl_sock logs include `peer: IP:PORT` at the end of the message.
@ -175,6 +280,11 @@ unsafe extern "C" fn pjsip_log_callback(level: c_int, data: *const c_char, _len:
return;
}
if event == PjsipEvent::MalformedPacket {
handle_malformed_packet(msg);
return;
}
if event != PjsipEvent::Unclassified {
let tag = event.as_str();
match effective_level {