diff --git a/sipcord-bridge/Cargo.toml b/sipcord-bridge/Cargo.toml index 69fb8ab..a424381 100644 --- a/sipcord-bridge/Cargo.toml +++ b/sipcord-bridge/Cargo.toml @@ -83,7 +83,6 @@ rtrb = "0.3.2" async-trait = "0.1" # Utilities -anyhow = "1.0.100" thiserror = "2.0.18" tracing = "0.1.44" tracing-subscriber = { version = "0.3.22", features = ["env-filter"] } diff --git a/sipcord-bridge/src/audio/flac.rs b/sipcord-bridge/src/audio/flac.rs index 1eb9de9..b5451b5 100644 --- a/sipcord-bridge/src/audio/flac.rs +++ b/sipcord-bridge/src/audio/flac.rs @@ -2,7 +2,7 @@ //! //! Parses FLAC file bytes to extract raw PCM i16 samples. -use anyhow::{Context, bail}; +use super::AudioParseError; use tracing::debug; /// Parse a FLAC file and return the raw PCM i16 samples (mono). @@ -11,9 +11,9 @@ use tracing::debug; /// - Standard FLAC files /// - Stereo to mono conversion (if needed) /// - Various bit depths (converted to 16-bit) -pub fn parse_flac(data: &[u8]) -> anyhow::Result<(Vec, u32)> { +pub fn parse_flac(data: &[u8]) -> Result<(Vec, u32), AudioParseError> { let cursor = std::io::Cursor::new(data); - let mut reader = claxon::FlacReader::new(cursor).context("Failed to create FLAC reader")?; + let mut reader = claxon::FlacReader::new(cursor)?; let info = reader.streaminfo(); let sample_rate = info.sample_rate; @@ -28,7 +28,7 @@ pub fn parse_flac(data: &[u8]) -> anyhow::Result<(Vec, u32)> { // Read all samples let mut raw_samples: Vec = Vec::new(); for sample in reader.samples() { - raw_samples.push(sample.context("Failed to read FLAC sample")?); + raw_samples.push(sample?); } // Convert to i16 based on bit depth @@ -37,7 +37,12 @@ pub fn parse_flac(data: &[u8]) -> anyhow::Result<(Vec, u32)> { 16 => raw_samples.iter().map(|&s| s as i16).collect(), 24 => raw_samples.iter().map(|&s| (s >> 8) as i16).collect(), 32 => raw_samples.iter().map(|&s| (s >> 16) as i16).collect(), - _ => bail!("Unsupported FLAC bit depth: {}", bits_per_sample), + n => { + return Err(AudioParseError::Unsupported(format!( + "FLAC bit depth: {}", + n + ))); + } }; // Convert to mono if stereo (samples are interleaved) diff --git a/sipcord-bridge/src/audio/mod.rs b/sipcord-bridge/src/audio/mod.rs index 5fa5b20..d80d531 100644 --- a/sipcord-bridge/src/audio/mod.rs +++ b/sipcord-bridge/src/audio/mod.rs @@ -6,3 +6,22 @@ pub mod flac; pub mod simd; pub mod wav; + +/// Errors that can occur while parsing a WAV or FLAC file. +#[derive(thiserror::Error, Debug)] +pub enum AudioParseError { + /// File header, chunk structure, or sample data was malformed for the + /// format implied by the magic bytes. Carries a short human-readable + /// reason (chunk name, byte offset, etc.). + #[error("malformed audio data: {0}")] + Malformed(String), + + /// Audio format is recognised but not supported by this parser (e.g. + /// non-PCM WAV, or a FLAC stream with an exotic bit depth). + #[error("unsupported audio: {0}")] + Unsupported(String), + + /// Underlying claxon FLAC decoder error. + #[error("FLAC decode error: {0}")] + Flac(#[from] claxon::Error), +} diff --git a/sipcord-bridge/src/audio/wav.rs b/sipcord-bridge/src/audio/wav.rs index 84835c3..cbab9dd 100644 --- a/sipcord-bridge/src/audio/wav.rs +++ b/sipcord-bridge/src/audio/wav.rs @@ -3,9 +3,17 @@ //! Parses WAV file bytes to extract raw PCM i16 samples. //! Supports standard PCM WAV files (format code 1). -use anyhow::ensure; +use super::AudioParseError; use tracing::debug; +fn ensure(cond: bool, msg: &'static str) -> Result<(), AudioParseError> { + if cond { + Ok(()) + } else { + Err(AudioParseError::Malformed(msg.into())) + } +} + /// WAV format chunk data #[derive(Debug)] struct WavFormat { @@ -25,11 +33,11 @@ struct WavFormat { /// - Standard PCM WAV files (format code 1) /// - Stereo to mono conversion (if needed) /// - 16-bit samples -pub fn parse_wav(data: &[u8]) -> anyhow::Result<(Vec, u32)> { +pub fn parse_wav(data: &[u8]) -> Result<(Vec, u32), AudioParseError> { // Validate RIFF header - ensure!(data.len() >= 12, "WAV file too short for header"); - ensure!(&data[0..4] == b"RIFF", "Missing RIFF header"); - ensure!(&data[8..12] == b"WAVE", "Missing WAVE format"); + ensure(data.len() >= 12, "WAV file too short for header")?; + ensure(&data[0..4] == b"RIFF", "missing RIFF header")?; + ensure(&data[8..12] == b"WAVE", "missing WAVE format")?; let mut pos = 12; let mut format: Option = None; @@ -45,7 +53,7 @@ pub fn parse_wav(data: &[u8]) -> anyhow::Result<(Vec, u32)> { match chunk_id { b"fmt " => { - ensure!(chunk_size >= 16, "fmt chunk too small"); + ensure(chunk_size >= 16, "fmt chunk too small")?; format = Some(WavFormat { audio_format: u16::from_le_bytes([data[pos], data[pos + 1]]), num_channels: u16::from_le_bytes([data[pos + 2], data[pos + 3]]), @@ -63,9 +71,19 @@ pub fn parse_wav(data: &[u8]) -> anyhow::Result<(Vec, u32)> { b"data" => { let fmt = format .as_ref() - .ok_or_else(|| anyhow::anyhow!("data chunk before fmt chunk"))?; - ensure!(fmt.audio_format == 1, "Only PCM format supported"); - ensure!(fmt.bits_per_sample == 16, "Only 16-bit samples supported"); + .ok_or_else(|| AudioParseError::Malformed("data chunk before fmt chunk".into()))?; + if fmt.audio_format != 1 { + return Err(AudioParseError::Unsupported(format!( + "WAV audio_format={} (only PCM=1 supported)", + fmt.audio_format + ))); + } + if fmt.bits_per_sample != 16 { + return Err(AudioParseError::Unsupported(format!( + "WAV bits_per_sample={} (only 16 supported)", + fmt.bits_per_sample + ))); + } let data_end = (pos + chunk_size).min(data.len()); let sample_data = &data[pos..data_end]; @@ -115,7 +133,7 @@ pub fn parse_wav(data: &[u8]) -> anyhow::Result<(Vec, u32)> { let sample_rate = format .as_ref() .map(|f| f.sample_rate) - .ok_or_else(|| anyhow::anyhow!("No fmt chunk found"))?; + .ok_or_else(|| AudioParseError::Malformed("no fmt chunk found".into()))?; Ok((samples, sample_rate)) } diff --git a/sipcord-bridge/src/call/mod.rs b/sipcord-bridge/src/call/mod.rs index 121ceb5..6dd794a 100644 --- a/sipcord-bridge/src/call/mod.rs +++ b/sipcord-bridge/src/call/mod.rs @@ -28,7 +28,8 @@ use crate::transport::sip::{ clear_channel_stale_audio, empty_bridge_grace_period_secs, register_call_channel, register_discord_to_sip, stop_loop, unregister_call_channel, unregister_discord_to_sip, }; -use anyhow::Result; +use crate::BridgeError; +use crate::services::sound::SoundError; use crossbeam_channel::{Receiver, Sender, bounded}; use dashmap::{DashMap, DashSet}; use std::collections::HashSet; @@ -146,16 +147,14 @@ impl BridgeCoordinator { sip_cmd_tx: Sender, sip_event_rx: Receiver, shared_discord: Arc, - ) -> Self { + ) -> Result { let (discord_event_tx, discord_event_rx) = bounded(1000); // Load sounds from config.toml let sounds_dir = PathBuf::from(&crate::config::EnvConfig::global().sounds_dir); + let sound_manager = create_sound_manager(sounds_dir)?; - let sound_manager = create_sound_manager(sounds_dir) - .expect("Failed to create SoundManager - check config.toml and sound files"); - - Self { + Ok(Self { backend, sip_cmd_tx, sip_event_rx, @@ -169,11 +168,11 @@ impl BridgeCoordinator { discord_event_rx, sound_manager, shared_discord, - } + }) } /// Run the bridge coordinator (consumes self) - pub async fn run(self) -> Result<()> { + pub async fn run(self) -> Result<(), BridgeError> { info!("Bridge coordinator started"); // Shared notify: VoiceReceiver signals this on unexpected DriverDisconnect, diff --git a/sipcord-bridge/src/config.rs b/sipcord-bridge/src/config.rs index bce6cf5..c239edd 100644 --- a/sipcord-bridge/src/config.rs +++ b/sipcord-bridge/src/config.rs @@ -1,8 +1,34 @@ -use anyhow::{Context, Result}; use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::OnceLock; +/// Errors that can occur loading and validating bridge configuration. +#[derive(thiserror::Error, Debug)] +pub enum ConfigError { + #[error("failed to read config file {path:?}: {source}")] + Read { + path: PathBuf, + #[source] + source: std::io::Error, + }, + + #[error("failed to parse config file {path:?}: {source}")] + TomlParse { + path: PathBuf, + #[source] + source: toml::de::Error, + }, + + #[error("failed to parse environment variables: {0}")] + Envy(#[from] envy::Error), + + #[error("global EnvConfig has already been initialised")] + EnvAlreadyInitialised, + + #[error("required environment variable {0} is not set")] + MissingEnvVar(&'static str), +} + /// Global application config (loaded from config.toml) pub static APP_CONFIG: OnceLock = OnceLock::new(); @@ -85,30 +111,29 @@ pub struct EnvConfig { impl EnvConfig { /// Parse environment variables (via `envy`) and store in the global `OnceLock`. /// Call once at the top of `main()`. - pub fn init() -> Result<()> { + pub fn init() -> Result<(), ConfigError> { dotenvy::dotenv().ok(); - let cfg: EnvConfig = - envy::from_env().context("Failed to parse environment variables into EnvConfig")?; + let cfg: EnvConfig = envy::from_env()?; ENV_CONFIG .set(cfg) - .ok() - .context("EnvConfig already initialized")?; + .map_err(|_| ConfigError::EnvAlreadyInitialised)?; Ok(()) } - /// Access the global `EnvConfig` (panics if `init()` was not called). + /// Access the global `EnvConfig` (panics if `init()` was not called — a + /// programmer error, not a recoverable condition). pub fn global() -> &'static EnvConfig { - ENV_CONFIG - .get() - .expect("EnvConfig not initialized — call EnvConfig::init() first") + ENV_CONFIG.get().unwrap_or_else(|| { + panic!("EnvConfig not initialized — call EnvConfig::init() first") + }) } /// Build a `SipConfig` from the parsed environment. - pub fn to_sip_config(&self) -> Result { + pub fn to_sip_config(&self) -> Result { let public_host = self .sip_public_host .clone() - .context("SIP_PUBLIC_HOST required")?; + .ok_or(ConfigError::MissingEnvVar("SIP_PUBLIC_HOST"))?; let local_net = match (&self.sip_local_host, &self.sip_local_cidr) { (Some(host), Some(cidr)) => Some(LocalNetConfig { @@ -299,18 +324,23 @@ pub struct SoundEntry { impl AppConfig { /// Load configuration from a TOML file - pub fn load(path: &Path) -> Result { - let contents = std::fs::read_to_string(path) - .with_context(|| format!("Failed to read config file: {}", path.display()))?; - toml::from_str(&contents) - .with_context(|| format!("Failed to parse config file: {}", path.display())) + pub fn load(path: &Path) -> Result { + let contents = std::fs::read_to_string(path).map_err(|source| ConfigError::Read { + path: path.to_path_buf(), + source, + })?; + toml::from_str(&contents).map_err(|source| ConfigError::TomlParse { + path: path.to_path_buf(), + source, + }) } - /// Get the global application config (panics if not initialized) + /// Get the global application config (panics if not initialized — a + /// programmer error: caller must `AppConfig::load(...)` first). pub fn global() -> &'static AppConfig { - APP_CONFIG - .get() - .expect("AppConfig not initialized - call AppConfig::load() first") + APP_CONFIG.get().unwrap_or_else(|| { + panic!("AppConfig not initialized — call AppConfig::load() first") + }) } /// Get bridge config (with defaults if not loaded yet) @@ -370,7 +400,7 @@ pub struct LocalNetConfig { impl SipConfig { /// Load SIP configuration from environment variables. /// Standalone method for backends that don't need the full Config. - pub fn from_env() -> Result { + pub fn from_env() -> Result { EnvConfig::global().to_sip_config() } } diff --git a/sipcord-bridge/src/error.rs b/sipcord-bridge/src/error.rs new file mode 100644 index 0000000..750fee1 --- /dev/null +++ b/sipcord-bridge/src/error.rs @@ -0,0 +1,47 @@ +//! Top-level error type for the `sipcord-bridge` crate. +//! +//! [`BridgeError`] aggregates every subsystem error so callers (main binaries, +//! adapter crates) can use a single `Result` type and rely on `?` propagation +//! via `#[from]` conversions. + +use crate::audio::AudioParseError; +use crate::config::ConfigError; +use crate::fax::FaxError; +use crate::routing::CallError; +use crate::services::sound::SoundError; +use crate::transport::discord::DiscordError; +use crate::transport::sip::error::SipError; + +/// Umbrella error for the entire bridge crate. +#[derive(thiserror::Error, Debug)] +pub enum BridgeError { + #[error(transparent)] + Config(#[from] ConfigError), + + #[error(transparent)] + Sip(#[from] SipError), + + #[error(transparent)] + Discord(#[from] DiscordError), + + #[error(transparent)] + Routing(#[from] CallError), + + #[error(transparent)] + Fax(#[from] FaxError), + + #[error(transparent)] + Sound(#[from] SoundError), + + #[error(transparent)] + AudioParse(#[from] AudioParseError), + + /// Generic I/O at the top level (file ops in main, etc.) that aren't tied + /// to a particular subsystem. + #[error("I/O ({context}): {source}")] + Io { + context: String, + #[source] + source: std::io::Error, + }, +} diff --git a/sipcord-bridge/src/fax/discord_poster.rs b/sipcord-bridge/src/fax/discord_poster.rs index 63d0111..5b92c7d 100644 --- a/sipcord-bridge/src/fax/discord_poster.rs +++ b/sipcord-bridge/src/fax/discord_poster.rs @@ -5,8 +5,8 @@ //! - Replaced with "Fax Received" (green) with page image gallery on success //! - Edited to "Fax Failed" (red) with reason on failure +use super::FaxError; use crate::services::snowflake::Snowflake; -use anyhow::{Context, Result}; use serenity::all::{ChannelId, MessageId, UserId}; use serenity::builder::{ CreateAttachment, CreateEmbed, CreateEmbedFooter, CreateMessage, EditMessage, @@ -30,14 +30,20 @@ pub struct DiscordPoster { } impl DiscordPoster { - pub fn new(bot_token: String, channel_id: Snowflake, user_id: String) -> Self { - let token: Token = bot_token.parse().expect("invalid Discord bot token"); - Self { + pub fn new( + bot_token: String, + channel_id: Snowflake, + user_id: String, + ) -> Result { + let token: Token = bot_token + .parse() + .map_err(|e| FaxError::InvalidToken(format!("{e}")))?; + Ok(Self { http: Arc::new(Http::new(token)), channel_id: ChannelId::new(*channel_id), user_id, display_name: None, - } + }) } /// Resolve and cache the Discord display name for the user. @@ -70,7 +76,7 @@ impl DiscordPoster { } /// Post a "Receiving fax..." status message. Returns the message ID for future edits. - pub async fn post_fax_receiving(&mut self) -> Result { + pub async fn post_fax_receiving(&mut self) -> Result { self.resolve_display_name().await; let embed = CreateEmbed::new() @@ -83,8 +89,7 @@ impl DiscordPoster { .channel_id .widen() .send_message(&self.http, CreateMessage::new().embed(embed)) - .await - .context("Failed to post fax receiving message")?; + .await?; debug!("Posted fax receiving message: {}", msg.id); Ok(msg.id.get()) @@ -104,7 +109,7 @@ impl DiscordPoster { image_pages: Vec>, page_count: u32, file_ext: &str, - ) -> Result<()> { + ) -> Result<(), FaxError> { /// Discord's maximum number of embeds per message. const MAX_EMBEDS: u32 = 10; @@ -170,14 +175,18 @@ impl DiscordPoster { "Discord API error editing fax complete (msg={}, {} pages): {}", message_id, page_count, e ); - anyhow::bail!("Failed to edit fax complete message: {}", e); + return Err(FaxError::Discord(e)); } Ok(()) } /// Edit the status message to show a failure reason. - pub async fn edit_fax_failed(&self, message_id: u64, reason: &str) -> Result<()> { + pub async fn edit_fax_failed( + &self, + message_id: u64, + reason: &str, + ) -> Result<(), FaxError> { let embed = CreateEmbed::new() .title("Fax Failed") .description(reason) @@ -201,7 +210,7 @@ impl DiscordPoster { } /// Post a standalone failure message (when no "receiving" message was posted). - pub async fn post_fax_failed(&mut self, reason: &str) -> Result<()> { + pub async fn post_fax_failed(&mut self, reason: &str) -> Result<(), FaxError> { self.resolve_display_name().await; let embed = CreateEmbed::new() diff --git a/sipcord-bridge/src/fax/mod.rs b/sipcord-bridge/src/fax/mod.rs index 3193140..37402f7 100644 --- a/sipcord-bridge/src/fax/mod.rs +++ b/sipcord-bridge/src/fax/mod.rs @@ -16,3 +16,50 @@ pub mod discord_poster; pub mod session; pub mod spandsp; pub mod tiff_decoder; + +/// Errors from the fax subsystem. +/// +/// Variants are intentionally coarse — fax flows are end-to-end best-effort +/// (a missed page or codec mismatch logs and aborts the session) and the +/// detailed `String` payloads carry enough context for triage. Where a more +/// structured upstream type already exists (`serenity::Error`, `io::Error`), +/// we wrap it via `#[from]` / `#[source]`. +#[derive(thiserror::Error, Debug)] +pub enum FaxError { + /// Discord REST / gateway error while posting or editing fax status. + #[error("Discord post failed: {0}")] + Discord(#[from] serenity::Error), + + /// Token parsing failure when constructing the fax-posting client. + #[error("invalid Discord bot token: {0}")] + InvalidToken(String), + + /// I/O error reading/writing TIFFs or working with paths. + #[error("fax I/O ({context}): {source}")] + Io { + context: String, + #[source] + source: std::io::Error, + }, + + /// Path couldn't be converted to UTF-8 for the SpanDSP / TIFF API. + #[error("path is not valid UTF-8: {0}")] + NonUtf8Path(String), + + /// SpanDSP FFI returned an error from one of its setters or state-init + /// functions. + #[error("SpanDSP ({operation}): {detail}")] + SpanDsp { + operation: &'static str, + detail: String, + }, + + /// TIFF parsing / decoding failure. Carries a human-readable reason. + #[error("TIFF decode: {0}")] + Tiff(String), + + /// A received fax produced no pages (decoder bail-out, session closed + /// before any page was completed, etc.). + #[error("no pages in received fax")] + NoPages, +} diff --git a/sipcord-bridge/src/fax/session.rs b/sipcord-bridge/src/fax/session.rs index 8c87487..581e403 100644 --- a/sipcord-bridge/src/fax/session.rs +++ b/sipcord-bridge/src/fax/session.rs @@ -7,12 +7,12 @@ //! 4. On completion, TIFF is converted to PNG and posted to Discord //! 5. On failure or timeout, an error message is posted to Discord +use crate::fax::FaxError; use crate::fax::discord_poster::DiscordPoster; use crate::fax::spandsp::{FaxReceiver, FaxRxStatus, FaxT38Receiver}; use crate::fax::tiff_decoder; use crate::services::snowflake::Snowflake; use crate::transport::sip::CallId; -use anyhow::Result; use std::io::Cursor; use std::path::PathBuf; use std::time::Instant; @@ -89,7 +89,7 @@ impl FaxSession { guild_id: Snowflake, user_id: String, bot_token: String, - ) -> Result { + ) -> Result { let fax_config = crate::config::AppConfig::fax(); // Use configured tmp_folder or system temp dir @@ -109,12 +109,15 @@ impl FaxSession { }); let tiff_dir = base_dir.join(format!("{}{}", fax_config.prefix, session_id)); - std::fs::create_dir_all(&tiff_dir)?; + std::fs::create_dir_all(&tiff_dir).map_err(|source| FaxError::Io { + context: format!("create tiff dir {}", tiff_dir.display()), + source, + })?; let tiff_path = tiff_dir.join(format!("{}{}.tiff", fax_config.prefix, session_id)); let receiver = FaxReceiver::new_audio_receiver(&tiff_path)?; - let poster = DiscordPoster::new(bot_token, text_channel_id, user_id.clone()); + let poster = DiscordPoster::new(bot_token, text_channel_id, user_id.clone())?; Ok(Self { call_id, @@ -278,7 +281,7 @@ impl FaxSession { /// Post the initial "Receiving fax..." message to Discord. /// Called when fax negotiation is detected. - pub async fn post_receiving_message(&mut self) -> Result<()> { + pub async fn post_receiving_message(&mut self) -> Result<(), FaxError> { match self.poster.post_fax_receiving().await { Ok(msg_id) => { debug!( @@ -317,7 +320,7 @@ impl FaxSession { /// Convert the received TIFF to images and post to Discord. /// Called after fax reception is complete. - pub async fn convert_and_post(&mut self) -> Result<()> { + pub async fn convert_and_post(&mut self) -> Result<(), FaxError> { // Guard against double-processing: if we've already posted (Complete) or failed, // another caller (e.g., CallEnded racing with T.38 completion) already handled it. // Note: FaxState::Received is NOT skipped — that's the normal entry state. @@ -364,11 +367,12 @@ impl FaxSession { .write_to(&mut Cursor::new(&mut buf), output_format.image_format()) .map(|_| buf) }) - .collect::, _>>()?; + .collect::, _>>() + .map_err(|e| FaxError::Tiff(format!("image encode: {e}")))?; if image_pages.is_empty() { self.post_failure("No pages in received fax").await; - anyhow::bail!("No pages in received fax"); + return Err(FaxError::NoPages); } let page_count = image_pages.len() as u32; diff --git a/sipcord-bridge/src/fax/spandsp.rs b/sipcord-bridge/src/fax/spandsp.rs index 00a99d0..9fb1d72 100644 --- a/sipcord-bridge/src/fax/spandsp.rs +++ b/sipcord-bridge/src/fax/spandsp.rs @@ -3,7 +3,7 @@ //! Uses the `spandsp` safe wrapper crate to decode G.711 audio into TIFF images. //! Audio arrives at 16kHz from PJSUA conference bridge; we downsample to 8kHz for SpanDSP. -use anyhow::{Context, Result}; +use super::FaxError; use spandsp::fax::FaxState; use spandsp::logging::{LogLevel, LogShowFlags}; use spandsp::spandsp_sys; @@ -75,19 +75,30 @@ fn configure_t30( t30: &spandsp::t30::T30State, tiff_path: &str, callback_state: &mut FaxCallbackState, -) -> Result<()> { +) -> Result<(), FaxError> { + /// Local macro: tag a SpanDSP error with the operation name. Avoids + /// boilerplate at every setter call site. + macro_rules! spandsp_err { + ($op:expr) => { + |e| FaxError::SpanDsp { + operation: $op, + detail: e.to_string(), + } + }; + } + t30.set_rx_file(tiff_path, -1) - .map_err(|e| anyhow::anyhow!("Failed to set rx file: {}", e))?; + .map_err(spandsp_err!("set_rx_file"))?; t30.set_supported_modems(T30ModemSupport::default()) - .map_err(|e| anyhow::anyhow!("Failed to set supported modems: {}", e))?; + .map_err(spandsp_err!("set_supported_modems"))?; t30.set_ecm_capability(true) - .map_err(|e| anyhow::anyhow!("Failed to set ECM: {}", e))?; + .map_err(spandsp_err!("set_ecm_capability"))?; let compressions = T4_COMPRESSION_T4_1D | T4_COMPRESSION_T4_2D | T4_COMPRESSION_T6; t30.set_supported_compressions(compressions) - .map_err(|e| anyhow::anyhow!("Failed to set compressions: {}", e))?; + .map_err(spandsp_err!("set_supported_compressions"))?; let sizes = T4_SUPPORT_WIDTH_215MM | T4_SUPPORT_WIDTH_255MM @@ -97,7 +108,7 @@ fn configure_t30( | T4_RESOLUTION_R8_SUPERFINE | T4_RESOLUTION_200_200; t30.set_supported_image_sizes(sizes) - .map_err(|e| anyhow::anyhow!("Failed to set image sizes: {}", e))?; + .map_err(spandsp_err!("set_supported_image_sizes"))?; let user_data = callback_state as *mut FaxCallbackState as *mut std::ffi::c_void; unsafe { @@ -208,15 +219,20 @@ impl FaxReceiver { /// Create a new fax receiver in audio mode. /// /// Initializes SpanDSP in receive mode and sets the output TIFF path. - pub fn new_audio_receiver(tiff_path: &Path) -> Result { - let tiff_path_str = tiff_path.to_str().context("Invalid TIFF path")?; + pub fn new_audio_receiver(tiff_path: &Path) -> Result { + let tiff_path_str = tiff_path + .to_str() + .ok_or_else(|| FaxError::NonUtf8Path(tiff_path.display().to_string()))?; - let fax = FaxState::new(false) - .map_err(|e| anyhow::anyhow!("Failed to initialize SpanDSP fax state: {}", e))?; + let fax = FaxState::new(false).map_err(|e| FaxError::SpanDsp { + operation: "FaxState::new", + detail: e.to_string(), + })?; - let t30 = fax - .get_t30_state() - .map_err(|e| anyhow::anyhow!("Failed to get T.30 state: {}", e))?; + let t30 = fax.get_t30_state().map_err(|e| FaxError::SpanDsp { + operation: "FaxState::get_t30_state", + detail: e.to_string(), + })?; let mut callback_state = Box::new(FaxCallbackState { negotiation_started: false, @@ -350,8 +366,13 @@ impl FaxT38Receiver { /// /// `tiff_path`: Where to write the received fax TIFF file. /// `tx_ifp_sender`: Channel for outgoing IFP packets (sent to UDPTL socket). - pub fn new(tiff_path: &Path, tx_ifp_sender: mpsc::UnboundedSender>) -> Result { - let tiff_path_str = tiff_path.to_str().context("Invalid TIFF path")?; + pub fn new( + tiff_path: &Path, + tx_ifp_sender: mpsc::UnboundedSender>, + ) -> Result { + let tiff_path_str = tiff_path + .to_str() + .ok_or_else(|| FaxError::NonUtf8Path(tiff_path.display().to_string()))?; let tx_callback_state = Box::new(TxCallbackState { sender: tx_ifp_sender, @@ -359,13 +380,18 @@ impl FaxT38Receiver { let tx_user_data = &*tx_callback_state as *const TxCallbackState as *mut std::ffi::c_void; let terminal = unsafe { - T38Terminal::new_raw(false, Some(tx_packet_handler), tx_user_data) - .map_err(|e| anyhow::anyhow!("Failed to initialize T38Terminal: {}", e))? + T38Terminal::new_raw(false, Some(tx_packet_handler), tx_user_data).map_err(|e| { + FaxError::SpanDsp { + operation: "T38Terminal::new_raw", + detail: e.to_string(), + } + })? }; - let t30 = terminal - .get_t30_state() - .map_err(|e| anyhow::anyhow!("Failed to get T.30 state from T38Terminal: {}", e))?; + let t30 = terminal.get_t30_state().map_err(|e| FaxError::SpanDsp { + operation: "T38Terminal::get_t30_state", + detail: e.to_string(), + })?; let mut callback_state = Box::new(FaxCallbackState { negotiation_started: false, @@ -384,7 +410,10 @@ impl FaxT38Receiver { let t38_core = terminal .get_t38_core_state() - .map_err(|e| anyhow::anyhow!("Failed to get T38Core: {}", e))?; + .map_err(|e| FaxError::SpanDsp { + operation: "T38Terminal::get_t38_core_state", + detail: e.to_string(), + })?; configure_log_state(spandsp_sys::t38_core_get_logging_state(t38_core.as_ptr())); } diff --git a/sipcord-bridge/src/fax/tiff_decoder.rs b/sipcord-bridge/src/fax/tiff_decoder.rs index 310d7e7..f50e1ac 100644 --- a/sipcord-bridge/src/fax/tiff_decoder.rs +++ b/sipcord-bridge/src/fax/tiff_decoder.rs @@ -6,12 +6,20 @@ //! Huffman table data derived from the ITU-T T.4 standard. //! Bit-reading approach inspired by the `fax` crate (MIT licensed). -use anyhow::{Result, bail}; +use super::FaxError; use image::GrayImage; use std::path::Path; use std::sync::OnceLock; use tracing::debug; +/// Convenience: most failures in this module are malformed-TIFF conditions +/// that map cleanly onto `FaxError::Tiff(String)`. +macro_rules! tiff_bail { + ($($arg:tt)*) => { + return Err(FaxError::Tiff(format!($($arg)*))) + }; +} + // Public API /// Maximum TIFF file size (50 MB). Well above any reasonable fax output from SpanDSP, @@ -19,19 +27,27 @@ use tracing::debug; const MAX_TIFF_SIZE: u64 = 50 * 1024 * 1024; /// Decode all pages of a fax TIFF file into grayscale images. -pub fn decode_fax_tiff(path: &Path) -> Result> { +pub fn decode_fax_tiff(path: &Path) -> Result, FaxError> { if !path.exists() { - bail!("TIFF file not found: {}", path.display()); + tiff_bail!("TIFF file not found: {}", path.display()); } - let file_size = std::fs::metadata(path)?.len(); + let file_size = std::fs::metadata(path) + .map_err(|source| FaxError::Io { + context: format!("metadata({})", path.display()), + source, + })? + .len(); if file_size > MAX_TIFF_SIZE { - bail!( + tiff_bail!( "TIFF file too large: {} bytes (max {} bytes)", file_size, MAX_TIFF_SIZE ); } - let data = std::fs::read(path)?; + let data = std::fs::read(path).map_err(|source| FaxError::Io { + context: format!("read({})", path.display()), + source, + })?; let pages = parse_tiff_ifds(&data)?; let mut images = Vec::with_capacity(pages.len()); @@ -51,7 +67,7 @@ pub fn decode_fax_tiff(path: &Path) -> Result> { let start = *off as usize; let end = start + *len as usize; if end > data.len() { - bail!( + tiff_bail!( "TIFF strip extends past file: offset={}, count={}, file_len={}", off, len, @@ -71,7 +87,7 @@ pub fn decode_fax_tiff(path: &Path) -> Result> { let transitions_per_line = match page.compression { 3 => decode_group3(&strip_data, page.width, page.height, page.t4_options)?, 4 => decode_group4(&strip_data, page.width, page.height)?, - other => bail!("Unsupported TIFF compression: {}", other), + other => tiff_bail!("Unsupported TIFF compression: {}", other), }; let img = assemble_image( @@ -104,18 +120,18 @@ struct TiffPage { y_resolution: Option<(u32, u32)>, // numerator, denominator (RATIONAL) } -fn parse_tiff_ifds(data: &[u8]) -> Result> { +fn parse_tiff_ifds(data: &[u8]) -> Result, FaxError> { if data.len() < 8 { - bail!("TIFF file too short"); + tiff_bail!("TIFF file too short"); } let le = match (data[0], data[1]) { (0x49, 0x49) => true, (0x4D, 0x4D) => false, - _ => bail!("Not a TIFF file"), + _ => tiff_bail!("Not a TIFF file"), }; let magic = read_u16(data, 2, le); if magic != 42 { - bail!("Bad TIFF magic: {}", magic); + tiff_bail!("Bad TIFF magic: {}", magic); } let mut ifd_offset = read_u32(data, 4, le) as usize; @@ -184,7 +200,7 @@ fn parse_tiff_ifds(data: &[u8]) -> Result> { } if pages.is_empty() { - bail!("No IFDs found in TIFF"); + tiff_bail!("No IFDs found in TIFF"); } Ok(pages) } @@ -830,7 +846,12 @@ fn decode_line_2d(reader: &mut BitReader, reference: &[u16], width: u16) -> Opti // Group 3 Image Driver -fn decode_group3(data: &[u8], width: u32, height: u32, t4_options: u32) -> Result>> { +fn decode_group3( + data: &[u8], + width: u32, + height: u32, + t4_options: u32, +) -> Result>, FaxError> { let w = width as u16; let is_2d = (t4_options & 1) != 0; let has_fill_bits = (t4_options & 4) != 0; @@ -840,7 +861,7 @@ fn decode_group3(data: &[u8], width: u32, height: u32, t4_options: u32) -> Resul // Scan for the first EOL if !reader.scan_for_eol() { - bail!("No EOL found at start of Group 3 data"); + tiff_bail!("No EOL found at start of Group 3 data"); } for _ in 0..height { @@ -926,14 +947,14 @@ fn decode_group3(data: &[u8], width: u32, height: u32, t4_options: u32) -> Resul } if lines.is_empty() { - bail!("Group 3 decoder produced no lines"); + tiff_bail!("Group 3 decoder produced no lines"); } Ok(lines) } // Group 4 Image Driver -fn decode_group4(data: &[u8], width: u32, height: u32) -> Result>> { +fn decode_group4(data: &[u8], width: u32, height: u32) -> Result>, FaxError> { let w = width as u16; let mut reader = BitReader::new(data); let mut lines: Vec> = Vec::with_capacity(height as usize); @@ -958,7 +979,7 @@ fn decode_group4(data: &[u8], width: u32, height: u32) -> Result>> } if lines.is_empty() { - bail!("Group 4 decoder produced no lines"); + tiff_bail!("Group 4 decoder produced no lines"); } Ok(lines) } diff --git a/sipcord-bridge/src/lib.rs b/sipcord-bridge/src/lib.rs index bded370..731871c 100644 --- a/sipcord-bridge/src/lib.rs +++ b/sipcord-bridge/src/lib.rs @@ -8,11 +8,19 @@ //! and authentication. A built-in `StaticBackend` (TOML dialplan) is included. #![feature(portable_simd)] +// Lock down the no-unwrap policy. Test modules opt out via the +// `#[cfg_attr(test, allow(...))]` shim at their boundary (or `#[allow]` at +// the test fn level for isolated cases). See feedback memories +// `feedback-no-unwrap-in-production` and `feedback-fix-clippy-at-source`. +#![cfg_attr(not(test), deny(clippy::unwrap_used, clippy::expect_used))] pub mod audio; pub mod call; pub mod config; +pub mod error; pub mod fax; pub mod routing; pub mod services; pub mod transport; + +pub use error::BridgeError; diff --git a/sipcord-bridge/src/main.rs b/sipcord-bridge/src/main.rs index 9a34b3b..86c7076 100644 --- a/sipcord-bridge/src/main.rs +++ b/sipcord-bridge/src/main.rs @@ -3,25 +3,32 @@ //! Standalone SIP-to-Discord voice bridge using a TOML dialplan. #![feature(portable_simd)] +#![cfg_attr(not(test), deny(clippy::unwrap_used, clippy::expect_used))] use std::path::PathBuf; use std::sync::Arc; -use anyhow::{Context, Result}; use tracing::{error, info}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use sipcord_bridge::BridgeError; use sipcord_bridge::call::BridgeCoordinator; -use sipcord_bridge::config::{APP_CONFIG, AppConfig, EnvConfig, SipConfig}; +use sipcord_bridge::config::{APP_CONFIG, AppConfig, ConfigError, EnvConfig, SipConfig}; use sipcord_bridge::routing::static_router::StaticBackend; use sipcord_bridge::transport::discord::SharedDiscordClient; use sipcord_bridge::transport::sip::SipTransport; #[tokio::main] -async fn main() -> Result<()> { - rustls::crypto::ring::default_provider() +async fn main() -> Result<(), BridgeError> { + // Pre-init failures here are programmer errors (missing rustls feature + // flag, double-init of the global crypto provider) — panicking is the + // right behaviour and there's no caller that could recover. + if rustls::crypto::ring::default_provider() .install_default() - .expect("Failed to install rustls crypto provider"); + .is_err() + { + panic!("rustls crypto provider already installed or feature missing"); + } tracing_subscriber::registry() .with( @@ -39,17 +46,17 @@ async fn main() -> Result<()> { let app_config = AppConfig::load(&config_path)?; APP_CONFIG .set(app_config) - .expect("AppConfig already initialized"); + .map_err(|_| BridgeError::Config(ConfigError::EnvAlreadyInitialised))?; info!("Loaded config from {}", config_path.display()); run_static_router().await } -async fn run_static_router() -> Result<()> { +async fn run_static_router() -> Result<(), BridgeError> { let bot_token = EnvConfig::global() .discord_bot_token .clone() - .context("DISCORD_BOT_TOKEN required")?; + .ok_or(ConfigError::MissingEnvVar("DISCORD_BOT_TOKEN"))?; let sip_config = SipConfig::from_env()?; // Load dialplan @@ -73,9 +80,7 @@ async fn run_static_router() -> Result<()> { }); // Create shared Discord client - let shared_discord = SharedDiscordClient::new(&bot_token) - .await - .expect("Failed to create shared Discord client"); + let shared_discord = SharedDiscordClient::new(&bot_token).await?; info!("Shared Discord client initialized"); let bridge = BridgeCoordinator::new( @@ -83,7 +88,7 @@ async fn run_static_router() -> Result<()> { sip_transport.commands(), sip_transport.events(), shared_discord, - ); + )?; info!("Starting components..."); diff --git a/sipcord-bridge/src/routing/mod.rs b/sipcord-bridge/src/routing/mod.rs index 3e4f708..208730c 100644 --- a/sipcord-bridge/src/routing/mod.rs +++ b/sipcord-bridge/src/routing/mod.rs @@ -41,12 +41,17 @@ pub enum RouteDecision { } /// Errors that trigger audio playback before hangup -#[derive(Debug, Clone, Copy)] +#[derive(thiserror::Error, Debug, Clone, Copy)] pub enum CallError { + #[error("no channel mapping for the dialed extension")] NoChannelMapping, + #[error("user lacks permission for the target Discord channel")] NoPermissions, + #[error("Discord API error")] DiscordApiError, + #[error("server is busy")] ServerBusy, + #[error("unknown call error")] Unknown, } diff --git a/sipcord-bridge/src/routing/static_router.rs b/sipcord-bridge/src/routing/static_router.rs index 44afcc5..838ebeb 100644 --- a/sipcord-bridge/src/routing/static_router.rs +++ b/sipcord-bridge/src/routing/static_router.rs @@ -19,6 +19,7 @@ use async_trait::async_trait; use serde::Deserialize; use tracing::info; +use crate::config::ConfigError; use crate::routing::{Backend, CallError, CallStartedInfo, OutboundCallRequest, RouteDecision}; use crate::services::snowflake::Snowflake; use crate::transport::sip::DigestAuthParams; @@ -46,11 +47,15 @@ pub struct StaticBackend { impl StaticBackend { /// Load the dialplan from a TOML file. `bot_token` comes from the environment. - pub fn load(path: &Path, bot_token: String) -> anyhow::Result { - let content = std::fs::read_to_string(path) - .map_err(|e| anyhow::anyhow!("Failed to read {}: {}", path.display(), e))?; - let dialplan: Dialplan = toml::from_str(&content) - .map_err(|e| anyhow::anyhow!("Failed to parse {}: {}", path.display(), e))?; + pub fn load(path: &Path, bot_token: String) -> Result { + let content = std::fs::read_to_string(path).map_err(|source| ConfigError::Read { + path: path.to_path_buf(), + source, + })?; + let dialplan: Dialplan = toml::from_str(&content).map_err(|source| ConfigError::TomlParse { + path: path.to_path_buf(), + source, + })?; info!( "Loaded dialplan from {} ({} extensions)", diff --git a/sipcord-bridge/src/services/registrar.rs b/sipcord-bridge/src/services/registrar.rs index e19c423..bae606f 100644 --- a/sipcord-bridge/src/services/registrar.rs +++ b/sipcord-bridge/src/services/registrar.rs @@ -169,6 +169,18 @@ impl Registrar { } } +/// Start the periodic cleanup task +pub fn spawn_cleanup_task(registrar: Arc) { + tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(30)); + loop { + interval.tick().await; + registrar.remove_expired(); + debug!("Registrar cleanup complete"); + } + }); +} + #[cfg(test)] mod tests { use super::*; @@ -313,15 +325,3 @@ mod tests { assert_eq!(contacts[0].0, "sip:charlie@5.6.7.8"); } } - -/// Start the periodic cleanup task -pub fn spawn_cleanup_task(registrar: Arc) { - tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_secs(30)); - loop { - interval.tick().await; - registrar.remove_expired(); - debug!("Registrar cleanup complete"); - } - }); -} diff --git a/sipcord-bridge/src/services/sound/mod.rs b/sipcord-bridge/src/services/sound/mod.rs index 29ae292..1d2ea74 100644 --- a/sipcord-bridge/src/services/sound/mod.rs +++ b/sipcord-bridge/src/services/sound/mod.rs @@ -8,16 +8,51 @@ mod streaming; -use crate::audio::{flac, wav}; +use crate::audio::{AudioParseError, flac, wav}; use crate::config::{AppConfig, SoundEntry}; use crate::transport::sip::CONF_SAMPLE_RATE; -use anyhow::{Context, Result}; use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::Arc; use tracing::{debug, info, warn}; -pub use streaming::StreamingPlayer; +pub use streaming::{StreamingError, StreamingPlayer}; + +/// Errors raised by sound loading and parsing. +#[derive(thiserror::Error, Debug)] +pub enum SoundError { + /// Failed to read the sound file from disk. + #[error("failed to read sound file {path:?}: {source}")] + Read { + path: PathBuf, + #[source] + source: std::io::Error, + }, + + /// Parse failure from the WAV/FLAC decoder. + #[error("failed to parse audio for {name}: {source}")] + Parse { + name: String, + #[source] + source: AudioParseError, + }, + + /// Sound's sample rate didn't match the bridge's configured rate. + #[error("sound {name} has wrong sample rate: {got} Hz (expected {expected} Hz)")] + WrongSampleRate { + name: String, + got: u32, + expected: u32, + }, + + /// File header doesn't match any supported format (WAV / FLAC). + #[error("unknown audio format for {name}: header bytes {header:02x?}")] + UnknownFormat { name: String, header: Vec }, + + /// Streaming player setup failure. + #[error(transparent)] + Streaming(#[from] StreamingError), +} /// A preloaded sound ready for immediate playback #[derive(Debug, Clone)] @@ -49,7 +84,7 @@ pub struct SoundManager { impl SoundManager { /// Create a new SoundManager and load sounds from config - pub fn new(sounds_dir: PathBuf) -> Result { + pub fn new(sounds_dir: PathBuf) -> Result { let config = AppConfig::global(); let mut manager = Self { preloaded: HashMap::new(), @@ -63,7 +98,7 @@ impl SoundManager { } /// Load all sounds from config entries - fn load_sounds(&mut self, entries: &HashMap) -> Result<()> { + fn load_sounds(&mut self, entries: &HashMap) -> Result<(), SoundError> { let mut preloaded_count = 0; let mut streaming_count = 0; let mut virtual_count = 0; @@ -135,9 +170,15 @@ impl SoundManager { } /// Load a preloaded sound from a file - fn load_preloaded_sound(&self, path: &Path, name: &str) -> Result { - let data = std::fs::read(path) - .with_context(|| format!("Failed to read sound file: {}", path.display()))?; + fn load_preloaded_sound( + &self, + path: &Path, + name: &str, + ) -> Result { + let data = std::fs::read(path).map_err(|source| SoundError::Read { + path: path.to_path_buf(), + source, + })?; let samples = self.parse_audio(&data, name)?; @@ -149,21 +190,22 @@ impl SoundManager { }) } - /// Parse audio data (auto-detect WAV or FLAC format) - /// Expects 16kHz mono - panics if wrong sample rate - fn parse_audio(&self, data: &[u8], name: &str) -> Result> { + /// Parse audio data (auto-detect WAV or FLAC format). + /// Expects 16kHz mono — returns `WrongSampleRate` otherwise. + fn parse_audio(&self, data: &[u8], name: &str) -> Result, SoundError> { // Check for FLAC magic number: "fLaC" if data.len() >= 4 && &data[0..4] == b"fLaC" { debug!("Detected FLAC format for '{}'", name); - let (samples, rate) = flac::parse_flac(data) - .with_context(|| format!("Failed to parse FLAC for sound '{}'", name))?; + let (samples, rate) = flac::parse_flac(data).map_err(|source| SoundError::Parse { + name: name.to_string(), + source, + })?; if rate != CONF_SAMPLE_RATE { - anyhow::bail!( - "Sound '{}' has wrong sample rate: {} Hz (expected {} Hz). Pre-resample the file.", - name, - rate, - CONF_SAMPLE_RATE - ); + return Err(SoundError::WrongSampleRate { + name: name.to_string(), + got: rate, + expected: CONF_SAMPLE_RATE, + }); } return Ok(samples); } @@ -171,24 +213,24 @@ impl SoundManager { // Check for WAV magic number: "RIFF" if data.len() >= 4 && &data[0..4] == b"RIFF" { debug!("Detected WAV format for '{}'", name); - let (samples, rate) = wav::parse_wav(data) - .with_context(|| format!("Failed to parse WAV for sound '{}'", name))?; + let (samples, rate) = wav::parse_wav(data).map_err(|source| SoundError::Parse { + name: name.to_string(), + source, + })?; if rate != CONF_SAMPLE_RATE { - anyhow::bail!( - "Sound '{}' has wrong sample rate: {} Hz (expected {} Hz). Pre-resample the file.", - name, - rate, - CONF_SAMPLE_RATE - ); + return Err(SoundError::WrongSampleRate { + name: name.to_string(), + got: rate, + expected: CONF_SAMPLE_RATE, + }); } return Ok(samples); } - anyhow::bail!( - "Unknown audio format for '{}': header bytes {:?}", - name, - &data[..4.min(data.len())] - ) + Err(SoundError::UnknownFormat { + name: name.to_string(), + header: data[..4.min(data.len())].to_vec(), + }) } /// Get a preloaded sound by name @@ -235,6 +277,6 @@ impl SoundManager { } /// Create an Arc-wrapped SoundManager for sharing across async tasks -pub fn create_sound_manager(sounds_dir: PathBuf) -> Result> { +pub fn create_sound_manager(sounds_dir: PathBuf) -> Result, SoundError> { Ok(Arc::new(SoundManager::new(sounds_dir)?)) } diff --git a/sipcord-bridge/src/services/sound/streaming.rs b/sipcord-bridge/src/services/sound/streaming.rs index a6cba94..9545e71 100644 --- a/sipcord-bridge/src/services/sound/streaming.rs +++ b/sipcord-bridge/src/services/sound/streaming.rs @@ -6,10 +6,9 @@ //! Uses Symphonia for FLAC decoding (pure Rust). use crate::transport::sip::CONF_SAMPLE_RATE; -use anyhow::{Context, Result}; use std::collections::VecDeque; use std::fs::File; -use std::path::Path; +use std::path::{Path, PathBuf}; use symphonia::core::audio::{AudioBufferRef, Signal}; use symphonia::core::codecs::{CODEC_TYPE_NULL, DecoderOptions}; use symphonia::core::formats::FormatOptions; @@ -17,6 +16,40 @@ use symphonia::core::io::MediaSourceStream; use symphonia::core::meta::MetadataOptions; use symphonia::core::probe::Hint; +/// Errors raised by the streaming player. +#[derive(thiserror::Error, Debug)] +pub enum StreamingError { + #[error("failed to open streaming file {path:?}: {source}")] + Open { + path: PathBuf, + #[source] + source: std::io::Error, + }, + + #[error("failed to probe streaming format {path:?}: {source}")] + Probe { + path: PathBuf, + #[source] + source: symphonia::core::errors::Error, + }, + + #[error("streaming file {path:?} has no audio track")] + NoTrack { path: PathBuf }, + + #[error("streaming file {path:?} has no sample rate")] + NoSampleRate { path: PathBuf }, + + #[error("streaming file {path:?} has wrong sample rate: {got} Hz (expected {expected} Hz)")] + WrongSampleRate { + path: PathBuf, + got: u32, + expected: u32, + }, + + #[error("failed to create streaming decoder: {0}")] + Decoder(#[source] symphonia::core::errors::Error), +} + /// Streaming player for large audio files /// /// Reads FLAC frames on-demand to avoid loading entire file into memory. @@ -39,9 +72,11 @@ pub struct StreamingPlayer { impl StreamingPlayer { /// Create a new streaming player for a FLAC file - pub fn new(path: &Path) -> Result { - let file = File::open(path) - .with_context(|| format!("Failed to open streaming file: {}", path.display()))?; + pub fn new(path: &Path) -> Result { + let file = File::open(path).map_err(|source| StreamingError::Open { + path: path.to_path_buf(), + source, + })?; let mss = MediaSourceStream::new(Box::new(file), Default::default()); @@ -57,7 +92,10 @@ impl StreamingPlayer { &FormatOptions::default(), &MetadataOptions::default(), ) - .with_context(|| format!("Failed to probe format: {}", path.display()))?; + .map_err(|source| StreamingError::Probe { + path: path.to_path_buf(), + source, + })?; let format = probed.format; @@ -66,7 +104,9 @@ impl StreamingPlayer { .tracks() .iter() .find(|t| t.codec_params.codec != CODEC_TYPE_NULL) - .ok_or_else(|| anyhow::anyhow!("No audio track found in {}", path.display()))?; + .ok_or_else(|| StreamingError::NoTrack { + path: path.to_path_buf(), + })?; let track_id = track.id; @@ -74,15 +114,16 @@ impl StreamingPlayer { let sample_rate = track .codec_params .sample_rate - .ok_or_else(|| anyhow::anyhow!("No sample rate in track"))?; + .ok_or_else(|| StreamingError::NoSampleRate { + path: path.to_path_buf(), + })?; if sample_rate != CONF_SAMPLE_RATE { - anyhow::bail!( - "Streaming file {} has wrong sample rate: {} Hz (expected {} Hz)", - path.display(), - sample_rate, - CONF_SAMPLE_RATE - ); + return Err(StreamingError::WrongSampleRate { + path: path.to_path_buf(), + got: sample_rate, + expected: CONF_SAMPLE_RATE, + }); } let channels = track.codec_params.channels.map(|c| c.count()).unwrap_or(1); @@ -99,7 +140,7 @@ impl StreamingPlayer { let decoder = symphonia::default::get_codecs() .make(&track.codec_params, &DecoderOptions::default()) - .with_context(|| "Failed to create decoder")?; + .map_err(StreamingError::Decoder)?; Ok(Self { format, diff --git a/sipcord-bridge/src/transport/discord/mod.rs b/sipcord-bridge/src/transport/discord/mod.rs index c6255cd..e0fa6ec 100644 --- a/sipcord-bridge/src/transport/discord/mod.rs +++ b/sipcord-bridge/src/transport/discord/mod.rs @@ -2,7 +2,6 @@ mod voice; use crate::audio::simd; use crate::services::snowflake::Snowflake; -use anyhow::Result; use audioadapter::Adapter; use audioadapter_buffers::direct::SequentialSliceOfVecs; use crossbeam_channel::Sender; @@ -29,6 +28,25 @@ use std::time::{SystemTime, UNIX_EPOCH}; use tokio::sync::oneshot; use tracing::{debug, error, info, trace, warn}; +/// Errors raised by the Discord voice transport. +#[derive(thiserror::Error, Debug)] +pub enum DiscordError { + /// Discord bot token rejected by serenity (malformed, missing parts, etc.). + #[error("invalid Discord bot token: {0}")] + InvalidToken(String), + + /// Serenity / songbird error (gateway, REST, voice connect). + #[error(transparent)] + Serenity(#[from] serenity::Error), + + /// Songbird voice join failed after the configured number of retries. + #[error("failed to join voice channel after {attempts} attempts: {last_error}")] + JoinFailed { + attempts: u32, + last_error: String, + }, +} + // Direct audio path: SIP audio thread → Discord // Uses lock-free ring buffer for real-time audio streaming @@ -161,16 +179,19 @@ fn create_resampler() -> Async { window: WindowFunction::BlackmanHarris2, }; - // 16kHz to 48kHz = ratio of 3.0, mono input, 320 samples per chunk (20ms at 16kHz) + // 16kHz to 48kHz = ratio of 3.0, mono input, 320 samples per chunk (20ms at 16kHz). + // Params are static and known-good; if rubato rejects them it's a programmer + // error (e.g. ratio constants changed inconsistently) and the program can't + // run anyway — panic explicitly with the rubato error for diagnostics. Async::new_sinc( - 48000.0 / 16000.0, // resample ratio (3.0x) - 1.1, // max relative ratio (allow slight variation) + 48000.0 / 16000.0, + 1.1, ¶ms, - 320, // chunk size (samples per frame at 16kHz) - 1, // mono channel - FixedAsync::Input, // fixed input size + 320, + 1, + FixedAsync::Input, ) - .expect("Failed to create resampler") + .unwrap_or_else(|e| panic!("create_resampler: rubato rejected static params: {e}")) } /// RAII guard for a registered Discord audio sender. @@ -481,7 +502,7 @@ impl SharedDiscordClient { /// This opens a single gateway WebSocket connection that stays alive for /// the bridge's lifetime. The returned Songbird manager is used by all /// voice connections to join/leave channels. - pub async fn new(bot_token: &str) -> Result> { + pub async fn new(bot_token: &str) -> Result, DiscordError> { info!("Creating shared Discord client (single gateway connection)"); let intents = GatewayIntents::GUILDS | GatewayIntents::GUILD_VOICE_STATES; @@ -494,7 +515,7 @@ impl SharedDiscordClient { let token: Token = bot_token .parse() - .map_err(|e| anyhow::anyhow!("Invalid bot token: {}", e))?; + .map_err(|e| DiscordError::InvalidToken(format!("{e}")))?; let mut client = Client::builder(token, intents) .event_handler(Arc::new(SharedClientEventHandler { ready_tx })) @@ -607,7 +628,7 @@ impl DiscordVoiceConnection { channel_id: Snowflake, event_tx: Sender, health_check_notify: Arc, - ) -> Result { + ) -> Result { info!( "Joining voice channel {} in guild {} for bridge {} (using shared client)", channel_id, guild_id, bridge_id @@ -654,10 +675,12 @@ impl DiscordVoiceConnection { // This allows the pjsua audio thread to bypass tokio entirely let audio_sender = RegisteredAudioSender::new(channel_id, producer); - // Create shared timestamp for health tracking + // Create shared timestamp for health tracking. The system + // clock would have to be set before 1970 to produce Err + // here; default to 0 in that case rather than panic. let now_ms = SystemTime::now() .duration_since(UNIX_EPOCH) - .unwrap() + .unwrap_or_default() .as_millis() as u64; let last_audio_received = Arc::new(AtomicU64::new(now_ms)); @@ -774,11 +797,10 @@ impl DiscordVoiceConnection { } // All retries failed - anyhow::bail!( - "Failed to join voice channel after {} attempts: {:?}", - max_retries, - last_error - ) + Err(DiscordError::JoinFailed { + attempts: max_retries, + last_error: format!("{:?}", last_error), + }) } /// Send audio to the Discord voice channel @@ -799,7 +821,7 @@ impl DiscordVoiceConnection { let now_ms = SystemTime::now() .duration_since(UNIX_EPOCH) - .unwrap() + .unwrap_or_default() .as_millis() as u64; let last_recv = self.inner.last_audio_received.load(Ordering::Relaxed); @@ -1070,7 +1092,7 @@ impl VoiceEventHandler for VoiceReceiver { // Update health tracking timestamp - VoiceTick arriving means Discord is alive let now_ms = SystemTime::now() .duration_since(UNIX_EPOCH) - .unwrap() + .unwrap_or_default() .as_millis() as u64; self.last_audio_received.store(now_ms, Ordering::Relaxed); diff --git a/sipcord-bridge/src/transport/discord/voice.rs b/sipcord-bridge/src/transport/discord/voice.rs index 4071d98..0ed190b 100644 --- a/sipcord-bridge/src/transport/discord/voice.rs +++ b/sipcord-bridge/src/transport/discord/voice.rs @@ -181,8 +181,21 @@ impl Read for StreamingAudioSource { return Ok(buf.len()); } - // Read samples from ring buffer directly into output buffer - let chunk = consumer.read_chunk(samples_to_read).unwrap(); + // Read samples from ring buffer directly into output buffer. + // `samples_to_read <= samples_available` by construction, so this + // should never error; if rtrb's state ever desyncs, log + silence + // rather than panic on the audio thread. + let chunk = match consumer.read_chunk(samples_to_read) { + Ok(c) => c, + Err(e) => { + tracing::warn!( + "StreamingAudioSource: rtrb read_chunk unexpectedly failed ({:?}), filling with silence", + e + ); + buf.fill(0); + return Ok(buf.len()); + } + }; let (first, second) = chunk.as_slices(); // Bulk copy f32 samples as raw bytes (memcpy instead of per-sample loop) diff --git a/sipcord-bridge/src/transport/sip/callbacks.rs b/sipcord-bridge/src/transport/sip/callbacks.rs index 6000315..76b2edf 100644 --- a/sipcord-bridge/src/transport/sip/callbacks.rs +++ b/sipcord-bridge/src/transport/sip/callbacks.rs @@ -49,10 +49,9 @@ use super::ffi::utils::{extract_sip_username, pj_str_to_string}; use dashmap::DashMap; use parking_lot::Mutex; use pjsua::*; -use std::ffi::CString; use std::mem::MaybeUninit; use std::net::IpAddr; -use std::os::raw::{c_char, c_int}; +use std::os::raw::c_int; use std::ptr; /// Global sender for outbound call events (set during initialization) @@ -112,9 +111,7 @@ pub unsafe fn extract_user_agent(rdata: *const pjsip_rx_data) -> Option } // Find User-Agent header by name - let hdr_name = CString::new("User-Agent").ok()?; - let name = pj_str(hdr_name.as_ptr() as *mut c_char); - + let name = super::ffi::pj_str::pj_str_from_cstr(c"User-Agent"); let hdr = pjsip_msg_find_hdr_by_name(msg, &name, ptr::null_mut()); if hdr.is_null() { return None; @@ -236,58 +233,16 @@ pub unsafe fn extract_digest_auth_from_rdata( /// Send 401 Unauthorized response with WWW-Authenticate header pub unsafe fn send_401_challenge(call_id: CallId, www_auth: &str) { unsafe { - // Create the WWW-Authenticate header - let hdr_name = CString::new("WWW-Authenticate").unwrap(); - let hdr_value = CString::new(www_auth).unwrap(); - - // Create msg_data with the WWW-Authenticate header - let mut msg_data = MaybeUninit::::uninit(); - pjsua_msg_data_init(msg_data.as_mut_ptr()); - let msg_data_ptr = msg_data.assume_init_mut(); - - // Create a pool for the header - let pool = pjsua_pool_create(c"auth".as_ptr(), 512, 512); - if pool.is_null() { - tracing::error!("Failed to create pool for 401 challenge"); - pjsua_call_hangup(*call_id, 500, ptr::null(), ptr::null()); - return; - } - - // Create the header - let name = pj_str(hdr_name.as_ptr() as *mut c_char); - let value = pj_str(hdr_value.as_ptr() as *mut c_char); - - let hdr = pjsip_generic_string_hdr_create(pool, &name, &value); - - if !hdr.is_null() { - // Add header to the list using pj_list_insert_before (insert at end of list) - pj_list_insert_before( - &mut msg_data_ptr.hdr_list as *mut _ as *mut pj_list_type, - hdr as *mut pj_list_type, - ); - } - - // Send 401 response - this will cause pjsua to send the response and then - // the client should retry with Authorization header - let reason = CString::new("Unauthorized").unwrap(); - let reason_pj = pj_str(reason.as_ptr() as *mut c_char); - - let status = pjsua_call_answer(*call_id, 401, &reason_pj, msg_data_ptr); - if status != pj_constants__PJ_SUCCESS as i32 { - tracing::warn!( - "Failed to send 401 challenge for call {}: {}", - call_id, - status - ); - // Hangup if we can't send challenge + if let Err(e) = super::ffi::pj_str::answer_call_with_headers( + *call_id, + 401, + c"Unauthorized", + c"auth", + &[(c"WWW-Authenticate", www_auth)], + ) { + tracing::warn!("Failed to send 401 challenge for call {}: {}", call_id, e); pjsua_call_hangup(*call_id, 500, ptr::null(), ptr::null()); } - - // DO NOT release the pool here - PJSUA may still need the header data - // after pjsua_call_answer returns. The pool will be cleaned up when - // pjsua is destroyed. This leaks ~512 bytes per 401 challenge but - // prevents use-after-free crashes. - // TODO: Track pools per-call and release them in on_call_state when call ends } } @@ -329,59 +284,26 @@ pub unsafe fn send_302_redirect(call_id: CallId, target_domain: &str, extension: // Create the Contact header: sip:extension@target_domain let contact_uri = format!("sip:{}@{}", extension, target_domain); - let hdr_name = CString::new("Contact").unwrap(); - let hdr_value = CString::new(contact_uri).unwrap(); - // Create msg_data with the Contact header - let mut msg_data = MaybeUninit::::uninit(); - pjsua_msg_data_init(msg_data.as_mut_ptr()); - let msg_data_ptr = msg_data.assume_init_mut(); - - // Create a pool for the header - let pool = pjsua_pool_create(c"redirect".as_ptr(), 512, 512); - if pool.is_null() { - tracing::error!("Failed to create pool for 302 redirect"); - pjsua_call_hangup(*call_id, 500, ptr::null(), ptr::null()); - return; + match super::ffi::pj_str::answer_call_with_headers( + *call_id, + 302, + c"Moved Temporarily", + c"redirect", + &[(c"Contact", contact_uri.as_str())], + ) { + Err(e) => { + tracing::warn!("Failed to send 302 redirect for call {}: {}", call_id, e); + pjsua_call_hangup(*call_id, 500, ptr::null(), ptr::null()); + } + Ok(()) => { + tracing::info!( + "Sent 302 redirect for call {} to {}", + call_id, + target_domain + ); + } } - - // Create the header - let name = pj_str(hdr_name.as_ptr() as *mut c_char); - let value = pj_str(hdr_value.as_ptr() as *mut c_char); - - let hdr = pjsip_generic_string_hdr_create(pool, &name, &value); - - if !hdr.is_null() { - // Add header to the list using pj_list_insert_before (insert at end of list) - pj_list_insert_before( - &mut msg_data_ptr.hdr_list as *mut _ as *mut pj_list_type, - hdr as *mut pj_list_type, - ); - } - - // Send 302 response - let reason = CString::new("Moved Temporarily").unwrap(); - let reason_pj = pj_str(reason.as_ptr() as *mut c_char); - - let status = pjsua_call_answer(*call_id, 302, &reason_pj, msg_data_ptr); - if status != pj_constants__PJ_SUCCESS as i32 { - tracing::warn!( - "Failed to send 302 redirect for call {}: {}", - call_id, - status - ); - // Hangup if we can't send redirect - pjsua_call_hangup(*call_id, 500, ptr::null(), ptr::null()); - } else { - tracing::info!( - "Sent 302 redirect for call {} to {}", - call_id, - target_domain - ); - } - - // DO NOT release the pool here - PJSUA may still need the header data - // after pjsua_call_answer returns. Same issue as send_401_challenge. } } diff --git a/sipcord-bridge/src/transport/sip/error.rs b/sipcord-bridge/src/transport/sip/error.rs new file mode 100644 index 0000000..810be59 --- /dev/null +++ b/sipcord-bridge/src/transport/sip/error.rs @@ -0,0 +1,162 @@ +//! Typed error types for the SIP transport layer. +//! +//! Three sibling enums cover the three phases of the SIP path: +//! +//! - [`SipInitError`] — startup: pjsua create/init/start, transports, codecs, +//! account registration. One-shot failures that take down the process. +//! - [`SipResponseError`] — building or sending an individual SIP response +//! (401/302/200 etc.) from inside an FFI callback. Per-call, recoverable +//! in the sense that we log and continue. +//! - [`SipAudioError`] — runtime audio plumbing: hooking players into a +//! call's conference port. Surfaces when media isn't ready yet, when a +//! port name has an interior NUL, or when pjsua refuses a connect. +//! +//! [`SipError`] is the umbrella for callers that want to handle any of them +//! uniformly. Conversion is via `#[from]`, so `?` propagation works through +//! the hierarchy without explicit `map_err`. + +use std::ffi::NulError; + +/// Umbrella error for everything the SIP transport layer can return. +#[derive(thiserror::Error, Debug)] +pub enum SipError { + #[error(transparent)] + Init(#[from] SipInitError), + + #[error(transparent)] + Response(#[from] SipResponseError), + + #[error(transparent)] + Audio(#[from] SipAudioError), + + #[error(transparent)] + Call(#[from] SipCallError), +} + +/// Errors raised by outbound-call setup (`make_outbound_call`). +#[derive(thiserror::Error, Debug)] +pub enum SipCallError { + /// A URI / display-name string couldn't be converted to CString because + /// of an interior NUL byte. + #[error("invalid {field} for outbound call: {source}")] + InvalidString { + field: &'static str, + #[source] + source: NulError, + }, + + /// `pjsua_call_make_call` returned non-success. + #[error("pjsua_call_make_call failed (status {0})")] + MakeCall(i32), +} + +/// Errors raised by `init_pjsua`, `create_tls_transport`, `reload_tls_transport`, +/// `process_pjsua_events`, and friends. +#[derive(thiserror::Error, Debug)] +pub enum SipInitError { + /// A pjsua API returned a non-success status code. `operation` names the + /// specific call (`"pjsua_create"`, `"pjsua_init"`, `"pjsua_start"`, + /// `"pjsua_acc_add"`, `"pjsua_set_null_snd_dev"`, `"pjsua_handle_events"`, + /// etc.). + #[error("pjsua {operation} failed (status {status})")] + Pjsua { + operation: &'static str, + status: i32, + }, + + /// `pjsua_transport_create` failed for `kind` ("UDP", "TCP", or "TLS"). + #[error("transport create ({kind}) failed (status {status})")] + TransportCreate { + kind: &'static str, + status: i32, + }, + + /// A configuration string (host name, URI, etc.) couldn't be converted + /// to a `CString` because of an interior NUL byte. + #[error("invalid {field} string for FFI: {source}")] + InvalidString { + field: &'static str, + #[source] + source: NulError, + }, + + /// A `Path` to be passed into pjsua wasn't valid UTF-8. + #[error("{field} path is not valid UTF-8")] + NonUtf8Path { field: &'static str }, +} + +/// Errors raised by audio-port plumbing (`play_audio_to_call_direct`, +/// `start_loop`, `start_test_tone_to_call`, etc.) and the helpers in +/// `frame_utils`. +#[derive(thiserror::Error, Debug)] +pub enum SipAudioError { + /// The call doesn't have a conference port yet — media negotiation is + /// still in progress, or the call has just ended. Caller can retry or + /// drop the audio. + #[error("no conference port for call {call_id} (media not ready yet)")] + NoConfPort { call_id: super::ffi::types::CallId }, + + /// A port name (used to identify the player in pjsua's mixer) contains + /// an interior NUL. + #[error("invalid port name: {0}")] + InvalidPortName(#[from] NulError), + + /// `pjsua_conf_add_port`, `pjsua_conf_connect`, etc. returned non-success. + #[error("pjsua conf {operation} failed (status {status})")] + Pjsua { + operation: &'static str, + status: i32, + }, + + /// Frame size / port count mismatch between the audio source and the + /// pjsua port. + #[error("frame mismatch: {0}")] + FrameMismatch(String), + + /// Failure setting up a streaming player (file read, decoder, etc.). + #[error(transparent)] + Streaming(#[from] crate::services::sound::StreamingError), +} + +/// Errors that can occur while building or sending a SIP response. +/// +/// Surfaces failures from the pjsua/pjsip FFI surface — CString conversion, +/// pool allocation, header creation, and the final stateless / transactional +/// send. Variants stay coarse-grained because the typical caller is a pjsip +/// callback that can only log and continue. +#[derive(thiserror::Error, Debug)] +pub enum SipResponseError { + /// A runtime string contained an interior NUL byte and could not be + /// converted to `CString`. + #[error("CString conversion failed (interior NUL)")] + CStringNul(#[from] NulError), + + /// `pjsua_pool_create` returned null — out of memory or pjsua not + /// initialised. + #[error("pjsua pool allocation failed")] + PoolAlloc, + + /// `pjsip_generic_string_hdr_create` returned null. + #[error("pjsip header creation failed")] + HeaderCreate, + + /// `pjsua_get_pjsip_endpt` returned null — pjsua not initialised. + #[error("pjsip endpoint is null (pjsua not initialised)")] + EndpointNull, + + /// `pjsip_endpt_respond_stateless` returned a non-success pj status code. + #[error("pjsip stateless send failed (status {0})")] + StatelessSend(i32), + + /// `pjsip_tsx_create_uas2` returned a non-success pj status code. + #[error("pjsip UAS transaction creation failed (status {0})")] + TsxCreate(i32), + + /// `pjsip_endpt_create_response` returned a non-success pj status code. + #[error("pjsip response build failed (status {0})")] + ResponseBuild(i32), + + /// `pjsua_call_answer` returned a non-success pj status code. + #[error("pjsua_call_answer failed (status {0})")] + CallAnswer(i32), +} diff --git a/sipcord-bridge/src/transport/sip/ffi/direct_player.rs b/sipcord-bridge/src/transport/sip/ffi/direct_player.rs index 20113a1..fc6ec0d 100644 --- a/sipcord-bridge/src/transport/sip/ffi/direct_player.rs +++ b/sipcord-bridge/src/transport/sip/ffi/direct_player.rs @@ -3,8 +3,8 @@ //! This module provides one-shot audio playback (e.g., join sounds) that //! bypasses the channel buffer and plays directly to a specific call. +use crate::transport::sip::error::SipAudioError; use super::types::*; -use anyhow::Result; use parking_lot::Mutex; use pjsua::*; use std::collections::HashMap; @@ -86,7 +86,7 @@ pub unsafe extern "C" fn direct_player_on_destroy(this_port: *mut pjmedia_port) /// /// This queues the operation to be executed by the audio thread to avoid /// deadlocks with the audio thread's pjsua_conf_connect/disconnect calls. -pub fn play_audio_to_call_direct(call_id: CallId, samples: &[i16]) -> Result<()> { +pub fn play_audio_to_call_direct(call_id: CallId, samples: &[i16]) -> Result<(), SipAudioError> { use super::types::{PendingPjsuaOp, queue_pjsua_op}; tracing::debug!( @@ -103,14 +103,17 @@ pub fn play_audio_to_call_direct(call_id: CallId, samples: &[i16]) -> Result<()> /// Internal implementation of play_audio_to_call_direct /// Called from the audio thread to actually create and connect the player -pub fn play_audio_to_call_direct_internal(call_id: CallId, samples: &[i16]) -> Result<()> { +pub fn play_audio_to_call_direct_internal( + call_id: CallId, + samples: &[i16], +) -> Result<(), SipAudioError> { use super::frame_utils::{PortCallbacks, create_and_connect_port}; // Get call's conference port let call_conf_port = CALL_CONF_PORTS .get() .and_then(|p| p.get(&call_id).map(|r| *r)) - .ok_or_else(|| anyhow::anyhow!("No conf_port for call {}", call_id))?; + .ok_or(SipAudioError::NoConfPort { call_id })?; // Store samples in the player state BEFORE creating port (get_frame needs them) // We'll clean up if port creation fails diff --git a/sipcord-bridge/src/transport/sip/ffi/frame_utils.rs b/sipcord-bridge/src/transport/sip/ffi/frame_utils.rs index 59c89b3..ce06f51 100644 --- a/sipcord-bridge/src/transport/sip/ffi/frame_utils.rs +++ b/sipcord-bridge/src/transport/sip/ffi/frame_utils.rs @@ -3,11 +3,11 @@ //! Provides common helpers for filling audio frames and a shared no-op //! put_frame callback used by ports that only produce audio. +use crate::transport::sip::error::SipAudioError; use super::types::{ CONF_CHANNELS, CONF_MASTER_PORT, CONF_SAMPLE_RATE, CallId, ConfPort, SAMPLES_PER_FRAME, SendablePool, }; -use anyhow::Result; use parking_lot::Mutex; use pjsua::*; use std::sync::OnceLock; @@ -120,7 +120,7 @@ pub unsafe fn create_and_connect_port( signature: u32, callbacks: PortCallbacks, call_conf_port: ConfPort, -) -> Result { +) -> Result { // Get or create the memory pool let pool = pool.get_or_init(|| { let p = unsafe { pjsua_pool_create(pool_name.as_ptr() as *const _, 4096, 4096) }; @@ -132,18 +132,16 @@ pub unsafe fn create_and_connect_port( let port_size = std::mem::size_of::(); let port = unsafe { pj_pool_alloc(pool_ptr, port_size) as *mut pjmedia_port }; if port.is_null() { - anyhow::bail!( - "Failed to allocate {} port for call {}", - name_prefix, - call_id - ); + return Err(SipAudioError::FrameMismatch(format!( + "failed to allocate {} port for call {}", + name_prefix, call_id + ))); } unsafe { std::ptr::write_bytes(port as *mut u8, 0, port_size) }; // Create port name let port_name = format!("{}{}", name_prefix, call_id); - let port_name_cstr = std::ffi::CString::new(port_name) - .map_err(|e| anyhow::anyhow!("Invalid port name: {}", e))?; + let port_name_cstr = std::ffi::CString::new(port_name)?; // Initialize port info unsafe { @@ -167,21 +165,30 @@ pub unsafe fn create_and_connect_port( let mut player_slot: i32 = 0; let status = unsafe { pjsua_conf_add_port(pool_ptr, port, &mut player_slot) }; if status != pj_constants__PJ_SUCCESS as i32 { - anyhow::bail!("Failed to add {} port to conf: {}", name_prefix, status); + return Err(SipAudioError::Pjsua { + operation: "pjsua_conf_add_port", + status, + }); } // Connect player port to the target call's port let conf = unsafe { get_conference_bridge() }; let Some(conf) = conf else { unsafe { pjsua_conf_remove_port(player_slot) }; - anyhow::bail!("Failed to get conference bridge for {} port", name_prefix); + return Err(SipAudioError::FrameMismatch(format!( + "no conference bridge available for {} port", + name_prefix + ))); }; let status = unsafe { pjmedia_conf_connect_port(conf, player_slot as u32, *call_conf_port as u32, 0) }; if status != pj_constants__PJ_SUCCESS as i32 { unsafe { pjsua_conf_remove_port(player_slot) }; - anyhow::bail!("Failed to connect {} port to call: {}", name_prefix, status); + return Err(SipAudioError::Pjsua { + operation: "pjmedia_conf_connect_port", + status, + }); } Ok(ConfPortGuard { diff --git a/sipcord-bridge/src/transport/sip/ffi/init.rs b/sipcord-bridge/src/transport/sip/ffi/init.rs index f5c2ef2..f3169e5 100644 --- a/sipcord-bridge/src/transport/sip/ffi/init.rs +++ b/sipcord-bridge/src/transport/sip/ffi/init.rs @@ -5,7 +5,7 @@ //! - TLS transport creation and hot-reload //! - Shutdown and thread registration -use super::super::audio_thread::stop_audio_thread; +use crate::transport::sip::audio_thread::stop_audio_thread; use std::fmt; /// SIP invite session state (Rust wrapper for pjsip_inv_state) @@ -50,22 +50,23 @@ impl fmt::Display for InvState { } } } -use super::super::callbacks::{ +use crate::transport::sip::callbacks::{ on_call_media_state_cb, on_call_rx_reinvite_cb, on_call_state_cb, on_dtmf_digit_cb, on_incoming_call_cb, }; -use super::super::nat::{ +use crate::transport::sip::nat::{ on_rx_request_nat_fixup_cb, on_rx_response_nat_fixup_cb, on_tx_request_cb, on_tx_response_cb, }; -use super::super::register_handler::on_rx_request_cb; +use crate::transport::sip::error::SipInitError; +use crate::transport::sip::register_handler::on_rx_request_cb; +use super::pj_str; use super::types::*; use crate::config::{SipConfig, TlsConfig}; -use anyhow::{Context, Result}; use ipnet::Ipv4Net; use parking_lot::Mutex; use pjsua::*; use std::collections::BTreeMap; -use std::ffi::CString; +use std::ffi::{CStr, CString}; use std::mem::MaybeUninit; use std::net::IpAddr; use std::os::raw::{c_char, c_int}; @@ -170,7 +171,7 @@ fn extract_packet_source(msg: &str) -> Option<&str> { let space = rest.find(' ')?; let after_transport = &rest[space + 1..]; let end = after_transport - .find(|c: char| c == ' ' || c == '\t') + .find([' ', '\t']) .unwrap_or(after_transport.len()); Some(&after_transport[..end]) } @@ -312,7 +313,10 @@ pub fn set_callbacks(handlers: CallbackHandlers) { } /// Initialize pjsua with optional TLS support -pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result<()> { +pub fn init_pjsua( + config: &SipConfig, + tls_config: Option<&TlsConfig>, +) -> Result<(), SipInitError> { // Initialize public host config for Contact header rewriting on outgoing responses. // pjsua derives Contact from the TCP connection's local address (private IP), but // external clients need the public hostname to route BYE back to us. @@ -352,7 +356,10 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result< // Create pjsua instance let status = pjsua_create(); if status != pj_constants__PJ_SUCCESS as i32 { - anyhow::bail!("Failed to create pjsua: {}", status); + return Err(SipInitError::Pjsua { + operation: "pjsua_create", + status, + }); } // Disable automatic UDP->TCP switch for large SIP messages. @@ -421,7 +428,10 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result< // Initialize pjsua let status = pjsua_init(cfg_ptr, log_cfg_ptr, media_cfg_ptr); if status != pj_constants__PJ_SUCCESS as i32 { - anyhow::bail!("Failed to init pjsua: {}", status); + return Err(SipInitError::Pjsua { + operation: "pjsua_init", + status, + }); } // Create UDP transport @@ -432,7 +442,12 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result< // Set public address if specified - keep CString alive until transport is created let public_host_cstring = if !config.public_host.is_empty() { - let host = CString::new(config.public_host.as_str()).context("Invalid public host")?; + let host = CString::new(config.public_host.as_str()).map_err(|source| { + SipInitError::InvalidString { + field: "public_host", + source, + } + })?; t_cfg_ptr.public_addr = pj_str(host.as_ptr() as *mut c_char); Some(host) } else { @@ -450,7 +465,10 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result< drop(public_host_cstring); if status != pj_constants__PJ_SUCCESS as i32 { - anyhow::bail!("Failed to create UDP transport: {}", status); + return Err(SipInitError::TransportCreate { + kind: "UDP", + status, + }); } // Create TCP transport on the same port @@ -461,8 +479,12 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result< // Set public address for TCP - keep CString alive let tcp_public_host_cstring = if !config.public_host.is_empty() { - let host = - CString::new(config.public_host.as_str()).context("Invalid public host for TCP")?; + let host = CString::new(config.public_host.as_str()).map_err(|source| { + SipInitError::InvalidString { + field: "public_host (TCP)", + source, + } + })?; tcp_cfg_ptr.public_addr = pj_str(host.as_ptr() as *mut c_char); Some(host) } else { @@ -479,7 +501,10 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result< drop(tcp_public_host_cstring); if status != pj_constants__PJ_SUCCESS as i32 { - anyhow::bail!("Failed to create TCP transport: {}", status); + return Err(SipInitError::TransportCreate { + kind: "TCP", + status, + }); } tracing::info!("TCP transport created on port {}", config.port); @@ -494,7 +519,10 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result< // Start pjsua let status = pjsua_start(); if status != pj_constants__PJ_SUCCESS as i32 { - anyhow::bail!("Failed to start pjsua: {}", status); + return Err(SipInitError::Pjsua { + operation: "pjsua_start", + status, + }); } // Configure codec priorities to keep INVITE SDP small. @@ -507,27 +535,27 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result< // ordered by quality (highest priority = preferred in SDP negotiation). { // Disable all audio codecs first - let all = CString::new("*").unwrap(); - pjsua_codec_set_priority(&pj_str(all.as_ptr() as *mut c_char), 0); + pjsua_codec_set_priority(&pj_str::pj_str_from_cstr(c"*"), 0); // Re-enable desired codecs (highest priority = preferred in negotiation). // NOTE: G722 is registered internally at 16000Hz in PJSIP despite the // RFC 3551 SDP convention of advertising clock_rate=8000. - let codecs: &[(&str, u8)] = &[ - ("opus/48000", 255), // Best quality: adaptive, wideband/fullband - ("G722/16000", 254), // Wideband 16kHz, widely supported - ("AMR/8000", 252), // Adaptive narrowband - ("PCMU/8000", 200), // G.711 mu-law, ubiquitous fallback - ("PCMA/8000", 199), // G.711 A-law, ubiquitous fallback - ("telephone-event", 200), // DTMF support (all sample rates) + // Codec IDs are static — use &CStr literals so neither CString allocation + // nor an NUL check is needed. + let codecs: &[(&CStr, u8)] = &[ + (c"opus/48000", 255), // Best quality: adaptive, wideband/fullband + (c"G722/16000", 254), // Wideband 16kHz, widely supported + (c"AMR/8000", 252), // Adaptive narrowband + (c"PCMU/8000", 200), // G.711 mu-law, ubiquitous fallback + (c"PCMA/8000", 199), // G.711 A-law, ubiquitous fallback + (c"telephone-event", 200), // DTMF support (all sample rates) ]; for (name, priority) in codecs { - let codec_id = CString::new(*name).unwrap(); let status = - pjsua_codec_set_priority(&pj_str(codec_id.as_ptr() as *mut c_char), *priority); + pjsua_codec_set_priority(&pj_str::pj_str_from_cstr(name), *priority); if status != pj_constants__PJ_SUCCESS as i32 { - tracing::warn!("Failed to set codec priority for {}: {}", name, status); + tracing::warn!("Failed to set codec priority for {:?}: {}", name, status); } } @@ -535,7 +563,7 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result< "Codec priorities configured: {}", codecs .iter() - .map(|(n, p)| format!("{}={}", n, p)) + .map(|(n, p)| format!("{}={}", n.to_string_lossy(), p)) .collect::>() .join(", ") ); @@ -579,7 +607,7 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result< tracing::info!("Registered REGISTER handler module"); // Store the module pointer so register_handler can create // UAS transactions for deferred REGISTER responses. - super::super::register_handler::set_register_module_ptr(&raw mut REGISTER_MODULE); + crate::transport::sip::register_handler::set_register_module_ptr(&raw mut REGISTER_MODULE); } } else { tracing::warn!("Could not get PJSIP endpoint for module registration"); @@ -633,7 +661,10 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result< // This allows us to manually control audio I/O let master_port = pjsua_set_no_snd_dev(); if master_port.is_null() { - anyhow::bail!("Failed to set null sound device"); + return Err(SipInitError::Pjsua { + operation: "pjsua_set_no_snd_dev", + status: -1, // pjsua returned null pointer rather than a status code + }); } // Verify the master port's actual sample rate @@ -682,8 +713,12 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result< let acc_cfg_ptr = acc_cfg.assume_init_mut(); // Local account ID - keep CString alive until account is added - let local_uri = CString::new(format!("sip:sipcord@{}", config.public_host)) - .context("Invalid local URI")?; + let local_uri = CString::new(format!("sip:sipcord@{}", config.public_host)).map_err( + |source| SipInitError::InvalidString { + field: "local account URI", + source, + }, + )?; acc_cfg_ptr.id = pj_str(local_uri.as_ptr() as *mut c_char); // Enable incoming calls without registration @@ -710,7 +745,12 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result< // Set public IP for RTP if configured - this is advertised in SDP c= line // Without this, pjsua uses the local interface IP which won't work for NAT let rtp_public_ip_cstring = if let Some(ref public_ip) = config.rtp_public_ip { - let ip_cstr = CString::new(public_ip.as_str()).context("Invalid RTP public IP")?; + let ip_cstr = CString::new(public_ip.as_str()).map_err(|source| { + SipInitError::InvalidString { + field: "rtp_public_ip", + source, + } + })?; acc_cfg_ptr.rtp_cfg.public_addr = pj_str(ip_cstr.as_ptr() as *mut c_char); tracing::info!( "Account RTP config: port={}, port_range={} (ports {}-{}), public_addr={}", @@ -743,7 +783,10 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result< drop(rtp_public_ip_cstring); if status != pj_constants__PJ_SUCCESS as i32 { - anyhow::bail!("Failed to add account: {}", status); + return Err(SipInitError::Pjsua { + operation: "pjsua_acc_add", + status, + }); } Ok(()) @@ -752,7 +795,10 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result< /// Create TLS transport for SIP-over-TLS /// Returns Ok(true) if created, Ok(false) if skipped due to missing certs -fn create_tls_transport(tls_config: &TlsConfig, public_host: &str) -> Result { +fn create_tls_transport( + tls_config: &TlsConfig, + public_host: &str, +) -> Result { // Check cert files exist before doing anything let cert_path = tls_config.cert_path(); let key_path = tls_config.key_path(); @@ -784,13 +830,29 @@ fn create_tls_transport(tls_config: &TlsConfig, public_host: &str) -> Result Result Result Result { +pub fn reload_tls_transport( + tls_config: &TlsConfig, + public_host: &str, +) -> Result { // Check active calls - don't reload if calls are active let active_calls = COUNTED_CALL_IDS .get() @@ -899,7 +967,7 @@ pub fn active_media_call_count() -> usize { } /// Process pjsua events (call from event loop) -pub fn process_pjsua_events(timeout_ms: u32) -> Result<()> { +pub fn process_pjsua_events(timeout_ms: u32) -> Result<(), SipInitError> { unsafe { pj_thread_sleep(timeout_ms); } @@ -968,10 +1036,7 @@ pub fn send_183_session_progress(call_id: CallId) { ); } - // Create reason string - let reason = CString::new("Session Progress").unwrap(); - let reason_pj = pj_str(reason.as_ptr() as *mut c_char); - + let reason_pj = pj_str::pj_str_from_cstr(c"Session Progress"); let status = pjsua_call_answer(*call_id, 183, &reason_pj, ptr::null()); if status != pj_constants__PJ_SUCCESS as i32 { tracing::warn!("Failed to send 183 for call {}: status={}", call_id, status); diff --git a/sipcord-bridge/src/transport/sip/ffi/looping_player.rs b/sipcord-bridge/src/transport/sip/ffi/looping_player.rs index a8ca7b6..c45f5fa 100644 --- a/sipcord-bridge/src/transport/sip/ffi/looping_player.rs +++ b/sipcord-bridge/src/transport/sip/ffi/looping_player.rs @@ -4,7 +4,7 @@ //! Used for the "connecting" sound during call setup (183 Session Progress). use super::types::*; -use anyhow::Result; +use crate::transport::sip::error::SipAudioError; use parking_lot::Mutex; use pjsua::*; use std::collections::HashMap; @@ -118,7 +118,7 @@ pub unsafe extern "C" fn looping_player_on_destroy(this_port: *mut pjmedia_port) /// /// Creates a pjmedia_port that loops the given samples and connects it to the call. /// The loop continues until stop_loop is called. -pub fn start_loop(call_id: CallId, samples: Vec) -> Result<()> { +pub fn start_loop(call_id: CallId, samples: Vec) -> Result<(), SipAudioError> { use super::frame_utils::{PortCallbacks, create_and_connect_port}; // Check if already looping for this call @@ -134,9 +134,7 @@ pub fn start_loop(call_id: CallId, samples: Vec) -> Result<()> { let call_conf_port = CALL_CONF_PORTS .get() .and_then(|p| p.get(&call_id).map(|r| *r)) - .ok_or_else(|| { - anyhow::anyhow!("No conf_port for call {} - media not ready yet", call_id) - })?; + .ok_or(SipAudioError::NoConfPort { call_id })?; let guard = unsafe { let callbacks = PortCallbacks { diff --git a/sipcord-bridge/src/transport/sip/ffi/mod.rs b/sipcord-bridge/src/transport/sip/ffi/mod.rs index 27473bc..51cd2ff 100644 --- a/sipcord-bridge/src/transport/sip/ffi/mod.rs +++ b/sipcord-bridge/src/transport/sip/ffi/mod.rs @@ -19,6 +19,7 @@ pub(super) mod direct_player; pub(crate) mod frame_utils; pub(super) mod init; pub(super) mod looping_player; +pub(super) mod pj_str; pub(super) mod streaming_player; pub(super) mod test_tone; pub mod types; diff --git a/sipcord-bridge/src/transport/sip/ffi/pj_str.rs b/sipcord-bridge/src/transport/sip/ffi/pj_str.rs new file mode 100644 index 0000000..1f00ecf --- /dev/null +++ b/sipcord-bridge/src/transport/sip/ffi/pj_str.rs @@ -0,0 +1,211 @@ +//! Safe(r) helpers around the pjsua/pjsip C-string and header-building idioms +//! that recur across the SIP transport layer. +//! +//! Before this module existed, every callback that built a SIP header +//! re-implemented the same pattern: +//! +//! ```ignore +//! let name = CString::new("Contact").unwrap(); +//! let value = CString::new(runtime_str).unwrap(); +//! let name_pj = pj_str(name.as_ptr() as *mut c_char); +//! let value_pj = pj_str(value.as_ptr() as *mut c_char); +//! let hdr = pjsip_generic_string_hdr_create(pool, &name_pj, &value_pj); +//! // ... +//! ``` +//! +//! That sprouted two unwraps per header (so any header value containing a NUL +//! byte from upstream data would panic), repeated lifetime traps, and zero +//! shared failure handling. The helpers in this module turn those calls into +//! a single fallible call returning [`SipResponseError`]. + +use crate::transport::sip::error::SipResponseError; +use pjsua::*; +use std::ffi::{CStr, CString}; +use std::os::raw::c_char; +use std::ptr; + +/// Convert a [`CStr`] (typically a `c"..."` literal) into a [`pj_str_t`]. +/// +/// Zero-cost — `pj_str` just wraps the pointer and length. The caller must +/// keep the `CStr` alive for the `pj_str_t`'s usage window. For `&'static` +/// literals (the common case) that's trivially satisfied. +#[inline] +pub unsafe fn pj_str_from_cstr(s: &CStr) -> pj_str_t { + unsafe { pj_str(s.as_ptr() as *mut c_char) } +} + +// A `pj_str_owned(&str) -> Result<(CString, pj_str_t), _>` helper was considered +// but turned out unused: every runtime-string call site in this codebase ends +// up either inside `make_string_hdr` (which does the conversion internally) or +// in a function that already chains `CString::new(...).context("...")?` for a +// site-specific error message. Add it back if a true caller appears. + +/// Initialise a `pjsip_hdr` as an empty list head (equivalent to the +/// `pj_list_init` C macro). +#[inline] +pub unsafe fn pj_list_init_hdr(hdr: *mut pjsip_hdr) { + unsafe { + (*hdr).next = hdr as *mut _; + (*hdr).prev = hdr as *mut _; + } +} + +/// Create a generic string header in `pool`. +/// +/// `name` is a static `CStr` (use `c"Contact"` etc); `value` is a runtime +/// string that gets converted to a `CString` and duplicated into the pool +/// by pjsip. The temporary `CString` is dropped before this returns; +/// `pjsip_generic_string_hdr_create` uses `pj_strdup` internally to copy +/// the bytes. +pub unsafe fn make_string_hdr( + pool: *mut pj_pool_t, + name: &CStr, + value: &str, +) -> Result<*mut pjsip_generic_string_hdr, SipResponseError> { + unsafe { + let value_c = CString::new(value)?; + let name_pj = pj_str_from_cstr(name); + let value_pj = pj_str(value_c.as_ptr() as *mut c_char); + let hdr = pjsip_generic_string_hdr_create(pool, &name_pj, &value_pj); + if hdr.is_null() { + return Err(SipResponseError::HeaderCreate); + } + Ok(hdr) + } +} + +/// Append a generic string header onto the message buffer in `tdata`, +/// allocating from the tdata's own pool. +pub unsafe fn append_tdata_hdr( + tdata: *mut pjsip_tx_data, + name: &CStr, + value: &str, +) -> Result<(), SipResponseError> { + unsafe { + let hdr = make_string_hdr((*tdata).pool, name, value)?; + pj_list_insert_before( + &mut (*(*tdata).msg).hdr as *mut pjsip_hdr as *mut pj_list_type, + hdr as *mut pj_list_type, + ); + Ok(()) + } +} + +/// Answer a pjsua call with N custom headers attached to the response. +/// +/// Wraps the recurring `pjsua_msg_data_init` + pool + header build + +/// `pjsua_call_answer` dance used in 401 / 302 / 4xx code paths. +/// +/// **The pool is intentionally NOT released.** pjsua may continue to reference +/// the header data after `pjsua_call_answer` returns; releasing the pool here +/// triggers use-after-free. Each call leaks ~512 bytes that's reclaimed when +/// pjsua shuts down. (Tracking pools per-call and releasing them on call-end +/// would be a cleaner fix; not in scope here.) +/// +/// On error, the caller typically follows up with `pjsua_call_hangup` — this +/// helper does not hang up on its own so the caller can choose the strategy. +pub unsafe fn answer_call_with_headers( + call_id: i32, + status_code: u32, + reason: &CStr, + pool_name: &CStr, + headers: &[(&CStr, &str)], +) -> Result<(), SipResponseError> { + unsafe { + let mut msg_data = std::mem::MaybeUninit::::uninit(); + pjsua_msg_data_init(msg_data.as_mut_ptr()); + let msg_data_ptr = msg_data.assume_init_mut(); + + let pool = pjsua_pool_create(pool_name.as_ptr(), 512, 512); + if pool.is_null() { + return Err(SipResponseError::PoolAlloc); + } + // Intentionally leaked — see doc comment above. + + for (name, value) in headers { + let hdr = make_string_hdr(pool, name, value)?; + pj_list_insert_before( + &mut msg_data_ptr.hdr_list as *mut _ as *mut pj_list_type, + hdr as *mut pj_list_type, + ); + } + + let reason_pj = pj_str_from_cstr(reason); + let status = pjsua_call_answer(call_id, status_code, &reason_pj, msg_data_ptr); + if status != pj_constants__PJ_SUCCESS as i32 { + return Err(SipResponseError::CallAnswer(status)); + } + Ok(()) + } +} + +/// Send a stateless SIP response with N string headers. +/// +/// Wraps the recurring `pjsua_pool_create` → list-head alloc → header +/// build → `pjsip_endpt_respond_stateless` → `pj_pool_release` dance. Each +/// header in `headers` is a `(name, value)` pair where `name` is typically +/// a `c"..."` literal and `value` is any runtime string. +/// +/// `reason` is the SIP reason phrase (e.g. `Some(c"Unauthorized")`) or +/// `None` to let pjsip pick the default for `status_code`. +pub unsafe fn respond_stateless_with_headers( + rdata: *mut pjsip_rx_data, + status_code: u16, + reason: Option<&CStr>, + headers: &[(&CStr, &str)], +) -> Result<(), SipResponseError> { + unsafe { + let endpt = pjsua_get_pjsip_endpt(); + if endpt.is_null() { + return Err(SipResponseError::EndpointNull); + } + + let pool = pjsua_pool_create(c"sip_resp".as_ptr(), 1024, 1024); + if pool.is_null() { + return Err(SipResponseError::PoolAlloc); + } + + // Belt-and-braces: ensure the pool is released even if a step + // between here and the send returns Err via `?`. + let result = + (|| -> Result { + let hdr_list = + pj_pool_alloc(pool, std::mem::size_of::()) as *mut pjsip_hdr; + if hdr_list.is_null() { + return Err(SipResponseError::PoolAlloc); + } + pj_list_init_hdr(hdr_list); + + for (name, value) in headers { + let hdr = make_string_hdr(pool, name, value)?; + pj_list_insert_before( + hdr_list as *mut pj_list_type, + hdr as *mut pj_list_type, + ); + } + + let reason_pj = reason.map(|r| pj_str_from_cstr(r)); + let reason_ptr = reason_pj + .as_ref() + .map(|r| r as *const pj_str_t) + .unwrap_or(ptr::null()); + + Ok(pjsip_endpt_respond_stateless( + endpt, + rdata, + status_code.into(), + reason_ptr, + hdr_list, + ptr::null(), + )) + })(); + + pj_pool_release(pool); + + match result { + Ok(status) if status == pj_constants__PJ_SUCCESS as i32 => Ok(()), + Ok(status) => Err(SipResponseError::StatelessSend(status)), + Err(e) => Err(e), + } + } +} diff --git a/sipcord-bridge/src/transport/sip/ffi/streaming_player.rs b/sipcord-bridge/src/transport/sip/ffi/streaming_player.rs index 9213796..8b3da67 100644 --- a/sipcord-bridge/src/transport/sip/ffi/streaming_player.rs +++ b/sipcord-bridge/src/transport/sip/ffi/streaming_player.rs @@ -19,7 +19,7 @@ use super::types::*; use crate::services::sound::StreamingPlayer; -use anyhow::Result; +use crate::transport::sip::error::SipAudioError; use parking_lot::Mutex; use pjsua::*; use std::collections::HashMap; @@ -152,19 +152,16 @@ pub fn start_streaming_to_call( call_id: CallId, path: &Path, hangup_on_complete: bool, -) -> Result<()> { +) -> Result<(), SipAudioError> { use super::frame_utils::{PortCallbacks, create_and_connect_port}; - // Create the streaming player let player = StreamingPlayer::new(path)?; // Get call's conference port let call_conf_port = CALL_CONF_PORTS .get() .and_then(|p| p.get(&call_id).map(|r| *r)) - .ok_or_else(|| { - anyhow::anyhow!("No conf_port for call {} - media not ready yet", call_id) - })?; + .ok_or(SipAudioError::NoConfPort { call_id })?; let guard = unsafe { let callbacks = PortCallbacks { diff --git a/sipcord-bridge/src/transport/sip/ffi/test_tone.rs b/sipcord-bridge/src/transport/sip/ffi/test_tone.rs index 13e3c6c..89aa794 100644 --- a/sipcord-bridge/src/transport/sip/ffi/test_tone.rs +++ b/sipcord-bridge/src/transport/sip/ffi/test_tone.rs @@ -5,7 +5,7 @@ use super::streaming_player::STREAMING_PLAYER_POOL; use super::types::*; -use anyhow::Result; +use crate::transport::sip::error::SipAudioError; use parking_lot::Mutex; use pjsua::*; use std::collections::HashMap; @@ -129,16 +129,14 @@ pub unsafe extern "C" fn test_tone_on_destroy(this_port: *mut pjmedia_port) -> p /// Start playing a 440Hz test tone to a call /// /// The tone plays indefinitely until the caller hangs up. No automatic hangup. -pub fn start_test_tone_to_call(call_id: CallId) -> Result<()> { +pub fn start_test_tone_to_call(call_id: CallId) -> Result<(), SipAudioError> { use super::frame_utils::{PortCallbacks, create_and_connect_port}; // Get call's conference port let call_conf_port = CALL_CONF_PORTS .get() .and_then(|p| p.get(&call_id).map(|r| *r)) - .ok_or_else(|| { - anyhow::anyhow!("No conf_port for call {} - media not ready yet", call_id) - })?; + .ok_or(SipAudioError::NoConfPort { call_id })?; let guard = unsafe { let callbacks = PortCallbacks { diff --git a/sipcord-bridge/src/transport/sip/ffi/types.rs b/sipcord-bridge/src/transport/sip/ffi/types.rs index 2458845..7ef20e9 100644 --- a/sipcord-bridge/src/transport/sip/ffi/types.rs +++ b/sipcord-bridge/src/transport/sip/ffi/types.rs @@ -327,7 +327,7 @@ pub static TLS_RELOAD_PENDING: AtomicBool = AtomicBool::new(false); pub static CALL_RTP_ACTIVITY: OnceLock>> = OnceLock::new(); /// Event sender for timeout events (set during callback setup) -pub static TIMEOUT_EVENT_TX: OnceLock>>> = +pub static TIMEOUT_EVENT_TX: OnceLock>>> = OnceLock::new(); // Per-channel audio isolation statics diff --git a/sipcord-bridge/src/transport/sip/mod.rs b/sipcord-bridge/src/transport/sip/mod.rs index 33b7afb..badf40f 100644 --- a/sipcord-bridge/src/transport/sip/mod.rs +++ b/sipcord-bridge/src/transport/sip/mod.rs @@ -3,6 +3,7 @@ pub mod ffi; mod audio_thread; mod callbacks; mod channel_audio; +pub mod error; pub mod fork_group; mod nat; mod register_handler; @@ -24,7 +25,7 @@ pub use register_handler::{PendingRegisterTsx, set_register_event_sender, set_si use crate::config::{SipConfig, TlsConfig}; use crate::transport::discord::send_audio_to_discord_direct; -use anyhow::Result; +use crate::transport::sip::error::{SipCallError, SipError, SipInitError}; use crossbeam_channel::{Receiver, Sender, bounded}; use dashmap::DashMap; use parking_lot::RwLock; @@ -178,7 +179,7 @@ impl SipTransport { } /// Start the SIP transport - pub async fn run(&self) -> Result<()> { + pub async fn run(&self) -> Result<(), SipError> { info!( "Starting SIP server on {}:{}", self.config.public_host, self.config.port @@ -206,7 +207,11 @@ impl SipTransport { } }); - pjsua_handle.await?; + // JoinError -> log only; pjsua loop errors are already logged inside the + // spawned task. + if let Err(e) = pjsua_handle.await { + tracing::error!("pjsua event loop join error: {}", e); + } Ok(()) } } @@ -222,7 +227,7 @@ fn run_pjsua_loop( event_tx: Sender, initialized: Arc>, command_rx: Receiver, -) -> Result<()> { +) -> Result<(), SipInitError> { // Initialize pjsua with optional TLS init_pjsua(&config, tls_config.as_ref())?; *initialized.write() = true; @@ -449,15 +454,27 @@ fn process_sip_command(cmd: SipCommand, calls: &Arc>) } if auth_ok { - use register_handler::append_tdata_hdr; - append_tdata_hdr(tdata, c"Expires", &pending.expires.to_string()); + use self::ffi::pj_str::append_tdata_hdr; + if let Err(e) = + append_tdata_hdr(tdata, c"Expires", &pending.expires.to_string()) + { + tracing::warn!( + "deferred REGISTER 200 OK: failed to append Expires header: {}", + e + ); + } // RFC 3261 §10.3: echo the client's binding back as Contact. // Required for strict clients like 3CX to accept registration. - if let Some(ref uri) = pending.contact_uri { - append_tdata_hdr( + if let Some(ref uri) = pending.contact_uri + && let Err(e) = append_tdata_hdr( tdata, c"Contact", &format!("<{}>;expires={}", uri, pending.expires), + ) + { + tracing::warn!( + "deferred REGISTER 200 OK: failed to append Contact header ({}); strict clients may reject", + e ); } } else { @@ -506,9 +523,16 @@ pub fn remove_outbound_tracking(call_id: CallId) -> Option { /// /// If `caller_display_name` is provided, it sets the From header display name /// to show who initiated the call from Discord (e.g., "Discord: username"). -fn make_outbound_call(sip_uri: &str, caller_display_name: Option<&str>) -> Result { +fn make_outbound_call( + sip_uri: &str, + caller_display_name: Option<&str>, +) -> Result { unsafe { - let uri = std::ffi::CString::new(sip_uri).map_err(|e| e.to_string())?; + let uri = + std::ffi::CString::new(sip_uri).map_err(|source| SipCallError::InvalidString { + field: "sip_uri", + source, + })?; let mut call_id: ::pjsua::pjsua_call_id = -1; // Explicit call settings: audio only, no video, no T.140 text. @@ -551,7 +575,11 @@ fn make_outbound_call(sip_uri: &str, caller_display_name: Option<&str>) -> Resul .take(64) .collect(); let from_uri = format!("\"{}\" <{}>", sanitized, acc_uri); - from_uri_cstring = std::ffi::CString::new(from_uri).map_err(|e| e.to_string())?; + from_uri_cstring = + std::ffi::CString::new(from_uri).map_err(|source| SipCallError::InvalidString { + field: "caller_display_name", + source, + })?; msg_data_ptr.local_uri = ::pjsua::pj_str(from_uri_cstring.as_ptr() as *mut std::os::raw::c_char); } @@ -566,7 +594,7 @@ fn make_outbound_call(sip_uri: &str, caller_display_name: Option<&str>) -> Resul ); if status != ::pjsua::pj_constants__PJ_SUCCESS as i32 { - return Err(format!("pjsua_call_make_call failed: {}", status)); + return Err(SipCallError::MakeCall(status)); } Ok(CallId::new(call_id)) diff --git a/sipcord-bridge/src/transport/sip/register_handler.rs b/sipcord-bridge/src/transport/sip/register_handler.rs index 9cb6736..963dfac 100644 --- a/sipcord-bridge/src/transport/sip/register_handler.rs +++ b/sipcord-bridge/src/transport/sip/register_handler.rs @@ -7,12 +7,13 @@ use super::callbacks::{ extract_digest_auth_from_rdata, extract_source_ip, extract_user_agent, is_sipvicious_scanner, }; +use super::error::SipResponseError; +use super::ffi::pj_str::respond_stateless_with_headers; use super::ffi::types::*; use super::ffi::utils::pj_str_to_string; use pjsua::*; -use std::ffi::{CStr, CString}; +use std::ffi::CStr; use std::net::SocketAddr; -use std::os::raw::c_char; use std::ptr; use std::sync::atomic::{AtomicPtr, Ordering}; @@ -74,69 +75,19 @@ pub fn set_register_module_ptr(ptr: *mut pjsip_module) { // Helpers -/// Initialize a pjsip_hdr as a list head (equivalent to pj_list_init C macro). -#[inline] -unsafe fn pj_list_init_hdr(hdr: *mut pjsip_hdr) { +/// Send a stateless SIP response with a status code and reason phrase but no +/// extra headers. Logs (and otherwise swallows) any pjsip failure — these +/// responses are best-effort from inside an FFI callback. +unsafe fn send_simple_response(rdata: *mut pjsip_rx_data, status_code: u16, reason: &CStr) { unsafe { - (*hdr).next = hdr as *mut _; - (*hdr).prev = hdr as *mut _; - } -} - -/// Create a generic string header in `pool`. Returns null on failure (alloc or -/// interior-NUL in `value`). pjsip duplicates name/value into `pool`, so the -/// caller's CStrings can be dropped immediately after this returns. -#[inline] -unsafe fn make_string_hdr( - pool: *mut pj_pool_t, - name: &CStr, - value: &str, -) -> *mut pjsip_generic_string_hdr { - unsafe { - let Ok(value_c) = CString::new(value) else { - return ptr::null_mut(); - }; - let name_pj = pj_str(name.as_ptr() as *mut c_char); - let value_pj = pj_str(value_c.as_ptr() as *mut c_char); - pjsip_generic_string_hdr_create(pool, &name_pj, &value_pj) - } -} - -/// Append a generic string header onto the message buffer in `tdata`, -/// allocating from the tdata's own pool. Returns false on failure. -#[inline] -pub(super) unsafe fn append_tdata_hdr( - tdata: *mut pjsip_tx_data, - name: &CStr, - value: &str, -) -> bool { - unsafe { - let hdr = make_string_hdr((*tdata).pool, name, value); - if hdr.is_null() { - return false; - } - pj_list_insert_before( - &mut (*(*tdata).msg).hdr as *mut pjsip_hdr as *mut pj_list_type, - hdr as *mut pj_list_type, - ); - true - } -} - -/// Send a simple stateless SIP response (no custom headers). -unsafe fn send_simple_response(rdata: *mut pjsip_rx_data, status_code: u16, reason: &str) { - unsafe { - let endpt = pjsua_get_pjsip_endpt(); - if !endpt.is_null() { - let reason_cstr = CString::new(reason).unwrap(); - let reason_pj = pj_str(reason_cstr.as_ptr() as *mut c_char); - pjsip_endpt_respond_stateless( - endpt, - rdata, - status_code.into(), - &reason_pj, - ptr::null(), - ptr::null(), + if let Err(e) = + respond_stateless_with_headers(rdata, status_code, Some(reason), &[]) + { + tracing::warn!( + "Failed to respond {} {:?} to SIP request: {}", + status_code, + reason, + e ); } } @@ -148,67 +99,30 @@ unsafe fn send_simple_response(rdata: *mut pjsip_rx_data, status_code: u16, reas /// client's current bindings via Contact header(s). Strict clients like 3CX /// interpret a Contact-less response as "forced unregister" and tear down the /// trunk even though the binding was accepted server-side. -unsafe fn send_register_ok(rdata: *mut pjsip_rx_data, expires: u32, contact_uri: Option<&str>) { +unsafe fn send_register_ok( + rdata: *mut pjsip_rx_data, + expires: u32, + contact_uri: Option<&str>, +) -> Result<(), SipResponseError> { unsafe { - let endpt = pjsua_get_pjsip_endpt(); - if endpt.is_null() { - return; - } + let expires_str = expires.to_string(); + let contact_str = contact_uri.map(|uri| format!("<{}>;expires={}", uri, expires)); - let pool = pjsua_pool_create(c"register_ok".as_ptr(), 1024, 1024); - if !pool.is_null() { - let exp_hdr = make_string_hdr(pool, c"Expires", &expires.to_string()); - let contact_hdr = match contact_uri { - Some(uri) => make_string_hdr( - pool, - c"Contact", - &format!("<{}>;expires={}", uri, expires), - ), - None => ptr::null_mut(), - }; - - if !exp_hdr.is_null() { - let hdr_list = - pj_pool_alloc(pool, std::mem::size_of::()) as *mut pjsip_hdr; - if !hdr_list.is_null() { - pj_list_init_hdr(hdr_list); - pj_list_insert_before( - hdr_list as *mut pj_list_type, - exp_hdr as *mut pj_list_type, - ); - if !contact_hdr.is_null() { - pj_list_insert_before( - hdr_list as *mut pj_list_type, - contact_hdr as *mut pj_list_type, - ); - } - - let status = pjsip_endpt_respond_stateless( - endpt, - rdata, - 200, - ptr::null(), - hdr_list, - ptr::null(), - ); - if status != pj_constants__PJ_SUCCESS as i32 { - tracing::warn!("Failed to respond 200 OK to REGISTER: {}", status); - } - // Release pool — pjsip_endpt_respond_stateless clones what it - // needs into rdata's pool, so our header pool can be freed now. - pj_pool_release(pool); - return; - } - } - // Header creation failed — release the pool before falling through - pj_pool_release(pool); - } - - // Fallback: respond without extra headers - let status = - pjsip_endpt_respond_stateless(endpt, rdata, 200, ptr::null(), ptr::null(), ptr::null()); - if status != pj_constants__PJ_SUCCESS as i32 { - tracing::warn!("Failed to respond 200 OK to REGISTER: {}", status); + // Two-header common case + if let Some(ref contact) = contact_str { + respond_stateless_with_headers( + rdata, + 200, + None, + &[(c"Expires", expires_str.as_str()), (c"Contact", contact.as_str())], + ) + } else { + respond_stateless_with_headers( + rdata, + 200, + None, + &[(c"Expires", expires_str.as_str())], + ) } } } @@ -232,26 +146,27 @@ unsafe fn detect_transport(rdata: *mut pjsip_rx_data) -> crate::services::regist } /// Create a UAS transaction + pre-built response tdata for deferred REGISTER -/// responses. Returns `None` if transaction creation fails (caller should fall -/// back to stateless response). +/// responses. Caller falls back to a stateless 200 if this errors. unsafe fn create_register_tsx( rdata: *mut pjsip_rx_data, expires: u32, contact_uri: Option, -) -> Option { +) -> Result { unsafe { let endpt = pjsua_get_pjsip_endpt(); + if endpt.is_null() { + return Err(SipResponseError::EndpointNull); + } let module_ptr = REGISTER_MODULE_PTR.load(Ordering::Acquire); - - if endpt.is_null() || module_ptr.is_null() { - return None; + if module_ptr.is_null() { + return Err(SipResponseError::EndpointNull); } // Create UAS transaction let mut tsx: *mut pjsip_transaction = ptr::null_mut(); let status = pjsip_tsx_create_uas2(module_ptr, rdata, ptr::null_mut(), &mut tsx); if status != pj_constants__PJ_SUCCESS as i32 || tsx.is_null() { - return None; + return Err(SipResponseError::TsxCreate(status)); } // Feed the request to the transaction (starts Timer F, stores headers) @@ -263,10 +178,10 @@ unsafe fn create_register_tsx( let status = pjsip_endpt_create_response(endpt, rdata, 200, ptr::null(), &mut tdata); if status != pj_constants__PJ_SUCCESS as i32 || tdata.is_null() { pjsip_tsx_terminate(tsx, 500); - return None; + return Err(SipResponseError::ResponseBuild(status)); } - Some(PendingRegisterTsx { + Ok(PendingRegisterTsx { tsx: SendableTsx(tsx), tdata: SendableTdata(tdata), expires, @@ -326,7 +241,7 @@ pub unsafe extern "C" fn on_rx_request_cb(rdata: *mut pjsip_rx_data) -> pj_bool_ let result = ban_mgr.check_banned(&ip); if result.is_banned { tracing::debug!("Rejecting REGISTER from banned IP {}", ip); - send_simple_response(rdata, 403, "Forbidden"); + send_simple_response(rdata, 403, c"Forbidden"); return pj_constants__PJ_TRUE as pj_bool_t; } } @@ -355,7 +270,7 @@ pub unsafe extern "C" fn on_rx_request_cb(rdata: *mut pjsip_rx_data) -> pj_bool_ user_agent ); } - send_simple_response(rdata, 403, "Forbidden"); + send_simple_response(rdata, 403, c"Forbidden"); return pj_constants__PJ_TRUE as pj_bool_t; } @@ -367,7 +282,7 @@ pub unsafe extern "C" fn on_rx_request_cb(rdata: *mut pjsip_rx_data) -> pj_bool_ && ban_mgr.record_register(ip) { tracing::debug!("Rejecting REGISTER from {} - rate limit exceeded", ip); - send_simple_response(rdata, 429, "Too Many Requests"); + send_simple_response(rdata, 429, c"Too Many Requests"); return pj_constants__PJ_TRUE as pj_bool_t; } @@ -387,7 +302,7 @@ pub unsafe extern "C" fn on_rx_request_cb(rdata: *mut pjsip_rx_data) -> pj_bool_ ip_str, params.username ); - send_simple_response(rdata, 429, "Too Many Requests"); + send_simple_response(rdata, 429, c"Too Many Requests"); return pj_constants__PJ_TRUE as pj_bool_t; } @@ -408,7 +323,13 @@ pub unsafe extern "C" fn on_rx_request_cb(rdata: *mut pjsip_rx_data) -> pj_bool_ params.username, ip_str ); - send_register_ok(rdata, expires, contact_uri.as_deref()); + if let Err(e) = send_register_ok(rdata, expires, contact_uri.as_deref()) { + tracing::warn!( + "REGISTER 200 OK (cached) send failed for {}: {} — strict clients may reject", + params.username, + e + ); + } // Send to async handler for registrar update if let Some(tx) = REGISTER_EVENT_TX.get() { let _ = tx.try_send(RegisterRequest { @@ -429,7 +350,7 @@ pub unsafe extern "C" fn on_rx_request_cb(rdata: *mut pjsip_rx_data) -> pj_bool_ params.username, ip_str ); - send_simple_response(rdata, 403, "Forbidden"); + send_simple_response(rdata, 403, c"Forbidden"); // Send to async so API can re-verify (cache may be stale // after a password change) and update failure counts if let Some(tx) = REGISTER_EVENT_TX.get() { @@ -453,24 +374,29 @@ pub unsafe extern "C" fn on_rx_request_cb(rdata: *mut pjsip_rx_data) -> pj_bool_ params.username, ip_str ); - if let Some(pending) = create_register_tsx(rdata, expires, contact_uri.clone()) { - if let Some(tx) = REGISTER_EVENT_TX.get() { - let _ = tx.try_send(RegisterRequest { - digest_auth: params, - contact_uri: contact_uri.unwrap_or_default(), - source_addr, - transport, - expires, - pending_tsx: Some(pending), - }); + match create_register_tsx(rdata, expires, contact_uri.clone()) { + Ok(pending) => { + if let Some(tx) = REGISTER_EVENT_TX.get() { + let _ = tx.try_send(RegisterRequest { + digest_auth: params, + contact_uri: contact_uri.unwrap_or_default(), + source_addr, + transport, + expires, + pending_tsx: Some(pending), + }); + } + return pj_constants__PJ_TRUE as pj_bool_t; + } + Err(e) => { + // Transaction creation failed — fall through to + // stateless 200 OK below. + tracing::warn!( + "Failed to create tsx for deferred REGISTER ({}), falling back to stateless 200", + e + ); } - return pj_constants__PJ_TRUE as pj_bool_t; } - // Transaction creation failed — fall through to stateless - // 200 OK below (same behaviour as before this change). - tracing::warn!( - "Failed to create tsx for deferred REGISTER, falling back to stateless 200" - ); } } } @@ -483,6 +409,7 @@ pub unsafe extern "C" fn on_rx_request_cb(rdata: *mut pjsip_rx_data) -> pj_bool_ params.username ); let contact_uri_for_response = contact_uri.clone(); + let user_for_log = params.username.clone(); if let Some(tx) = REGISTER_EVENT_TX.get() { let _ = tx.try_send(RegisterRequest { digest_auth: params, @@ -493,7 +420,14 @@ pub unsafe extern "C" fn on_rx_request_cb(rdata: *mut pjsip_rx_data) -> pj_bool_ pending_tsx: None, }); } - send_register_ok(rdata, expires, contact_uri_for_response.as_deref()); + if let Err(e) = send_register_ok(rdata, expires, contact_uri_for_response.as_deref()) + { + tracing::warn!( + "REGISTER 200 OK (stateless) send failed for {}: {} — strict clients may reject", + user_for_log, + e + ); + } } else { // No Authorization header - send 401 challenge tracing::debug!( @@ -501,63 +435,24 @@ pub unsafe extern "C" fn on_rx_request_cb(rdata: *mut pjsip_rx_data) -> pj_bool_ ip_str ); - let endpt = pjsua_get_pjsip_endpt(); - if endpt.is_null() { - tracing::error!("Failed to get PJSIP endpoint for REGISTER 401 response"); - return pj_constants__PJ_TRUE as pj_bool_t; - } - // Generate a cryptographically random nonce - let nonce = { + let nonce: String = { let bytes: [u8; 16] = rand::random(); - bytes - .iter() - .map(|b| format!("{:02x}", b)) - .collect::() + bytes.iter().map(|b| format!("{:02x}", b)).collect() }; - let www_auth = format!( "Digest realm=\"{}\", nonce=\"{}\", algorithm=MD5, qop=\"auth\"", SIP_REALM, nonce ); - // Create WWW-Authenticate header - let hdr_name = CString::new("WWW-Authenticate").unwrap(); - let hdr_value = CString::new(www_auth).unwrap(); - - let pool = pjsua_pool_create(c"register_401".as_ptr(), 512, 512); - if pool.is_null() { - tracing::error!("Failed to create pool for REGISTER 401 response"); - return pj_constants__PJ_TRUE as pj_bool_t; + if let Err(e) = respond_stateless_with_headers( + rdata, + 401, + None, + &[(c"WWW-Authenticate", www_auth.as_str())], + ) { + tracing::warn!("Failed to send 401 challenge to REGISTER: {}", e); } - - let name = pj_str(hdr_name.as_ptr() as *mut c_char); - let value = pj_str(hdr_value.as_ptr() as *mut c_char); - let hdr = pjsip_generic_string_hdr_create(pool, &name, &value); - - if !hdr.is_null() { - let hdr_list = - pj_pool_alloc(pool, std::mem::size_of::()) as *mut pjsip_hdr; - if !hdr_list.is_null() { - pj_list_init_hdr(hdr_list); - pj_list_insert_before(hdr_list as *mut pj_list_type, hdr as *mut pj_list_type); - - let status = pjsip_endpt_respond_stateless( - endpt, - rdata, - 401, - ptr::null(), - hdr_list, - ptr::null(), - ); - - if status != pj_constants__PJ_SUCCESS as i32 { - tracing::warn!("Failed to respond 401 to REGISTER: {}", status); - } - } - } - // Release pool — pjsip_endpt_respond_stateless clones headers internally - pj_pool_release(pool); } // Return TRUE to indicate we handled this request