the war on unwrap()

This commit is contained in:
coral 2026-05-25 09:59:17 -07:00
parent ecd47078e6
commit 67bdb7f033
34 changed files with 1261 additions and 606 deletions

View file

@ -83,7 +83,6 @@ rtrb = "0.3.2"
async-trait = "0.1"
# Utilities
anyhow = "1.0.100"
thiserror = "2.0.18"
tracing = "0.1.44"
tracing-subscriber = { version = "0.3.22", features = ["env-filter"] }

View file

@ -2,7 +2,7 @@
//!
//! Parses FLAC file bytes to extract raw PCM i16 samples.
use anyhow::{Context, bail};
use super::AudioParseError;
use tracing::debug;
/// Parse a FLAC file and return the raw PCM i16 samples (mono).
@ -11,9 +11,9 @@ use tracing::debug;
/// - Standard FLAC files
/// - Stereo to mono conversion (if needed)
/// - Various bit depths (converted to 16-bit)
pub fn parse_flac(data: &[u8]) -> anyhow::Result<(Vec<i16>, u32)> {
pub fn parse_flac(data: &[u8]) -> Result<(Vec<i16>, u32), AudioParseError> {
let cursor = std::io::Cursor::new(data);
let mut reader = claxon::FlacReader::new(cursor).context("Failed to create FLAC reader")?;
let mut reader = claxon::FlacReader::new(cursor)?;
let info = reader.streaminfo();
let sample_rate = info.sample_rate;
@ -28,7 +28,7 @@ pub fn parse_flac(data: &[u8]) -> anyhow::Result<(Vec<i16>, u32)> {
// Read all samples
let mut raw_samples: Vec<i32> = Vec::new();
for sample in reader.samples() {
raw_samples.push(sample.context("Failed to read FLAC sample")?);
raw_samples.push(sample?);
}
// Convert to i16 based on bit depth
@ -37,7 +37,12 @@ pub fn parse_flac(data: &[u8]) -> anyhow::Result<(Vec<i16>, u32)> {
16 => raw_samples.iter().map(|&s| s as i16).collect(),
24 => raw_samples.iter().map(|&s| (s >> 8) as i16).collect(),
32 => raw_samples.iter().map(|&s| (s >> 16) as i16).collect(),
_ => bail!("Unsupported FLAC bit depth: {}", bits_per_sample),
n => {
return Err(AudioParseError::Unsupported(format!(
"FLAC bit depth: {}",
n
)));
}
};
// Convert to mono if stereo (samples are interleaved)

View file

@ -6,3 +6,22 @@
pub mod flac;
pub mod simd;
pub mod wav;
/// Errors that can occur while parsing a WAV or FLAC file.
#[derive(thiserror::Error, Debug)]
pub enum AudioParseError {
/// File header, chunk structure, or sample data was malformed for the
/// format implied by the magic bytes. Carries a short human-readable
/// reason (chunk name, byte offset, etc.).
#[error("malformed audio data: {0}")]
Malformed(String),
/// Audio format is recognised but not supported by this parser (e.g.
/// non-PCM WAV, or a FLAC stream with an exotic bit depth).
#[error("unsupported audio: {0}")]
Unsupported(String),
/// Underlying claxon FLAC decoder error.
#[error("FLAC decode error: {0}")]
Flac(#[from] claxon::Error),
}

View file

@ -3,9 +3,17 @@
//! Parses WAV file bytes to extract raw PCM i16 samples.
//! Supports standard PCM WAV files (format code 1).
use anyhow::ensure;
use super::AudioParseError;
use tracing::debug;
fn ensure(cond: bool, msg: &'static str) -> Result<(), AudioParseError> {
if cond {
Ok(())
} else {
Err(AudioParseError::Malformed(msg.into()))
}
}
/// WAV format chunk data
#[derive(Debug)]
struct WavFormat {
@ -25,11 +33,11 @@ struct WavFormat {
/// - Standard PCM WAV files (format code 1)
/// - Stereo to mono conversion (if needed)
/// - 16-bit samples
pub fn parse_wav(data: &[u8]) -> anyhow::Result<(Vec<i16>, u32)> {
pub fn parse_wav(data: &[u8]) -> Result<(Vec<i16>, u32), AudioParseError> {
// Validate RIFF header
ensure!(data.len() >= 12, "WAV file too short for header");
ensure!(&data[0..4] == b"RIFF", "Missing RIFF header");
ensure!(&data[8..12] == b"WAVE", "Missing WAVE format");
ensure(data.len() >= 12, "WAV file too short for header")?;
ensure(&data[0..4] == b"RIFF", "missing RIFF header")?;
ensure(&data[8..12] == b"WAVE", "missing WAVE format")?;
let mut pos = 12;
let mut format: Option<WavFormat> = None;
@ -45,7 +53,7 @@ pub fn parse_wav(data: &[u8]) -> anyhow::Result<(Vec<i16>, u32)> {
match chunk_id {
b"fmt " => {
ensure!(chunk_size >= 16, "fmt chunk too small");
ensure(chunk_size >= 16, "fmt chunk too small")?;
format = Some(WavFormat {
audio_format: u16::from_le_bytes([data[pos], data[pos + 1]]),
num_channels: u16::from_le_bytes([data[pos + 2], data[pos + 3]]),
@ -63,9 +71,19 @@ pub fn parse_wav(data: &[u8]) -> anyhow::Result<(Vec<i16>, u32)> {
b"data" => {
let fmt = format
.as_ref()
.ok_or_else(|| anyhow::anyhow!("data chunk before fmt chunk"))?;
ensure!(fmt.audio_format == 1, "Only PCM format supported");
ensure!(fmt.bits_per_sample == 16, "Only 16-bit samples supported");
.ok_or_else(|| AudioParseError::Malformed("data chunk before fmt chunk".into()))?;
if fmt.audio_format != 1 {
return Err(AudioParseError::Unsupported(format!(
"WAV audio_format={} (only PCM=1 supported)",
fmt.audio_format
)));
}
if fmt.bits_per_sample != 16 {
return Err(AudioParseError::Unsupported(format!(
"WAV bits_per_sample={} (only 16 supported)",
fmt.bits_per_sample
)));
}
let data_end = (pos + chunk_size).min(data.len());
let sample_data = &data[pos..data_end];
@ -115,7 +133,7 @@ pub fn parse_wav(data: &[u8]) -> anyhow::Result<(Vec<i16>, u32)> {
let sample_rate = format
.as_ref()
.map(|f| f.sample_rate)
.ok_or_else(|| anyhow::anyhow!("No fmt chunk found"))?;
.ok_or_else(|| AudioParseError::Malformed("no fmt chunk found".into()))?;
Ok((samples, sample_rate))
}

View file

@ -28,7 +28,8 @@ use crate::transport::sip::{
clear_channel_stale_audio, empty_bridge_grace_period_secs, register_call_channel,
register_discord_to_sip, stop_loop, unregister_call_channel, unregister_discord_to_sip,
};
use anyhow::Result;
use crate::BridgeError;
use crate::services::sound::SoundError;
use crossbeam_channel::{Receiver, Sender, bounded};
use dashmap::{DashMap, DashSet};
use std::collections::HashSet;
@ -146,16 +147,14 @@ impl BridgeCoordinator {
sip_cmd_tx: Sender<SipCommand>,
sip_event_rx: Receiver<SipEvent>,
shared_discord: Arc<SharedDiscordClient>,
) -> Self {
) -> Result<Self, SoundError> {
let (discord_event_tx, discord_event_rx) = bounded(1000);
// Load sounds from config.toml
let sounds_dir = PathBuf::from(&crate::config::EnvConfig::global().sounds_dir);
let sound_manager = create_sound_manager(sounds_dir)?;
let sound_manager = create_sound_manager(sounds_dir)
.expect("Failed to create SoundManager - check config.toml and sound files");
Self {
Ok(Self {
backend,
sip_cmd_tx,
sip_event_rx,
@ -169,11 +168,11 @@ impl BridgeCoordinator {
discord_event_rx,
sound_manager,
shared_discord,
}
})
}
/// Run the bridge coordinator (consumes self)
pub async fn run(self) -> Result<()> {
pub async fn run(self) -> Result<(), BridgeError> {
info!("Bridge coordinator started");
// Shared notify: VoiceReceiver signals this on unexpected DriverDisconnect,

View file

@ -1,8 +1,34 @@
use anyhow::{Context, Result};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::OnceLock;
/// Errors that can occur loading and validating bridge configuration.
#[derive(thiserror::Error, Debug)]
pub enum ConfigError {
#[error("failed to read config file {path:?}: {source}")]
Read {
path: PathBuf,
#[source]
source: std::io::Error,
},
#[error("failed to parse config file {path:?}: {source}")]
TomlParse {
path: PathBuf,
#[source]
source: toml::de::Error,
},
#[error("failed to parse environment variables: {0}")]
Envy(#[from] envy::Error),
#[error("global EnvConfig has already been initialised")]
EnvAlreadyInitialised,
#[error("required environment variable {0} is not set")]
MissingEnvVar(&'static str),
}
/// Global application config (loaded from config.toml)
pub static APP_CONFIG: OnceLock<AppConfig> = OnceLock::new();
@ -85,30 +111,29 @@ pub struct EnvConfig {
impl EnvConfig {
/// Parse environment variables (via `envy`) and store in the global `OnceLock`.
/// Call once at the top of `main()`.
pub fn init() -> Result<()> {
pub fn init() -> Result<(), ConfigError> {
dotenvy::dotenv().ok();
let cfg: EnvConfig =
envy::from_env().context("Failed to parse environment variables into EnvConfig")?;
let cfg: EnvConfig = envy::from_env()?;
ENV_CONFIG
.set(cfg)
.ok()
.context("EnvConfig already initialized")?;
.map_err(|_| ConfigError::EnvAlreadyInitialised)?;
Ok(())
}
/// Access the global `EnvConfig` (panics if `init()` was not called).
/// Access the global `EnvConfig` (panics if `init()` was not called — a
/// programmer error, not a recoverable condition).
pub fn global() -> &'static EnvConfig {
ENV_CONFIG
.get()
.expect("EnvConfig not initialized — call EnvConfig::init() first")
ENV_CONFIG.get().unwrap_or_else(|| {
panic!("EnvConfig not initialized — call EnvConfig::init() first")
})
}
/// Build a `SipConfig` from the parsed environment.
pub fn to_sip_config(&self) -> Result<SipConfig> {
pub fn to_sip_config(&self) -> Result<SipConfig, ConfigError> {
let public_host = self
.sip_public_host
.clone()
.context("SIP_PUBLIC_HOST required")?;
.ok_or(ConfigError::MissingEnvVar("SIP_PUBLIC_HOST"))?;
let local_net = match (&self.sip_local_host, &self.sip_local_cidr) {
(Some(host), Some(cidr)) => Some(LocalNetConfig {
@ -299,18 +324,23 @@ pub struct SoundEntry {
impl AppConfig {
/// Load configuration from a TOML file
pub fn load(path: &Path) -> Result<Self> {
let contents = std::fs::read_to_string(path)
.with_context(|| format!("Failed to read config file: {}", path.display()))?;
toml::from_str(&contents)
.with_context(|| format!("Failed to parse config file: {}", path.display()))
pub fn load(path: &Path) -> Result<Self, ConfigError> {
let contents = std::fs::read_to_string(path).map_err(|source| ConfigError::Read {
path: path.to_path_buf(),
source,
})?;
toml::from_str(&contents).map_err(|source| ConfigError::TomlParse {
path: path.to_path_buf(),
source,
})
}
/// Get the global application config (panics if not initialized)
/// Get the global application config (panics if not initialized — a
/// programmer error: caller must `AppConfig::load(...)` first).
pub fn global() -> &'static AppConfig {
APP_CONFIG
.get()
.expect("AppConfig not initialized - call AppConfig::load() first")
APP_CONFIG.get().unwrap_or_else(|| {
panic!("AppConfig not initialized — call AppConfig::load() first")
})
}
/// Get bridge config (with defaults if not loaded yet)
@ -370,7 +400,7 @@ pub struct LocalNetConfig {
impl SipConfig {
/// Load SIP configuration from environment variables.
/// Standalone method for backends that don't need the full Config.
pub fn from_env() -> Result<Self> {
pub fn from_env() -> Result<Self, ConfigError> {
EnvConfig::global().to_sip_config()
}
}

View file

@ -0,0 +1,47 @@
//! Top-level error type for the `sipcord-bridge` crate.
//!
//! [`BridgeError`] aggregates every subsystem error so callers (main binaries,
//! adapter crates) can use a single `Result` type and rely on `?` propagation
//! via `#[from]` conversions.
use crate::audio::AudioParseError;
use crate::config::ConfigError;
use crate::fax::FaxError;
use crate::routing::CallError;
use crate::services::sound::SoundError;
use crate::transport::discord::DiscordError;
use crate::transport::sip::error::SipError;
/// Umbrella error for the entire bridge crate.
#[derive(thiserror::Error, Debug)]
pub enum BridgeError {
#[error(transparent)]
Config(#[from] ConfigError),
#[error(transparent)]
Sip(#[from] SipError),
#[error(transparent)]
Discord(#[from] DiscordError),
#[error(transparent)]
Routing(#[from] CallError),
#[error(transparent)]
Fax(#[from] FaxError),
#[error(transparent)]
Sound(#[from] SoundError),
#[error(transparent)]
AudioParse(#[from] AudioParseError),
/// Generic I/O at the top level (file ops in main, etc.) that aren't tied
/// to a particular subsystem.
#[error("I/O ({context}): {source}")]
Io {
context: String,
#[source]
source: std::io::Error,
},
}

View file

@ -5,8 +5,8 @@
//! - Replaced with "Fax Received" (green) with page image gallery on success
//! - Edited to "Fax Failed" (red) with reason on failure
use super::FaxError;
use crate::services::snowflake::Snowflake;
use anyhow::{Context, Result};
use serenity::all::{ChannelId, MessageId, UserId};
use serenity::builder::{
CreateAttachment, CreateEmbed, CreateEmbedFooter, CreateMessage, EditMessage,
@ -30,14 +30,20 @@ pub struct DiscordPoster {
}
impl DiscordPoster {
pub fn new(bot_token: String, channel_id: Snowflake, user_id: String) -> Self {
let token: Token = bot_token.parse().expect("invalid Discord bot token");
Self {
pub fn new(
bot_token: String,
channel_id: Snowflake,
user_id: String,
) -> Result<Self, FaxError> {
let token: Token = bot_token
.parse()
.map_err(|e| FaxError::InvalidToken(format!("{e}")))?;
Ok(Self {
http: Arc::new(Http::new(token)),
channel_id: ChannelId::new(*channel_id),
user_id,
display_name: None,
}
})
}
/// Resolve and cache the Discord display name for the user.
@ -70,7 +76,7 @@ impl DiscordPoster {
}
/// Post a "Receiving fax..." status message. Returns the message ID for future edits.
pub async fn post_fax_receiving(&mut self) -> Result<u64> {
pub async fn post_fax_receiving(&mut self) -> Result<u64, FaxError> {
self.resolve_display_name().await;
let embed = CreateEmbed::new()
@ -83,8 +89,7 @@ impl DiscordPoster {
.channel_id
.widen()
.send_message(&self.http, CreateMessage::new().embed(embed))
.await
.context("Failed to post fax receiving message")?;
.await?;
debug!("Posted fax receiving message: {}", msg.id);
Ok(msg.id.get())
@ -104,7 +109,7 @@ impl DiscordPoster {
image_pages: Vec<Vec<u8>>,
page_count: u32,
file_ext: &str,
) -> Result<()> {
) -> Result<(), FaxError> {
/// Discord's maximum number of embeds per message.
const MAX_EMBEDS: u32 = 10;
@ -170,14 +175,18 @@ impl DiscordPoster {
"Discord API error editing fax complete (msg={}, {} pages): {}",
message_id, page_count, e
);
anyhow::bail!("Failed to edit fax complete message: {}", e);
return Err(FaxError::Discord(e));
}
Ok(())
}
/// Edit the status message to show a failure reason.
pub async fn edit_fax_failed(&self, message_id: u64, reason: &str) -> Result<()> {
pub async fn edit_fax_failed(
&self,
message_id: u64,
reason: &str,
) -> Result<(), FaxError> {
let embed = CreateEmbed::new()
.title("Fax Failed")
.description(reason)
@ -201,7 +210,7 @@ impl DiscordPoster {
}
/// Post a standalone failure message (when no "receiving" message was posted).
pub async fn post_fax_failed(&mut self, reason: &str) -> Result<()> {
pub async fn post_fax_failed(&mut self, reason: &str) -> Result<(), FaxError> {
self.resolve_display_name().await;
let embed = CreateEmbed::new()

View file

@ -16,3 +16,50 @@ pub mod discord_poster;
pub mod session;
pub mod spandsp;
pub mod tiff_decoder;
/// Errors from the fax subsystem.
///
/// Variants are intentionally coarse — fax flows are end-to-end best-effort
/// (a missed page or codec mismatch logs and aborts the session) and the
/// detailed `String` payloads carry enough context for triage. Where a more
/// structured upstream type already exists (`serenity::Error`, `io::Error`),
/// we wrap it via `#[from]` / `#[source]`.
#[derive(thiserror::Error, Debug)]
pub enum FaxError {
/// Discord REST / gateway error while posting or editing fax status.
#[error("Discord post failed: {0}")]
Discord(#[from] serenity::Error),
/// Token parsing failure when constructing the fax-posting client.
#[error("invalid Discord bot token: {0}")]
InvalidToken(String),
/// I/O error reading/writing TIFFs or working with paths.
#[error("fax I/O ({context}): {source}")]
Io {
context: String,
#[source]
source: std::io::Error,
},
/// Path couldn't be converted to UTF-8 for the SpanDSP / TIFF API.
#[error("path is not valid UTF-8: {0}")]
NonUtf8Path(String),
/// SpanDSP FFI returned an error from one of its setters or state-init
/// functions.
#[error("SpanDSP ({operation}): {detail}")]
SpanDsp {
operation: &'static str,
detail: String,
},
/// TIFF parsing / decoding failure. Carries a human-readable reason.
#[error("TIFF decode: {0}")]
Tiff(String),
/// A received fax produced no pages (decoder bail-out, session closed
/// before any page was completed, etc.).
#[error("no pages in received fax")]
NoPages,
}

View file

@ -7,12 +7,12 @@
//! 4. On completion, TIFF is converted to PNG and posted to Discord
//! 5. On failure or timeout, an error message is posted to Discord
use crate::fax::FaxError;
use crate::fax::discord_poster::DiscordPoster;
use crate::fax::spandsp::{FaxReceiver, FaxRxStatus, FaxT38Receiver};
use crate::fax::tiff_decoder;
use crate::services::snowflake::Snowflake;
use crate::transport::sip::CallId;
use anyhow::Result;
use std::io::Cursor;
use std::path::PathBuf;
use std::time::Instant;
@ -89,7 +89,7 @@ impl FaxSession {
guild_id: Snowflake,
user_id: String,
bot_token: String,
) -> Result<Self> {
) -> Result<Self, FaxError> {
let fax_config = crate::config::AppConfig::fax();
// Use configured tmp_folder or system temp dir
@ -109,12 +109,15 @@ impl FaxSession {
});
let tiff_dir = base_dir.join(format!("{}{}", fax_config.prefix, session_id));
std::fs::create_dir_all(&tiff_dir)?;
std::fs::create_dir_all(&tiff_dir).map_err(|source| FaxError::Io {
context: format!("create tiff dir {}", tiff_dir.display()),
source,
})?;
let tiff_path = tiff_dir.join(format!("{}{}.tiff", fax_config.prefix, session_id));
let receiver = FaxReceiver::new_audio_receiver(&tiff_path)?;
let poster = DiscordPoster::new(bot_token, text_channel_id, user_id.clone());
let poster = DiscordPoster::new(bot_token, text_channel_id, user_id.clone())?;
Ok(Self {
call_id,
@ -278,7 +281,7 @@ impl FaxSession {
/// Post the initial "Receiving fax..." message to Discord.
/// Called when fax negotiation is detected.
pub async fn post_receiving_message(&mut self) -> Result<()> {
pub async fn post_receiving_message(&mut self) -> Result<(), FaxError> {
match self.poster.post_fax_receiving().await {
Ok(msg_id) => {
debug!(
@ -317,7 +320,7 @@ impl FaxSession {
/// Convert the received TIFF to images and post to Discord.
/// Called after fax reception is complete.
pub async fn convert_and_post(&mut self) -> Result<()> {
pub async fn convert_and_post(&mut self) -> Result<(), FaxError> {
// Guard against double-processing: if we've already posted (Complete) or failed,
// another caller (e.g., CallEnded racing with T.38 completion) already handled it.
// Note: FaxState::Received is NOT skipped — that's the normal entry state.
@ -364,11 +367,12 @@ impl FaxSession {
.write_to(&mut Cursor::new(&mut buf), output_format.image_format())
.map(|_| buf)
})
.collect::<std::result::Result<Vec<_>, _>>()?;
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(|e| FaxError::Tiff(format!("image encode: {e}")))?;
if image_pages.is_empty() {
self.post_failure("No pages in received fax").await;
anyhow::bail!("No pages in received fax");
return Err(FaxError::NoPages);
}
let page_count = image_pages.len() as u32;

View file

@ -3,7 +3,7 @@
//! Uses the `spandsp` safe wrapper crate to decode G.711 audio into TIFF images.
//! Audio arrives at 16kHz from PJSUA conference bridge; we downsample to 8kHz for SpanDSP.
use anyhow::{Context, Result};
use super::FaxError;
use spandsp::fax::FaxState;
use spandsp::logging::{LogLevel, LogShowFlags};
use spandsp::spandsp_sys;
@ -75,19 +75,30 @@ fn configure_t30(
t30: &spandsp::t30::T30State,
tiff_path: &str,
callback_state: &mut FaxCallbackState,
) -> Result<()> {
) -> Result<(), FaxError> {
/// Local macro: tag a SpanDSP error with the operation name. Avoids
/// boilerplate at every setter call site.
macro_rules! spandsp_err {
($op:expr) => {
|e| FaxError::SpanDsp {
operation: $op,
detail: e.to_string(),
}
};
}
t30.set_rx_file(tiff_path, -1)
.map_err(|e| anyhow::anyhow!("Failed to set rx file: {}", e))?;
.map_err(spandsp_err!("set_rx_file"))?;
t30.set_supported_modems(T30ModemSupport::default())
.map_err(|e| anyhow::anyhow!("Failed to set supported modems: {}", e))?;
.map_err(spandsp_err!("set_supported_modems"))?;
t30.set_ecm_capability(true)
.map_err(|e| anyhow::anyhow!("Failed to set ECM: {}", e))?;
.map_err(spandsp_err!("set_ecm_capability"))?;
let compressions = T4_COMPRESSION_T4_1D | T4_COMPRESSION_T4_2D | T4_COMPRESSION_T6;
t30.set_supported_compressions(compressions)
.map_err(|e| anyhow::anyhow!("Failed to set compressions: {}", e))?;
.map_err(spandsp_err!("set_supported_compressions"))?;
let sizes = T4_SUPPORT_WIDTH_215MM
| T4_SUPPORT_WIDTH_255MM
@ -97,7 +108,7 @@ fn configure_t30(
| T4_RESOLUTION_R8_SUPERFINE
| T4_RESOLUTION_200_200;
t30.set_supported_image_sizes(sizes)
.map_err(|e| anyhow::anyhow!("Failed to set image sizes: {}", e))?;
.map_err(spandsp_err!("set_supported_image_sizes"))?;
let user_data = callback_state as *mut FaxCallbackState as *mut std::ffi::c_void;
unsafe {
@ -208,15 +219,20 @@ impl FaxReceiver {
/// Create a new fax receiver in audio mode.
///
/// Initializes SpanDSP in receive mode and sets the output TIFF path.
pub fn new_audio_receiver(tiff_path: &Path) -> Result<Self> {
let tiff_path_str = tiff_path.to_str().context("Invalid TIFF path")?;
pub fn new_audio_receiver(tiff_path: &Path) -> Result<Self, FaxError> {
let tiff_path_str = tiff_path
.to_str()
.ok_or_else(|| FaxError::NonUtf8Path(tiff_path.display().to_string()))?;
let fax = FaxState::new(false)
.map_err(|e| anyhow::anyhow!("Failed to initialize SpanDSP fax state: {}", e))?;
let fax = FaxState::new(false).map_err(|e| FaxError::SpanDsp {
operation: "FaxState::new",
detail: e.to_string(),
})?;
let t30 = fax
.get_t30_state()
.map_err(|e| anyhow::anyhow!("Failed to get T.30 state: {}", e))?;
let t30 = fax.get_t30_state().map_err(|e| FaxError::SpanDsp {
operation: "FaxState::get_t30_state",
detail: e.to_string(),
})?;
let mut callback_state = Box::new(FaxCallbackState {
negotiation_started: false,
@ -350,8 +366,13 @@ impl FaxT38Receiver {
///
/// `tiff_path`: Where to write the received fax TIFF file.
/// `tx_ifp_sender`: Channel for outgoing IFP packets (sent to UDPTL socket).
pub fn new(tiff_path: &Path, tx_ifp_sender: mpsc::UnboundedSender<Vec<u8>>) -> Result<Self> {
let tiff_path_str = tiff_path.to_str().context("Invalid TIFF path")?;
pub fn new(
tiff_path: &Path,
tx_ifp_sender: mpsc::UnboundedSender<Vec<u8>>,
) -> Result<Self, FaxError> {
let tiff_path_str = tiff_path
.to_str()
.ok_or_else(|| FaxError::NonUtf8Path(tiff_path.display().to_string()))?;
let tx_callback_state = Box::new(TxCallbackState {
sender: tx_ifp_sender,
@ -359,13 +380,18 @@ impl FaxT38Receiver {
let tx_user_data = &*tx_callback_state as *const TxCallbackState as *mut std::ffi::c_void;
let terminal = unsafe {
T38Terminal::new_raw(false, Some(tx_packet_handler), tx_user_data)
.map_err(|e| anyhow::anyhow!("Failed to initialize T38Terminal: {}", e))?
T38Terminal::new_raw(false, Some(tx_packet_handler), tx_user_data).map_err(|e| {
FaxError::SpanDsp {
operation: "T38Terminal::new_raw",
detail: e.to_string(),
}
})?
};
let t30 = terminal
.get_t30_state()
.map_err(|e| anyhow::anyhow!("Failed to get T.30 state from T38Terminal: {}", e))?;
let t30 = terminal.get_t30_state().map_err(|e| FaxError::SpanDsp {
operation: "T38Terminal::get_t30_state",
detail: e.to_string(),
})?;
let mut callback_state = Box::new(FaxCallbackState {
negotiation_started: false,
@ -384,7 +410,10 @@ impl FaxT38Receiver {
let t38_core = terminal
.get_t38_core_state()
.map_err(|e| anyhow::anyhow!("Failed to get T38Core: {}", e))?;
.map_err(|e| FaxError::SpanDsp {
operation: "T38Terminal::get_t38_core_state",
detail: e.to_string(),
})?;
configure_log_state(spandsp_sys::t38_core_get_logging_state(t38_core.as_ptr()));
}

View file

@ -6,12 +6,20 @@
//! Huffman table data derived from the ITU-T T.4 standard.
//! Bit-reading approach inspired by the `fax` crate (MIT licensed).
use anyhow::{Result, bail};
use super::FaxError;
use image::GrayImage;
use std::path::Path;
use std::sync::OnceLock;
use tracing::debug;
/// Convenience: most failures in this module are malformed-TIFF conditions
/// that map cleanly onto `FaxError::Tiff(String)`.
macro_rules! tiff_bail {
($($arg:tt)*) => {
return Err(FaxError::Tiff(format!($($arg)*)))
};
}
// Public API
/// Maximum TIFF file size (50 MB). Well above any reasonable fax output from SpanDSP,
@ -19,19 +27,27 @@ use tracing::debug;
const MAX_TIFF_SIZE: u64 = 50 * 1024 * 1024;
/// Decode all pages of a fax TIFF file into grayscale images.
pub fn decode_fax_tiff(path: &Path) -> Result<Vec<GrayImage>> {
pub fn decode_fax_tiff(path: &Path) -> Result<Vec<GrayImage>, FaxError> {
if !path.exists() {
bail!("TIFF file not found: {}", path.display());
tiff_bail!("TIFF file not found: {}", path.display());
}
let file_size = std::fs::metadata(path)?.len();
let file_size = std::fs::metadata(path)
.map_err(|source| FaxError::Io {
context: format!("metadata({})", path.display()),
source,
})?
.len();
if file_size > MAX_TIFF_SIZE {
bail!(
tiff_bail!(
"TIFF file too large: {} bytes (max {} bytes)",
file_size,
MAX_TIFF_SIZE
);
}
let data = std::fs::read(path)?;
let data = std::fs::read(path).map_err(|source| FaxError::Io {
context: format!("read({})", path.display()),
source,
})?;
let pages = parse_tiff_ifds(&data)?;
let mut images = Vec::with_capacity(pages.len());
@ -51,7 +67,7 @@ pub fn decode_fax_tiff(path: &Path) -> Result<Vec<GrayImage>> {
let start = *off as usize;
let end = start + *len as usize;
if end > data.len() {
bail!(
tiff_bail!(
"TIFF strip extends past file: offset={}, count={}, file_len={}",
off,
len,
@ -71,7 +87,7 @@ pub fn decode_fax_tiff(path: &Path) -> Result<Vec<GrayImage>> {
let transitions_per_line = match page.compression {
3 => decode_group3(&strip_data, page.width, page.height, page.t4_options)?,
4 => decode_group4(&strip_data, page.width, page.height)?,
other => bail!("Unsupported TIFF compression: {}", other),
other => tiff_bail!("Unsupported TIFF compression: {}", other),
};
let img = assemble_image(
@ -104,18 +120,18 @@ struct TiffPage {
y_resolution: Option<(u32, u32)>, // numerator, denominator (RATIONAL)
}
fn parse_tiff_ifds(data: &[u8]) -> Result<Vec<TiffPage>> {
fn parse_tiff_ifds(data: &[u8]) -> Result<Vec<TiffPage>, FaxError> {
if data.len() < 8 {
bail!("TIFF file too short");
tiff_bail!("TIFF file too short");
}
let le = match (data[0], data[1]) {
(0x49, 0x49) => true,
(0x4D, 0x4D) => false,
_ => bail!("Not a TIFF file"),
_ => tiff_bail!("Not a TIFF file"),
};
let magic = read_u16(data, 2, le);
if magic != 42 {
bail!("Bad TIFF magic: {}", magic);
tiff_bail!("Bad TIFF magic: {}", magic);
}
let mut ifd_offset = read_u32(data, 4, le) as usize;
@ -184,7 +200,7 @@ fn parse_tiff_ifds(data: &[u8]) -> Result<Vec<TiffPage>> {
}
if pages.is_empty() {
bail!("No IFDs found in TIFF");
tiff_bail!("No IFDs found in TIFF");
}
Ok(pages)
}
@ -830,7 +846,12 @@ fn decode_line_2d(reader: &mut BitReader, reference: &[u16], width: u16) -> Opti
// Group 3 Image Driver
fn decode_group3(data: &[u8], width: u32, height: u32, t4_options: u32) -> Result<Vec<Vec<u16>>> {
fn decode_group3(
data: &[u8],
width: u32,
height: u32,
t4_options: u32,
) -> Result<Vec<Vec<u16>>, FaxError> {
let w = width as u16;
let is_2d = (t4_options & 1) != 0;
let has_fill_bits = (t4_options & 4) != 0;
@ -840,7 +861,7 @@ fn decode_group3(data: &[u8], width: u32, height: u32, t4_options: u32) -> Resul
// Scan for the first EOL
if !reader.scan_for_eol() {
bail!("No EOL found at start of Group 3 data");
tiff_bail!("No EOL found at start of Group 3 data");
}
for _ in 0..height {
@ -926,14 +947,14 @@ fn decode_group3(data: &[u8], width: u32, height: u32, t4_options: u32) -> Resul
}
if lines.is_empty() {
bail!("Group 3 decoder produced no lines");
tiff_bail!("Group 3 decoder produced no lines");
}
Ok(lines)
}
// Group 4 Image Driver
fn decode_group4(data: &[u8], width: u32, height: u32) -> Result<Vec<Vec<u16>>> {
fn decode_group4(data: &[u8], width: u32, height: u32) -> Result<Vec<Vec<u16>>, FaxError> {
let w = width as u16;
let mut reader = BitReader::new(data);
let mut lines: Vec<Vec<u16>> = Vec::with_capacity(height as usize);
@ -958,7 +979,7 @@ fn decode_group4(data: &[u8], width: u32, height: u32) -> Result<Vec<Vec<u16>>>
}
if lines.is_empty() {
bail!("Group 4 decoder produced no lines");
tiff_bail!("Group 4 decoder produced no lines");
}
Ok(lines)
}

View file

@ -8,11 +8,19 @@
//! and authentication. A built-in `StaticBackend` (TOML dialplan) is included.
#![feature(portable_simd)]
// Lock down the no-unwrap policy. Test modules opt out via the
// `#[cfg_attr(test, allow(...))]` shim at their boundary (or `#[allow]` at
// the test fn level for isolated cases). See feedback memories
// `feedback-no-unwrap-in-production` and `feedback-fix-clippy-at-source`.
#![cfg_attr(not(test), deny(clippy::unwrap_used, clippy::expect_used))]
pub mod audio;
pub mod call;
pub mod config;
pub mod error;
pub mod fax;
pub mod routing;
pub mod services;
pub mod transport;
pub use error::BridgeError;

View file

@ -3,25 +3,32 @@
//! Standalone SIP-to-Discord voice bridge using a TOML dialplan.
#![feature(portable_simd)]
#![cfg_attr(not(test), deny(clippy::unwrap_used, clippy::expect_used))]
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::{Context, Result};
use tracing::{error, info};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use sipcord_bridge::BridgeError;
use sipcord_bridge::call::BridgeCoordinator;
use sipcord_bridge::config::{APP_CONFIG, AppConfig, EnvConfig, SipConfig};
use sipcord_bridge::config::{APP_CONFIG, AppConfig, ConfigError, EnvConfig, SipConfig};
use sipcord_bridge::routing::static_router::StaticBackend;
use sipcord_bridge::transport::discord::SharedDiscordClient;
use sipcord_bridge::transport::sip::SipTransport;
#[tokio::main]
async fn main() -> Result<()> {
rustls::crypto::ring::default_provider()
async fn main() -> Result<(), BridgeError> {
// Pre-init failures here are programmer errors (missing rustls feature
// flag, double-init of the global crypto provider) — panicking is the
// right behaviour and there's no caller that could recover.
if rustls::crypto::ring::default_provider()
.install_default()
.expect("Failed to install rustls crypto provider");
.is_err()
{
panic!("rustls crypto provider already installed or feature missing");
}
tracing_subscriber::registry()
.with(
@ -39,17 +46,17 @@ async fn main() -> Result<()> {
let app_config = AppConfig::load(&config_path)?;
APP_CONFIG
.set(app_config)
.expect("AppConfig already initialized");
.map_err(|_| BridgeError::Config(ConfigError::EnvAlreadyInitialised))?;
info!("Loaded config from {}", config_path.display());
run_static_router().await
}
async fn run_static_router() -> Result<()> {
async fn run_static_router() -> Result<(), BridgeError> {
let bot_token = EnvConfig::global()
.discord_bot_token
.clone()
.context("DISCORD_BOT_TOKEN required")?;
.ok_or(ConfigError::MissingEnvVar("DISCORD_BOT_TOKEN"))?;
let sip_config = SipConfig::from_env()?;
// Load dialplan
@ -73,9 +80,7 @@ async fn run_static_router() -> Result<()> {
});
// Create shared Discord client
let shared_discord = SharedDiscordClient::new(&bot_token)
.await
.expect("Failed to create shared Discord client");
let shared_discord = SharedDiscordClient::new(&bot_token).await?;
info!("Shared Discord client initialized");
let bridge = BridgeCoordinator::new(
@ -83,7 +88,7 @@ async fn run_static_router() -> Result<()> {
sip_transport.commands(),
sip_transport.events(),
shared_discord,
);
)?;
info!("Starting components...");

View file

@ -41,12 +41,17 @@ pub enum RouteDecision {
}
/// Errors that trigger audio playback before hangup
#[derive(Debug, Clone, Copy)]
#[derive(thiserror::Error, Debug, Clone, Copy)]
pub enum CallError {
#[error("no channel mapping for the dialed extension")]
NoChannelMapping,
#[error("user lacks permission for the target Discord channel")]
NoPermissions,
#[error("Discord API error")]
DiscordApiError,
#[error("server is busy")]
ServerBusy,
#[error("unknown call error")]
Unknown,
}

View file

@ -19,6 +19,7 @@ use async_trait::async_trait;
use serde::Deserialize;
use tracing::info;
use crate::config::ConfigError;
use crate::routing::{Backend, CallError, CallStartedInfo, OutboundCallRequest, RouteDecision};
use crate::services::snowflake::Snowflake;
use crate::transport::sip::DigestAuthParams;
@ -46,11 +47,15 @@ pub struct StaticBackend {
impl StaticBackend {
/// Load the dialplan from a TOML file. `bot_token` comes from the environment.
pub fn load(path: &Path, bot_token: String) -> anyhow::Result<Self> {
let content = std::fs::read_to_string(path)
.map_err(|e| anyhow::anyhow!("Failed to read {}: {}", path.display(), e))?;
let dialplan: Dialplan = toml::from_str(&content)
.map_err(|e| anyhow::anyhow!("Failed to parse {}: {}", path.display(), e))?;
pub fn load(path: &Path, bot_token: String) -> Result<Self, ConfigError> {
let content = std::fs::read_to_string(path).map_err(|source| ConfigError::Read {
path: path.to_path_buf(),
source,
})?;
let dialplan: Dialplan = toml::from_str(&content).map_err(|source| ConfigError::TomlParse {
path: path.to_path_buf(),
source,
})?;
info!(
"Loaded dialplan from {} ({} extensions)",

View file

@ -169,6 +169,18 @@ impl Registrar {
}
}
/// Start the periodic cleanup task
pub fn spawn_cleanup_task(registrar: Arc<Registrar>) {
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(30));
loop {
interval.tick().await;
registrar.remove_expired();
debug!("Registrar cleanup complete");
}
});
}
#[cfg(test)]
mod tests {
use super::*;
@ -313,15 +325,3 @@ mod tests {
assert_eq!(contacts[0].0, "sip:charlie@5.6.7.8");
}
}
/// Start the periodic cleanup task
pub fn spawn_cleanup_task(registrar: Arc<Registrar>) {
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(30));
loop {
interval.tick().await;
registrar.remove_expired();
debug!("Registrar cleanup complete");
}
});
}

View file

@ -8,16 +8,51 @@
mod streaming;
use crate::audio::{flac, wav};
use crate::audio::{AudioParseError, flac, wav};
use crate::config::{AppConfig, SoundEntry};
use crate::transport::sip::CONF_SAMPLE_RATE;
use anyhow::{Context, Result};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tracing::{debug, info, warn};
pub use streaming::StreamingPlayer;
pub use streaming::{StreamingError, StreamingPlayer};
/// Errors raised by sound loading and parsing.
#[derive(thiserror::Error, Debug)]
pub enum SoundError {
/// Failed to read the sound file from disk.
#[error("failed to read sound file {path:?}: {source}")]
Read {
path: PathBuf,
#[source]
source: std::io::Error,
},
/// Parse failure from the WAV/FLAC decoder.
#[error("failed to parse audio for {name}: {source}")]
Parse {
name: String,
#[source]
source: AudioParseError,
},
/// Sound's sample rate didn't match the bridge's configured rate.
#[error("sound {name} has wrong sample rate: {got} Hz (expected {expected} Hz)")]
WrongSampleRate {
name: String,
got: u32,
expected: u32,
},
/// File header doesn't match any supported format (WAV / FLAC).
#[error("unknown audio format for {name}: header bytes {header:02x?}")]
UnknownFormat { name: String, header: Vec<u8> },
/// Streaming player setup failure.
#[error(transparent)]
Streaming(#[from] StreamingError),
}
/// A preloaded sound ready for immediate playback
#[derive(Debug, Clone)]
@ -49,7 +84,7 @@ pub struct SoundManager {
impl SoundManager {
/// Create a new SoundManager and load sounds from config
pub fn new(sounds_dir: PathBuf) -> Result<Self> {
pub fn new(sounds_dir: PathBuf) -> Result<Self, SoundError> {
let config = AppConfig::global();
let mut manager = Self {
preloaded: HashMap::new(),
@ -63,7 +98,7 @@ impl SoundManager {
}
/// Load all sounds from config entries
fn load_sounds(&mut self, entries: &HashMap<String, SoundEntry>) -> Result<()> {
fn load_sounds(&mut self, entries: &HashMap<String, SoundEntry>) -> Result<(), SoundError> {
let mut preloaded_count = 0;
let mut streaming_count = 0;
let mut virtual_count = 0;
@ -135,9 +170,15 @@ impl SoundManager {
}
/// Load a preloaded sound from a file
fn load_preloaded_sound(&self, path: &Path, name: &str) -> Result<PreloadedSound> {
let data = std::fs::read(path)
.with_context(|| format!("Failed to read sound file: {}", path.display()))?;
fn load_preloaded_sound(
&self,
path: &Path,
name: &str,
) -> Result<PreloadedSound, SoundError> {
let data = std::fs::read(path).map_err(|source| SoundError::Read {
path: path.to_path_buf(),
source,
})?;
let samples = self.parse_audio(&data, name)?;
@ -149,21 +190,22 @@ impl SoundManager {
})
}
/// Parse audio data (auto-detect WAV or FLAC format)
/// Expects 16kHz mono - panics if wrong sample rate
fn parse_audio(&self, data: &[u8], name: &str) -> Result<Vec<i16>> {
/// Parse audio data (auto-detect WAV or FLAC format).
/// Expects 16kHz mono — returns `WrongSampleRate` otherwise.
fn parse_audio(&self, data: &[u8], name: &str) -> Result<Vec<i16>, SoundError> {
// Check for FLAC magic number: "fLaC"
if data.len() >= 4 && &data[0..4] == b"fLaC" {
debug!("Detected FLAC format for '{}'", name);
let (samples, rate) = flac::parse_flac(data)
.with_context(|| format!("Failed to parse FLAC for sound '{}'", name))?;
let (samples, rate) = flac::parse_flac(data).map_err(|source| SoundError::Parse {
name: name.to_string(),
source,
})?;
if rate != CONF_SAMPLE_RATE {
anyhow::bail!(
"Sound '{}' has wrong sample rate: {} Hz (expected {} Hz). Pre-resample the file.",
name,
rate,
CONF_SAMPLE_RATE
);
return Err(SoundError::WrongSampleRate {
name: name.to_string(),
got: rate,
expected: CONF_SAMPLE_RATE,
});
}
return Ok(samples);
}
@ -171,24 +213,24 @@ impl SoundManager {
// Check for WAV magic number: "RIFF"
if data.len() >= 4 && &data[0..4] == b"RIFF" {
debug!("Detected WAV format for '{}'", name);
let (samples, rate) = wav::parse_wav(data)
.with_context(|| format!("Failed to parse WAV for sound '{}'", name))?;
let (samples, rate) = wav::parse_wav(data).map_err(|source| SoundError::Parse {
name: name.to_string(),
source,
})?;
if rate != CONF_SAMPLE_RATE {
anyhow::bail!(
"Sound '{}' has wrong sample rate: {} Hz (expected {} Hz). Pre-resample the file.",
name,
rate,
CONF_SAMPLE_RATE
);
return Err(SoundError::WrongSampleRate {
name: name.to_string(),
got: rate,
expected: CONF_SAMPLE_RATE,
});
}
return Ok(samples);
}
anyhow::bail!(
"Unknown audio format for '{}': header bytes {:?}",
name,
&data[..4.min(data.len())]
)
Err(SoundError::UnknownFormat {
name: name.to_string(),
header: data[..4.min(data.len())].to_vec(),
})
}
/// Get a preloaded sound by name
@ -235,6 +277,6 @@ impl SoundManager {
}
/// Create an Arc-wrapped SoundManager for sharing across async tasks
pub fn create_sound_manager(sounds_dir: PathBuf) -> Result<Arc<SoundManager>> {
pub fn create_sound_manager(sounds_dir: PathBuf) -> Result<Arc<SoundManager>, SoundError> {
Ok(Arc::new(SoundManager::new(sounds_dir)?))
}

View file

@ -6,10 +6,9 @@
//! Uses Symphonia for FLAC decoding (pure Rust).
use crate::transport::sip::CONF_SAMPLE_RATE;
use anyhow::{Context, Result};
use std::collections::VecDeque;
use std::fs::File;
use std::path::Path;
use std::path::{Path, PathBuf};
use symphonia::core::audio::{AudioBufferRef, Signal};
use symphonia::core::codecs::{CODEC_TYPE_NULL, DecoderOptions};
use symphonia::core::formats::FormatOptions;
@ -17,6 +16,40 @@ use symphonia::core::io::MediaSourceStream;
use symphonia::core::meta::MetadataOptions;
use symphonia::core::probe::Hint;
/// Errors raised by the streaming player.
#[derive(thiserror::Error, Debug)]
pub enum StreamingError {
#[error("failed to open streaming file {path:?}: {source}")]
Open {
path: PathBuf,
#[source]
source: std::io::Error,
},
#[error("failed to probe streaming format {path:?}: {source}")]
Probe {
path: PathBuf,
#[source]
source: symphonia::core::errors::Error,
},
#[error("streaming file {path:?} has no audio track")]
NoTrack { path: PathBuf },
#[error("streaming file {path:?} has no sample rate")]
NoSampleRate { path: PathBuf },
#[error("streaming file {path:?} has wrong sample rate: {got} Hz (expected {expected} Hz)")]
WrongSampleRate {
path: PathBuf,
got: u32,
expected: u32,
},
#[error("failed to create streaming decoder: {0}")]
Decoder(#[source] symphonia::core::errors::Error),
}
/// Streaming player for large audio files
///
/// Reads FLAC frames on-demand to avoid loading entire file into memory.
@ -39,9 +72,11 @@ pub struct StreamingPlayer {
impl StreamingPlayer {
/// Create a new streaming player for a FLAC file
pub fn new(path: &Path) -> Result<Self> {
let file = File::open(path)
.with_context(|| format!("Failed to open streaming file: {}", path.display()))?;
pub fn new(path: &Path) -> Result<Self, StreamingError> {
let file = File::open(path).map_err(|source| StreamingError::Open {
path: path.to_path_buf(),
source,
})?;
let mss = MediaSourceStream::new(Box::new(file), Default::default());
@ -57,7 +92,10 @@ impl StreamingPlayer {
&FormatOptions::default(),
&MetadataOptions::default(),
)
.with_context(|| format!("Failed to probe format: {}", path.display()))?;
.map_err(|source| StreamingError::Probe {
path: path.to_path_buf(),
source,
})?;
let format = probed.format;
@ -66,7 +104,9 @@ impl StreamingPlayer {
.tracks()
.iter()
.find(|t| t.codec_params.codec != CODEC_TYPE_NULL)
.ok_or_else(|| anyhow::anyhow!("No audio track found in {}", path.display()))?;
.ok_or_else(|| StreamingError::NoTrack {
path: path.to_path_buf(),
})?;
let track_id = track.id;
@ -74,15 +114,16 @@ impl StreamingPlayer {
let sample_rate = track
.codec_params
.sample_rate
.ok_or_else(|| anyhow::anyhow!("No sample rate in track"))?;
.ok_or_else(|| StreamingError::NoSampleRate {
path: path.to_path_buf(),
})?;
if sample_rate != CONF_SAMPLE_RATE {
anyhow::bail!(
"Streaming file {} has wrong sample rate: {} Hz (expected {} Hz)",
path.display(),
sample_rate,
CONF_SAMPLE_RATE
);
return Err(StreamingError::WrongSampleRate {
path: path.to_path_buf(),
got: sample_rate,
expected: CONF_SAMPLE_RATE,
});
}
let channels = track.codec_params.channels.map(|c| c.count()).unwrap_or(1);
@ -99,7 +140,7 @@ impl StreamingPlayer {
let decoder = symphonia::default::get_codecs()
.make(&track.codec_params, &DecoderOptions::default())
.with_context(|| "Failed to create decoder")?;
.map_err(StreamingError::Decoder)?;
Ok(Self {
format,

View file

@ -2,7 +2,6 @@ mod voice;
use crate::audio::simd;
use crate::services::snowflake::Snowflake;
use anyhow::Result;
use audioadapter::Adapter;
use audioadapter_buffers::direct::SequentialSliceOfVecs;
use crossbeam_channel::Sender;
@ -29,6 +28,25 @@ use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::oneshot;
use tracing::{debug, error, info, trace, warn};
/// Errors raised by the Discord voice transport.
#[derive(thiserror::Error, Debug)]
pub enum DiscordError {
/// Discord bot token rejected by serenity (malformed, missing parts, etc.).
#[error("invalid Discord bot token: {0}")]
InvalidToken(String),
/// Serenity / songbird error (gateway, REST, voice connect).
#[error(transparent)]
Serenity(#[from] serenity::Error),
/// Songbird voice join failed after the configured number of retries.
#[error("failed to join voice channel after {attempts} attempts: {last_error}")]
JoinFailed {
attempts: u32,
last_error: String,
},
}
// Direct audio path: SIP audio thread → Discord
// Uses lock-free ring buffer for real-time audio streaming
@ -161,16 +179,19 @@ fn create_resampler() -> Async<f64> {
window: WindowFunction::BlackmanHarris2,
};
// 16kHz to 48kHz = ratio of 3.0, mono input, 320 samples per chunk (20ms at 16kHz)
// 16kHz to 48kHz = ratio of 3.0, mono input, 320 samples per chunk (20ms at 16kHz).
// Params are static and known-good; if rubato rejects them it's a programmer
// error (e.g. ratio constants changed inconsistently) and the program can't
// run anyway — panic explicitly with the rubato error for diagnostics.
Async::new_sinc(
48000.0 / 16000.0, // resample ratio (3.0x)
1.1, // max relative ratio (allow slight variation)
48000.0 / 16000.0,
1.1,
&params,
320, // chunk size (samples per frame at 16kHz)
1, // mono channel
FixedAsync::Input, // fixed input size
320,
1,
FixedAsync::Input,
)
.expect("Failed to create resampler")
.unwrap_or_else(|e| panic!("create_resampler: rubato rejected static params: {e}"))
}
/// RAII guard for a registered Discord audio sender.
@ -481,7 +502,7 @@ impl SharedDiscordClient {
/// This opens a single gateway WebSocket connection that stays alive for
/// the bridge's lifetime. The returned Songbird manager is used by all
/// voice connections to join/leave channels.
pub async fn new(bot_token: &str) -> Result<Arc<Self>> {
pub async fn new(bot_token: &str) -> Result<Arc<Self>, DiscordError> {
info!("Creating shared Discord client (single gateway connection)");
let intents = GatewayIntents::GUILDS | GatewayIntents::GUILD_VOICE_STATES;
@ -494,7 +515,7 @@ impl SharedDiscordClient {
let token: Token = bot_token
.parse()
.map_err(|e| anyhow::anyhow!("Invalid bot token: {}", e))?;
.map_err(|e| DiscordError::InvalidToken(format!("{e}")))?;
let mut client = Client::builder(token, intents)
.event_handler(Arc::new(SharedClientEventHandler { ready_tx }))
@ -607,7 +628,7 @@ impl DiscordVoiceConnection {
channel_id: Snowflake,
event_tx: Sender<DiscordEvent>,
health_check_notify: Arc<tokio::sync::Notify>,
) -> Result<Self> {
) -> Result<Self, DiscordError> {
info!(
"Joining voice channel {} in guild {} for bridge {} (using shared client)",
channel_id, guild_id, bridge_id
@ -654,10 +675,12 @@ impl DiscordVoiceConnection {
// This allows the pjsua audio thread to bypass tokio entirely
let audio_sender = RegisteredAudioSender::new(channel_id, producer);
// Create shared timestamp for health tracking
// Create shared timestamp for health tracking. The system
// clock would have to be set before 1970 to produce Err
// here; default to 0 in that case rather than panic.
let now_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.unwrap_or_default()
.as_millis() as u64;
let last_audio_received = Arc::new(AtomicU64::new(now_ms));
@ -774,11 +797,10 @@ impl DiscordVoiceConnection {
}
// All retries failed
anyhow::bail!(
"Failed to join voice channel after {} attempts: {:?}",
max_retries,
last_error
)
Err(DiscordError::JoinFailed {
attempts: max_retries,
last_error: format!("{:?}", last_error),
})
}
/// Send audio to the Discord voice channel
@ -799,7 +821,7 @@ impl DiscordVoiceConnection {
let now_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.unwrap_or_default()
.as_millis() as u64;
let last_recv = self.inner.last_audio_received.load(Ordering::Relaxed);
@ -1070,7 +1092,7 @@ impl VoiceEventHandler for VoiceReceiver {
// Update health tracking timestamp - VoiceTick arriving means Discord is alive
let now_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.unwrap_or_default()
.as_millis() as u64;
self.last_audio_received.store(now_ms, Ordering::Relaxed);

View file

@ -181,8 +181,21 @@ impl Read for StreamingAudioSource {
return Ok(buf.len());
}
// Read samples from ring buffer directly into output buffer
let chunk = consumer.read_chunk(samples_to_read).unwrap();
// Read samples from ring buffer directly into output buffer.
// `samples_to_read <= samples_available` by construction, so this
// should never error; if rtrb's state ever desyncs, log + silence
// rather than panic on the audio thread.
let chunk = match consumer.read_chunk(samples_to_read) {
Ok(c) => c,
Err(e) => {
tracing::warn!(
"StreamingAudioSource: rtrb read_chunk unexpectedly failed ({:?}), filling with silence",
e
);
buf.fill(0);
return Ok(buf.len());
}
};
let (first, second) = chunk.as_slices();
// Bulk copy f32 samples as raw bytes (memcpy instead of per-sample loop)

View file

@ -49,10 +49,9 @@ use super::ffi::utils::{extract_sip_username, pj_str_to_string};
use dashmap::DashMap;
use parking_lot::Mutex;
use pjsua::*;
use std::ffi::CString;
use std::mem::MaybeUninit;
use std::net::IpAddr;
use std::os::raw::{c_char, c_int};
use std::os::raw::c_int;
use std::ptr;
/// Global sender for outbound call events (set during initialization)
@ -112,9 +111,7 @@ pub unsafe fn extract_user_agent(rdata: *const pjsip_rx_data) -> Option<String>
}
// Find User-Agent header by name
let hdr_name = CString::new("User-Agent").ok()?;
let name = pj_str(hdr_name.as_ptr() as *mut c_char);
let name = super::ffi::pj_str::pj_str_from_cstr(c"User-Agent");
let hdr = pjsip_msg_find_hdr_by_name(msg, &name, ptr::null_mut());
if hdr.is_null() {
return None;
@ -236,58 +233,16 @@ pub unsafe fn extract_digest_auth_from_rdata(
/// Send 401 Unauthorized response with WWW-Authenticate header
pub unsafe fn send_401_challenge(call_id: CallId, www_auth: &str) {
unsafe {
// Create the WWW-Authenticate header
let hdr_name = CString::new("WWW-Authenticate").unwrap();
let hdr_value = CString::new(www_auth).unwrap();
// Create msg_data with the WWW-Authenticate header
let mut msg_data = MaybeUninit::<pjsua_msg_data>::uninit();
pjsua_msg_data_init(msg_data.as_mut_ptr());
let msg_data_ptr = msg_data.assume_init_mut();
// Create a pool for the header
let pool = pjsua_pool_create(c"auth".as_ptr(), 512, 512);
if pool.is_null() {
tracing::error!("Failed to create pool for 401 challenge");
pjsua_call_hangup(*call_id, 500, ptr::null(), ptr::null());
return;
}
// Create the header
let name = pj_str(hdr_name.as_ptr() as *mut c_char);
let value = pj_str(hdr_value.as_ptr() as *mut c_char);
let hdr = pjsip_generic_string_hdr_create(pool, &name, &value);
if !hdr.is_null() {
// Add header to the list using pj_list_insert_before (insert at end of list)
pj_list_insert_before(
&mut msg_data_ptr.hdr_list as *mut _ as *mut pj_list_type,
hdr as *mut pj_list_type,
);
}
// Send 401 response - this will cause pjsua to send the response and then
// the client should retry with Authorization header
let reason = CString::new("Unauthorized").unwrap();
let reason_pj = pj_str(reason.as_ptr() as *mut c_char);
let status = pjsua_call_answer(*call_id, 401, &reason_pj, msg_data_ptr);
if status != pj_constants__PJ_SUCCESS as i32 {
tracing::warn!(
"Failed to send 401 challenge for call {}: {}",
call_id,
status
);
// Hangup if we can't send challenge
if let Err(e) = super::ffi::pj_str::answer_call_with_headers(
*call_id,
401,
c"Unauthorized",
c"auth",
&[(c"WWW-Authenticate", www_auth)],
) {
tracing::warn!("Failed to send 401 challenge for call {}: {}", call_id, e);
pjsua_call_hangup(*call_id, 500, ptr::null(), ptr::null());
}
// DO NOT release the pool here - PJSUA may still need the header data
// after pjsua_call_answer returns. The pool will be cleaned up when
// pjsua is destroyed. This leaks ~512 bytes per 401 challenge but
// prevents use-after-free crashes.
// TODO: Track pools per-call and release them in on_call_state when call ends
}
}
@ -329,59 +284,26 @@ pub unsafe fn send_302_redirect(call_id: CallId, target_domain: &str, extension:
// Create the Contact header: sip:extension@target_domain
let contact_uri = format!("sip:{}@{}", extension, target_domain);
let hdr_name = CString::new("Contact").unwrap();
let hdr_value = CString::new(contact_uri).unwrap();
// Create msg_data with the Contact header
let mut msg_data = MaybeUninit::<pjsua_msg_data>::uninit();
pjsua_msg_data_init(msg_data.as_mut_ptr());
let msg_data_ptr = msg_data.assume_init_mut();
// Create a pool for the header
let pool = pjsua_pool_create(c"redirect".as_ptr(), 512, 512);
if pool.is_null() {
tracing::error!("Failed to create pool for 302 redirect");
pjsua_call_hangup(*call_id, 500, ptr::null(), ptr::null());
return;
match super::ffi::pj_str::answer_call_with_headers(
*call_id,
302,
c"Moved Temporarily",
c"redirect",
&[(c"Contact", contact_uri.as_str())],
) {
Err(e) => {
tracing::warn!("Failed to send 302 redirect for call {}: {}", call_id, e);
pjsua_call_hangup(*call_id, 500, ptr::null(), ptr::null());
}
Ok(()) => {
tracing::info!(
"Sent 302 redirect for call {} to {}",
call_id,
target_domain
);
}
}
// Create the header
let name = pj_str(hdr_name.as_ptr() as *mut c_char);
let value = pj_str(hdr_value.as_ptr() as *mut c_char);
let hdr = pjsip_generic_string_hdr_create(pool, &name, &value);
if !hdr.is_null() {
// Add header to the list using pj_list_insert_before (insert at end of list)
pj_list_insert_before(
&mut msg_data_ptr.hdr_list as *mut _ as *mut pj_list_type,
hdr as *mut pj_list_type,
);
}
// Send 302 response
let reason = CString::new("Moved Temporarily").unwrap();
let reason_pj = pj_str(reason.as_ptr() as *mut c_char);
let status = pjsua_call_answer(*call_id, 302, &reason_pj, msg_data_ptr);
if status != pj_constants__PJ_SUCCESS as i32 {
tracing::warn!(
"Failed to send 302 redirect for call {}: {}",
call_id,
status
);
// Hangup if we can't send redirect
pjsua_call_hangup(*call_id, 500, ptr::null(), ptr::null());
} else {
tracing::info!(
"Sent 302 redirect for call {} to {}",
call_id,
target_domain
);
}
// DO NOT release the pool here - PJSUA may still need the header data
// after pjsua_call_answer returns. Same issue as send_401_challenge.
}
}

View file

@ -0,0 +1,162 @@
//! Typed error types for the SIP transport layer.
//!
//! Three sibling enums cover the three phases of the SIP path:
//!
//! - [`SipInitError`] — startup: pjsua create/init/start, transports, codecs,
//! account registration. One-shot failures that take down the process.
//! - [`SipResponseError`] — building or sending an individual SIP response
//! (401/302/200 etc.) from inside an FFI callback. Per-call, recoverable
//! in the sense that we log and continue.
//! - [`SipAudioError`] — runtime audio plumbing: hooking players into a
//! call's conference port. Surfaces when media isn't ready yet, when a
//! port name has an interior NUL, or when pjsua refuses a connect.
//!
//! [`SipError`] is the umbrella for callers that want to handle any of them
//! uniformly. Conversion is via `#[from]`, so `?` propagation works through
//! the hierarchy without explicit `map_err`.
use std::ffi::NulError;
/// Umbrella error for everything the SIP transport layer can return.
#[derive(thiserror::Error, Debug)]
pub enum SipError {
#[error(transparent)]
Init(#[from] SipInitError),
#[error(transparent)]
Response(#[from] SipResponseError),
#[error(transparent)]
Audio(#[from] SipAudioError),
#[error(transparent)]
Call(#[from] SipCallError),
}
/// Errors raised by outbound-call setup (`make_outbound_call`).
#[derive(thiserror::Error, Debug)]
pub enum SipCallError {
/// A URI / display-name string couldn't be converted to CString because
/// of an interior NUL byte.
#[error("invalid {field} for outbound call: {source}")]
InvalidString {
field: &'static str,
#[source]
source: NulError,
},
/// `pjsua_call_make_call` returned non-success.
#[error("pjsua_call_make_call failed (status {0})")]
MakeCall(i32),
}
/// Errors raised by `init_pjsua`, `create_tls_transport`, `reload_tls_transport`,
/// `process_pjsua_events`, and friends.
#[derive(thiserror::Error, Debug)]
pub enum SipInitError {
/// A pjsua API returned a non-success status code. `operation` names the
/// specific call (`"pjsua_create"`, `"pjsua_init"`, `"pjsua_start"`,
/// `"pjsua_acc_add"`, `"pjsua_set_null_snd_dev"`, `"pjsua_handle_events"`,
/// etc.).
#[error("pjsua {operation} failed (status {status})")]
Pjsua {
operation: &'static str,
status: i32,
},
/// `pjsua_transport_create` failed for `kind` ("UDP", "TCP", or "TLS").
#[error("transport create ({kind}) failed (status {status})")]
TransportCreate {
kind: &'static str,
status: i32,
},
/// A configuration string (host name, URI, etc.) couldn't be converted
/// to a `CString` because of an interior NUL byte.
#[error("invalid {field} string for FFI: {source}")]
InvalidString {
field: &'static str,
#[source]
source: NulError,
},
/// A `Path` to be passed into pjsua wasn't valid UTF-8.
#[error("{field} path is not valid UTF-8")]
NonUtf8Path { field: &'static str },
}
/// Errors raised by audio-port plumbing (`play_audio_to_call_direct`,
/// `start_loop`, `start_test_tone_to_call`, etc.) and the helpers in
/// `frame_utils`.
#[derive(thiserror::Error, Debug)]
pub enum SipAudioError {
/// The call doesn't have a conference port yet — media negotiation is
/// still in progress, or the call has just ended. Caller can retry or
/// drop the audio.
#[error("no conference port for call {call_id} (media not ready yet)")]
NoConfPort { call_id: super::ffi::types::CallId },
/// A port name (used to identify the player in pjsua's mixer) contains
/// an interior NUL.
#[error("invalid port name: {0}")]
InvalidPortName(#[from] NulError),
/// `pjsua_conf_add_port`, `pjsua_conf_connect`, etc. returned non-success.
#[error("pjsua conf {operation} failed (status {status})")]
Pjsua {
operation: &'static str,
status: i32,
},
/// Frame size / port count mismatch between the audio source and the
/// pjsua port.
#[error("frame mismatch: {0}")]
FrameMismatch(String),
/// Failure setting up a streaming player (file read, decoder, etc.).
#[error(transparent)]
Streaming(#[from] crate::services::sound::StreamingError),
}
/// Errors that can occur while building or sending a SIP response.
///
/// Surfaces failures from the pjsua/pjsip FFI surface — CString conversion,
/// pool allocation, header creation, and the final stateless / transactional
/// send. Variants stay coarse-grained because the typical caller is a pjsip
/// callback that can only log and continue.
#[derive(thiserror::Error, Debug)]
pub enum SipResponseError {
/// A runtime string contained an interior NUL byte and could not be
/// converted to `CString`.
#[error("CString conversion failed (interior NUL)")]
CStringNul(#[from] NulError),
/// `pjsua_pool_create` returned null — out of memory or pjsua not
/// initialised.
#[error("pjsua pool allocation failed")]
PoolAlloc,
/// `pjsip_generic_string_hdr_create` returned null.
#[error("pjsip header creation failed")]
HeaderCreate,
/// `pjsua_get_pjsip_endpt` returned null — pjsua not initialised.
#[error("pjsip endpoint is null (pjsua not initialised)")]
EndpointNull,
/// `pjsip_endpt_respond_stateless` returned a non-success pj status code.
#[error("pjsip stateless send failed (status {0})")]
StatelessSend(i32),
/// `pjsip_tsx_create_uas2` returned a non-success pj status code.
#[error("pjsip UAS transaction creation failed (status {0})")]
TsxCreate(i32),
/// `pjsip_endpt_create_response` returned a non-success pj status code.
#[error("pjsip response build failed (status {0})")]
ResponseBuild(i32),
/// `pjsua_call_answer` returned a non-success pj status code.
#[error("pjsua_call_answer failed (status {0})")]
CallAnswer(i32),
}

View file

@ -3,8 +3,8 @@
//! This module provides one-shot audio playback (e.g., join sounds) that
//! bypasses the channel buffer and plays directly to a specific call.
use crate::transport::sip::error::SipAudioError;
use super::types::*;
use anyhow::Result;
use parking_lot::Mutex;
use pjsua::*;
use std::collections::HashMap;
@ -86,7 +86,7 @@ pub unsafe extern "C" fn direct_player_on_destroy(this_port: *mut pjmedia_port)
///
/// This queues the operation to be executed by the audio thread to avoid
/// deadlocks with the audio thread's pjsua_conf_connect/disconnect calls.
pub fn play_audio_to_call_direct(call_id: CallId, samples: &[i16]) -> Result<()> {
pub fn play_audio_to_call_direct(call_id: CallId, samples: &[i16]) -> Result<(), SipAudioError> {
use super::types::{PendingPjsuaOp, queue_pjsua_op};
tracing::debug!(
@ -103,14 +103,17 @@ pub fn play_audio_to_call_direct(call_id: CallId, samples: &[i16]) -> Result<()>
/// Internal implementation of play_audio_to_call_direct
/// Called from the audio thread to actually create and connect the player
pub fn play_audio_to_call_direct_internal(call_id: CallId, samples: &[i16]) -> Result<()> {
pub fn play_audio_to_call_direct_internal(
call_id: CallId,
samples: &[i16],
) -> Result<(), SipAudioError> {
use super::frame_utils::{PortCallbacks, create_and_connect_port};
// Get call's conference port
let call_conf_port = CALL_CONF_PORTS
.get()
.and_then(|p| p.get(&call_id).map(|r| *r))
.ok_or_else(|| anyhow::anyhow!("No conf_port for call {}", call_id))?;
.ok_or(SipAudioError::NoConfPort { call_id })?;
// Store samples in the player state BEFORE creating port (get_frame needs them)
// We'll clean up if port creation fails

View file

@ -3,11 +3,11 @@
//! Provides common helpers for filling audio frames and a shared no-op
//! put_frame callback used by ports that only produce audio.
use crate::transport::sip::error::SipAudioError;
use super::types::{
CONF_CHANNELS, CONF_MASTER_PORT, CONF_SAMPLE_RATE, CallId, ConfPort, SAMPLES_PER_FRAME,
SendablePool,
};
use anyhow::Result;
use parking_lot::Mutex;
use pjsua::*;
use std::sync::OnceLock;
@ -120,7 +120,7 @@ pub unsafe fn create_and_connect_port(
signature: u32,
callbacks: PortCallbacks,
call_conf_port: ConfPort,
) -> Result<ConfPortGuard> {
) -> Result<ConfPortGuard, SipAudioError> {
// Get or create the memory pool
let pool = pool.get_or_init(|| {
let p = unsafe { pjsua_pool_create(pool_name.as_ptr() as *const _, 4096, 4096) };
@ -132,18 +132,16 @@ pub unsafe fn create_and_connect_port(
let port_size = std::mem::size_of::<pjmedia_port>();
let port = unsafe { pj_pool_alloc(pool_ptr, port_size) as *mut pjmedia_port };
if port.is_null() {
anyhow::bail!(
"Failed to allocate {} port for call {}",
name_prefix,
call_id
);
return Err(SipAudioError::FrameMismatch(format!(
"failed to allocate {} port for call {}",
name_prefix, call_id
)));
}
unsafe { std::ptr::write_bytes(port as *mut u8, 0, port_size) };
// Create port name
let port_name = format!("{}{}", name_prefix, call_id);
let port_name_cstr = std::ffi::CString::new(port_name)
.map_err(|e| anyhow::anyhow!("Invalid port name: {}", e))?;
let port_name_cstr = std::ffi::CString::new(port_name)?;
// Initialize port info
unsafe {
@ -167,21 +165,30 @@ pub unsafe fn create_and_connect_port(
let mut player_slot: i32 = 0;
let status = unsafe { pjsua_conf_add_port(pool_ptr, port, &mut player_slot) };
if status != pj_constants__PJ_SUCCESS as i32 {
anyhow::bail!("Failed to add {} port to conf: {}", name_prefix, status);
return Err(SipAudioError::Pjsua {
operation: "pjsua_conf_add_port",
status,
});
}
// Connect player port to the target call's port
let conf = unsafe { get_conference_bridge() };
let Some(conf) = conf else {
unsafe { pjsua_conf_remove_port(player_slot) };
anyhow::bail!("Failed to get conference bridge for {} port", name_prefix);
return Err(SipAudioError::FrameMismatch(format!(
"no conference bridge available for {} port",
name_prefix
)));
};
let status =
unsafe { pjmedia_conf_connect_port(conf, player_slot as u32, *call_conf_port as u32, 0) };
if status != pj_constants__PJ_SUCCESS as i32 {
unsafe { pjsua_conf_remove_port(player_slot) };
anyhow::bail!("Failed to connect {} port to call: {}", name_prefix, status);
return Err(SipAudioError::Pjsua {
operation: "pjmedia_conf_connect_port",
status,
});
}
Ok(ConfPortGuard {

View file

@ -5,7 +5,7 @@
//! - TLS transport creation and hot-reload
//! - Shutdown and thread registration
use super::super::audio_thread::stop_audio_thread;
use crate::transport::sip::audio_thread::stop_audio_thread;
use std::fmt;
/// SIP invite session state (Rust wrapper for pjsip_inv_state)
@ -50,22 +50,23 @@ impl fmt::Display for InvState {
}
}
}
use super::super::callbacks::{
use crate::transport::sip::callbacks::{
on_call_media_state_cb, on_call_rx_reinvite_cb, on_call_state_cb, on_dtmf_digit_cb,
on_incoming_call_cb,
};
use super::super::nat::{
use crate::transport::sip::nat::{
on_rx_request_nat_fixup_cb, on_rx_response_nat_fixup_cb, on_tx_request_cb, on_tx_response_cb,
};
use super::super::register_handler::on_rx_request_cb;
use crate::transport::sip::error::SipInitError;
use crate::transport::sip::register_handler::on_rx_request_cb;
use super::pj_str;
use super::types::*;
use crate::config::{SipConfig, TlsConfig};
use anyhow::{Context, Result};
use ipnet::Ipv4Net;
use parking_lot::Mutex;
use pjsua::*;
use std::collections::BTreeMap;
use std::ffi::CString;
use std::ffi::{CStr, CString};
use std::mem::MaybeUninit;
use std::net::IpAddr;
use std::os::raw::{c_char, c_int};
@ -170,7 +171,7 @@ fn extract_packet_source(msg: &str) -> Option<&str> {
let space = rest.find(' ')?;
let after_transport = &rest[space + 1..];
let end = after_transport
.find(|c: char| c == ' ' || c == '\t')
.find([' ', '\t'])
.unwrap_or(after_transport.len());
Some(&after_transport[..end])
}
@ -312,7 +313,10 @@ pub fn set_callbacks(handlers: CallbackHandlers) {
}
/// Initialize pjsua with optional TLS support
pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result<()> {
pub fn init_pjsua(
config: &SipConfig,
tls_config: Option<&TlsConfig>,
) -> Result<(), SipInitError> {
// Initialize public host config for Contact header rewriting on outgoing responses.
// pjsua derives Contact from the TCP connection's local address (private IP), but
// external clients need the public hostname to route BYE back to us.
@ -352,7 +356,10 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result<
// Create pjsua instance
let status = pjsua_create();
if status != pj_constants__PJ_SUCCESS as i32 {
anyhow::bail!("Failed to create pjsua: {}", status);
return Err(SipInitError::Pjsua {
operation: "pjsua_create",
status,
});
}
// Disable automatic UDP->TCP switch for large SIP messages.
@ -421,7 +428,10 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result<
// Initialize pjsua
let status = pjsua_init(cfg_ptr, log_cfg_ptr, media_cfg_ptr);
if status != pj_constants__PJ_SUCCESS as i32 {
anyhow::bail!("Failed to init pjsua: {}", status);
return Err(SipInitError::Pjsua {
operation: "pjsua_init",
status,
});
}
// Create UDP transport
@ -432,7 +442,12 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result<
// Set public address if specified - keep CString alive until transport is created
let public_host_cstring = if !config.public_host.is_empty() {
let host = CString::new(config.public_host.as_str()).context("Invalid public host")?;
let host = CString::new(config.public_host.as_str()).map_err(|source| {
SipInitError::InvalidString {
field: "public_host",
source,
}
})?;
t_cfg_ptr.public_addr = pj_str(host.as_ptr() as *mut c_char);
Some(host)
} else {
@ -450,7 +465,10 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result<
drop(public_host_cstring);
if status != pj_constants__PJ_SUCCESS as i32 {
anyhow::bail!("Failed to create UDP transport: {}", status);
return Err(SipInitError::TransportCreate {
kind: "UDP",
status,
});
}
// Create TCP transport on the same port
@ -461,8 +479,12 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result<
// Set public address for TCP - keep CString alive
let tcp_public_host_cstring = if !config.public_host.is_empty() {
let host =
CString::new(config.public_host.as_str()).context("Invalid public host for TCP")?;
let host = CString::new(config.public_host.as_str()).map_err(|source| {
SipInitError::InvalidString {
field: "public_host (TCP)",
source,
}
})?;
tcp_cfg_ptr.public_addr = pj_str(host.as_ptr() as *mut c_char);
Some(host)
} else {
@ -479,7 +501,10 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result<
drop(tcp_public_host_cstring);
if status != pj_constants__PJ_SUCCESS as i32 {
anyhow::bail!("Failed to create TCP transport: {}", status);
return Err(SipInitError::TransportCreate {
kind: "TCP",
status,
});
}
tracing::info!("TCP transport created on port {}", config.port);
@ -494,7 +519,10 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result<
// Start pjsua
let status = pjsua_start();
if status != pj_constants__PJ_SUCCESS as i32 {
anyhow::bail!("Failed to start pjsua: {}", status);
return Err(SipInitError::Pjsua {
operation: "pjsua_start",
status,
});
}
// Configure codec priorities to keep INVITE SDP small.
@ -507,27 +535,27 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result<
// ordered by quality (highest priority = preferred in SDP negotiation).
{
// Disable all audio codecs first
let all = CString::new("*").unwrap();
pjsua_codec_set_priority(&pj_str(all.as_ptr() as *mut c_char), 0);
pjsua_codec_set_priority(&pj_str::pj_str_from_cstr(c"*"), 0);
// Re-enable desired codecs (highest priority = preferred in negotiation).
// NOTE: G722 is registered internally at 16000Hz in PJSIP despite the
// RFC 3551 SDP convention of advertising clock_rate=8000.
let codecs: &[(&str, u8)] = &[
("opus/48000", 255), // Best quality: adaptive, wideband/fullband
("G722/16000", 254), // Wideband 16kHz, widely supported
("AMR/8000", 252), // Adaptive narrowband
("PCMU/8000", 200), // G.711 mu-law, ubiquitous fallback
("PCMA/8000", 199), // G.711 A-law, ubiquitous fallback
("telephone-event", 200), // DTMF support (all sample rates)
// Codec IDs are static — use &CStr literals so neither CString allocation
// nor an NUL check is needed.
let codecs: &[(&CStr, u8)] = &[
(c"opus/48000", 255), // Best quality: adaptive, wideband/fullband
(c"G722/16000", 254), // Wideband 16kHz, widely supported
(c"AMR/8000", 252), // Adaptive narrowband
(c"PCMU/8000", 200), // G.711 mu-law, ubiquitous fallback
(c"PCMA/8000", 199), // G.711 A-law, ubiquitous fallback
(c"telephone-event", 200), // DTMF support (all sample rates)
];
for (name, priority) in codecs {
let codec_id = CString::new(*name).unwrap();
let status =
pjsua_codec_set_priority(&pj_str(codec_id.as_ptr() as *mut c_char), *priority);
pjsua_codec_set_priority(&pj_str::pj_str_from_cstr(name), *priority);
if status != pj_constants__PJ_SUCCESS as i32 {
tracing::warn!("Failed to set codec priority for {}: {}", name, status);
tracing::warn!("Failed to set codec priority for {:?}: {}", name, status);
}
}
@ -535,7 +563,7 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result<
"Codec priorities configured: {}",
codecs
.iter()
.map(|(n, p)| format!("{}={}", n, p))
.map(|(n, p)| format!("{}={}", n.to_string_lossy(), p))
.collect::<Vec<_>>()
.join(", ")
);
@ -579,7 +607,7 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result<
tracing::info!("Registered REGISTER handler module");
// Store the module pointer so register_handler can create
// UAS transactions for deferred REGISTER responses.
super::super::register_handler::set_register_module_ptr(&raw mut REGISTER_MODULE);
crate::transport::sip::register_handler::set_register_module_ptr(&raw mut REGISTER_MODULE);
}
} else {
tracing::warn!("Could not get PJSIP endpoint for module registration");
@ -633,7 +661,10 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result<
// This allows us to manually control audio I/O
let master_port = pjsua_set_no_snd_dev();
if master_port.is_null() {
anyhow::bail!("Failed to set null sound device");
return Err(SipInitError::Pjsua {
operation: "pjsua_set_no_snd_dev",
status: -1, // pjsua returned null pointer rather than a status code
});
}
// Verify the master port's actual sample rate
@ -682,8 +713,12 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result<
let acc_cfg_ptr = acc_cfg.assume_init_mut();
// Local account ID - keep CString alive until account is added
let local_uri = CString::new(format!("sip:sipcord@{}", config.public_host))
.context("Invalid local URI")?;
let local_uri = CString::new(format!("sip:sipcord@{}", config.public_host)).map_err(
|source| SipInitError::InvalidString {
field: "local account URI",
source,
},
)?;
acc_cfg_ptr.id = pj_str(local_uri.as_ptr() as *mut c_char);
// Enable incoming calls without registration
@ -710,7 +745,12 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result<
// Set public IP for RTP if configured - this is advertised in SDP c= line
// Without this, pjsua uses the local interface IP which won't work for NAT
let rtp_public_ip_cstring = if let Some(ref public_ip) = config.rtp_public_ip {
let ip_cstr = CString::new(public_ip.as_str()).context("Invalid RTP public IP")?;
let ip_cstr = CString::new(public_ip.as_str()).map_err(|source| {
SipInitError::InvalidString {
field: "rtp_public_ip",
source,
}
})?;
acc_cfg_ptr.rtp_cfg.public_addr = pj_str(ip_cstr.as_ptr() as *mut c_char);
tracing::info!(
"Account RTP config: port={}, port_range={} (ports {}-{}), public_addr={}",
@ -743,7 +783,10 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result<
drop(rtp_public_ip_cstring);
if status != pj_constants__PJ_SUCCESS as i32 {
anyhow::bail!("Failed to add account: {}", status);
return Err(SipInitError::Pjsua {
operation: "pjsua_acc_add",
status,
});
}
Ok(())
@ -752,7 +795,10 @@ pub fn init_pjsua(config: &SipConfig, tls_config: Option<&TlsConfig>) -> Result<
/// Create TLS transport for SIP-over-TLS
/// Returns Ok(true) if created, Ok(false) if skipped due to missing certs
fn create_tls_transport(tls_config: &TlsConfig, public_host: &str) -> Result<bool> {
fn create_tls_transport(
tls_config: &TlsConfig,
public_host: &str,
) -> Result<bool, SipInitError> {
// Check cert files exist before doing anything
let cert_path = tls_config.cert_path();
let key_path = tls_config.key_path();
@ -784,13 +830,29 @@ fn create_tls_transport(tls_config: &TlsConfig, public_host: &str) -> Result<boo
t_cfg_ptr.port = tls_config.port as u32;
// Set public address
let public_host_cstring = CString::new(public_host).context("Invalid public host")?;
let public_host_cstring =
CString::new(public_host).map_err(|source| SipInitError::InvalidString {
field: "TLS public_host",
source,
})?;
t_cfg_ptr.public_addr = pj_str(public_host_cstring.as_ptr() as *mut c_char);
let cert_path_str = cert_path.to_str().ok_or(SipInitError::NonUtf8Path {
field: "TLS cert",
})?;
let key_path_str = key_path.to_str().ok_or(SipInitError::NonUtf8Path {
field: "TLS key",
})?;
let cert_path_cstring =
CString::new(cert_path.to_str().unwrap()).context("Invalid cert path")?;
CString::new(cert_path_str).map_err(|source| SipInitError::InvalidString {
field: "TLS cert path",
source,
})?;
let key_path_cstring =
CString::new(key_path.to_str().unwrap()).context("Invalid key path")?;
CString::new(key_path_str).map_err(|source| SipInitError::InvalidString {
field: "TLS key path",
source,
})?;
// Set certificate and key
t_cfg_ptr.tls_setting.cert_file = pj_str(cert_path_cstring.as_ptr() as *mut c_char);
@ -813,7 +875,10 @@ fn create_tls_transport(tls_config: &TlsConfig, public_host: &str) -> Result<boo
drop(key_path_cstring);
if status != pj_constants__PJ_SUCCESS as i32 {
anyhow::bail!("Failed to create TLS transport: {}", status);
return Err(SipInitError::TransportCreate {
kind: "TLS",
status,
});
}
// Store transport ID for potential reload
@ -834,7 +899,10 @@ fn create_tls_transport(tls_config: &TlsConfig, public_host: &str) -> Result<boo
///
/// This should only be called when there are no active calls.
/// Returns Ok(true) if reload/create was successful, Ok(false) if skipped (certs missing or calls active).
pub fn reload_tls_transport(tls_config: &TlsConfig, public_host: &str) -> Result<bool> {
pub fn reload_tls_transport(
tls_config: &TlsConfig,
public_host: &str,
) -> Result<bool, SipInitError> {
// Check active calls - don't reload if calls are active
let active_calls = COUNTED_CALL_IDS
.get()
@ -899,7 +967,7 @@ pub fn active_media_call_count() -> usize {
}
/// Process pjsua events (call from event loop)
pub fn process_pjsua_events(timeout_ms: u32) -> Result<()> {
pub fn process_pjsua_events(timeout_ms: u32) -> Result<(), SipInitError> {
unsafe {
pj_thread_sleep(timeout_ms);
}
@ -968,10 +1036,7 @@ pub fn send_183_session_progress(call_id: CallId) {
);
}
// Create reason string
let reason = CString::new("Session Progress").unwrap();
let reason_pj = pj_str(reason.as_ptr() as *mut c_char);
let reason_pj = pj_str::pj_str_from_cstr(c"Session Progress");
let status = pjsua_call_answer(*call_id, 183, &reason_pj, ptr::null());
if status != pj_constants__PJ_SUCCESS as i32 {
tracing::warn!("Failed to send 183 for call {}: status={}", call_id, status);

View file

@ -4,7 +4,7 @@
//! Used for the "connecting" sound during call setup (183 Session Progress).
use super::types::*;
use anyhow::Result;
use crate::transport::sip::error::SipAudioError;
use parking_lot::Mutex;
use pjsua::*;
use std::collections::HashMap;
@ -118,7 +118,7 @@ pub unsafe extern "C" fn looping_player_on_destroy(this_port: *mut pjmedia_port)
///
/// Creates a pjmedia_port that loops the given samples and connects it to the call.
/// The loop continues until stop_loop is called.
pub fn start_loop(call_id: CallId, samples: Vec<i16>) -> Result<()> {
pub fn start_loop(call_id: CallId, samples: Vec<i16>) -> Result<(), SipAudioError> {
use super::frame_utils::{PortCallbacks, create_and_connect_port};
// Check if already looping for this call
@ -134,9 +134,7 @@ pub fn start_loop(call_id: CallId, samples: Vec<i16>) -> Result<()> {
let call_conf_port = CALL_CONF_PORTS
.get()
.and_then(|p| p.get(&call_id).map(|r| *r))
.ok_or_else(|| {
anyhow::anyhow!("No conf_port for call {} - media not ready yet", call_id)
})?;
.ok_or(SipAudioError::NoConfPort { call_id })?;
let guard = unsafe {
let callbacks = PortCallbacks {

View file

@ -19,6 +19,7 @@ pub(super) mod direct_player;
pub(crate) mod frame_utils;
pub(super) mod init;
pub(super) mod looping_player;
pub(super) mod pj_str;
pub(super) mod streaming_player;
pub(super) mod test_tone;
pub mod types;

View file

@ -0,0 +1,211 @@
//! Safe(r) helpers around the pjsua/pjsip C-string and header-building idioms
//! that recur across the SIP transport layer.
//!
//! Before this module existed, every callback that built a SIP header
//! re-implemented the same pattern:
//!
//! ```ignore
//! let name = CString::new("Contact").unwrap();
//! let value = CString::new(runtime_str).unwrap();
//! let name_pj = pj_str(name.as_ptr() as *mut c_char);
//! let value_pj = pj_str(value.as_ptr() as *mut c_char);
//! let hdr = pjsip_generic_string_hdr_create(pool, &name_pj, &value_pj);
//! // ...
//! ```
//!
//! That sprouted two unwraps per header (so any header value containing a NUL
//! byte from upstream data would panic), repeated lifetime traps, and zero
//! shared failure handling. The helpers in this module turn those calls into
//! a single fallible call returning [`SipResponseError`].
use crate::transport::sip::error::SipResponseError;
use pjsua::*;
use std::ffi::{CStr, CString};
use std::os::raw::c_char;
use std::ptr;
/// Convert a [`CStr`] (typically a `c"..."` literal) into a [`pj_str_t`].
///
/// Zero-cost — `pj_str` just wraps the pointer and length. The caller must
/// keep the `CStr` alive for the `pj_str_t`'s usage window. For `&'static`
/// literals (the common case) that's trivially satisfied.
#[inline]
pub unsafe fn pj_str_from_cstr(s: &CStr) -> pj_str_t {
unsafe { pj_str(s.as_ptr() as *mut c_char) }
}
// A `pj_str_owned(&str) -> Result<(CString, pj_str_t), _>` helper was considered
// but turned out unused: every runtime-string call site in this codebase ends
// up either inside `make_string_hdr` (which does the conversion internally) or
// in a function that already chains `CString::new(...).context("...")?` for a
// site-specific error message. Add it back if a true caller appears.
/// Initialise a `pjsip_hdr` as an empty list head (equivalent to the
/// `pj_list_init` C macro).
#[inline]
pub unsafe fn pj_list_init_hdr(hdr: *mut pjsip_hdr) {
unsafe {
(*hdr).next = hdr as *mut _;
(*hdr).prev = hdr as *mut _;
}
}
/// Create a generic string header in `pool`.
///
/// `name` is a static `CStr` (use `c"Contact"` etc); `value` is a runtime
/// string that gets converted to a `CString` and duplicated into the pool
/// by pjsip. The temporary `CString` is dropped before this returns;
/// `pjsip_generic_string_hdr_create` uses `pj_strdup` internally to copy
/// the bytes.
pub unsafe fn make_string_hdr(
pool: *mut pj_pool_t,
name: &CStr,
value: &str,
) -> Result<*mut pjsip_generic_string_hdr, SipResponseError> {
unsafe {
let value_c = CString::new(value)?;
let name_pj = pj_str_from_cstr(name);
let value_pj = pj_str(value_c.as_ptr() as *mut c_char);
let hdr = pjsip_generic_string_hdr_create(pool, &name_pj, &value_pj);
if hdr.is_null() {
return Err(SipResponseError::HeaderCreate);
}
Ok(hdr)
}
}
/// Append a generic string header onto the message buffer in `tdata`,
/// allocating from the tdata's own pool.
pub unsafe fn append_tdata_hdr(
tdata: *mut pjsip_tx_data,
name: &CStr,
value: &str,
) -> Result<(), SipResponseError> {
unsafe {
let hdr = make_string_hdr((*tdata).pool, name, value)?;
pj_list_insert_before(
&mut (*(*tdata).msg).hdr as *mut pjsip_hdr as *mut pj_list_type,
hdr as *mut pj_list_type,
);
Ok(())
}
}
/// Answer a pjsua call with N custom headers attached to the response.
///
/// Wraps the recurring `pjsua_msg_data_init` + pool + header build +
/// `pjsua_call_answer` dance used in 401 / 302 / 4xx code paths.
///
/// **The pool is intentionally NOT released.** pjsua may continue to reference
/// the header data after `pjsua_call_answer` returns; releasing the pool here
/// triggers use-after-free. Each call leaks ~512 bytes that's reclaimed when
/// pjsua shuts down. (Tracking pools per-call and releasing them on call-end
/// would be a cleaner fix; not in scope here.)
///
/// On error, the caller typically follows up with `pjsua_call_hangup` — this
/// helper does not hang up on its own so the caller can choose the strategy.
pub unsafe fn answer_call_with_headers(
call_id: i32,
status_code: u32,
reason: &CStr,
pool_name: &CStr,
headers: &[(&CStr, &str)],
) -> Result<(), SipResponseError> {
unsafe {
let mut msg_data = std::mem::MaybeUninit::<pjsua_msg_data>::uninit();
pjsua_msg_data_init(msg_data.as_mut_ptr());
let msg_data_ptr = msg_data.assume_init_mut();
let pool = pjsua_pool_create(pool_name.as_ptr(), 512, 512);
if pool.is_null() {
return Err(SipResponseError::PoolAlloc);
}
// Intentionally leaked — see doc comment above.
for (name, value) in headers {
let hdr = make_string_hdr(pool, name, value)?;
pj_list_insert_before(
&mut msg_data_ptr.hdr_list as *mut _ as *mut pj_list_type,
hdr as *mut pj_list_type,
);
}
let reason_pj = pj_str_from_cstr(reason);
let status = pjsua_call_answer(call_id, status_code, &reason_pj, msg_data_ptr);
if status != pj_constants__PJ_SUCCESS as i32 {
return Err(SipResponseError::CallAnswer(status));
}
Ok(())
}
}
/// Send a stateless SIP response with N string headers.
///
/// Wraps the recurring `pjsua_pool_create` → list-head alloc → header
/// build → `pjsip_endpt_respond_stateless` → `pj_pool_release` dance. Each
/// header in `headers` is a `(name, value)` pair where `name` is typically
/// a `c"..."` literal and `value` is any runtime string.
///
/// `reason` is the SIP reason phrase (e.g. `Some(c"Unauthorized")`) or
/// `None` to let pjsip pick the default for `status_code`.
pub unsafe fn respond_stateless_with_headers(
rdata: *mut pjsip_rx_data,
status_code: u16,
reason: Option<&CStr>,
headers: &[(&CStr, &str)],
) -> Result<(), SipResponseError> {
unsafe {
let endpt = pjsua_get_pjsip_endpt();
if endpt.is_null() {
return Err(SipResponseError::EndpointNull);
}
let pool = pjsua_pool_create(c"sip_resp".as_ptr(), 1024, 1024);
if pool.is_null() {
return Err(SipResponseError::PoolAlloc);
}
// Belt-and-braces: ensure the pool is released even if a step
// between here and the send returns Err via `?`.
let result =
(|| -> Result<i32, SipResponseError> {
let hdr_list =
pj_pool_alloc(pool, std::mem::size_of::<pjsip_hdr>()) as *mut pjsip_hdr;
if hdr_list.is_null() {
return Err(SipResponseError::PoolAlloc);
}
pj_list_init_hdr(hdr_list);
for (name, value) in headers {
let hdr = make_string_hdr(pool, name, value)?;
pj_list_insert_before(
hdr_list as *mut pj_list_type,
hdr as *mut pj_list_type,
);
}
let reason_pj = reason.map(|r| pj_str_from_cstr(r));
let reason_ptr = reason_pj
.as_ref()
.map(|r| r as *const pj_str_t)
.unwrap_or(ptr::null());
Ok(pjsip_endpt_respond_stateless(
endpt,
rdata,
status_code.into(),
reason_ptr,
hdr_list,
ptr::null(),
))
})();
pj_pool_release(pool);
match result {
Ok(status) if status == pj_constants__PJ_SUCCESS as i32 => Ok(()),
Ok(status) => Err(SipResponseError::StatelessSend(status)),
Err(e) => Err(e),
}
}
}

View file

@ -19,7 +19,7 @@
use super::types::*;
use crate::services::sound::StreamingPlayer;
use anyhow::Result;
use crate::transport::sip::error::SipAudioError;
use parking_lot::Mutex;
use pjsua::*;
use std::collections::HashMap;
@ -152,19 +152,16 @@ pub fn start_streaming_to_call(
call_id: CallId,
path: &Path,
hangup_on_complete: bool,
) -> Result<()> {
) -> Result<(), SipAudioError> {
use super::frame_utils::{PortCallbacks, create_and_connect_port};
// Create the streaming player
let player = StreamingPlayer::new(path)?;
// Get call's conference port
let call_conf_port = CALL_CONF_PORTS
.get()
.and_then(|p| p.get(&call_id).map(|r| *r))
.ok_or_else(|| {
anyhow::anyhow!("No conf_port for call {} - media not ready yet", call_id)
})?;
.ok_or(SipAudioError::NoConfPort { call_id })?;
let guard = unsafe {
let callbacks = PortCallbacks {

View file

@ -5,7 +5,7 @@
use super::streaming_player::STREAMING_PLAYER_POOL;
use super::types::*;
use anyhow::Result;
use crate::transport::sip::error::SipAudioError;
use parking_lot::Mutex;
use pjsua::*;
use std::collections::HashMap;
@ -129,16 +129,14 @@ pub unsafe extern "C" fn test_tone_on_destroy(this_port: *mut pjmedia_port) -> p
/// Start playing a 440Hz test tone to a call
///
/// The tone plays indefinitely until the caller hangs up. No automatic hangup.
pub fn start_test_tone_to_call(call_id: CallId) -> Result<()> {
pub fn start_test_tone_to_call(call_id: CallId) -> Result<(), SipAudioError> {
use super::frame_utils::{PortCallbacks, create_and_connect_port};
// Get call's conference port
let call_conf_port = CALL_CONF_PORTS
.get()
.and_then(|p| p.get(&call_id).map(|r| *r))
.ok_or_else(|| {
anyhow::anyhow!("No conf_port for call {} - media not ready yet", call_id)
})?;
.ok_or(SipAudioError::NoConfPort { call_id })?;
let guard = unsafe {
let callbacks = PortCallbacks {

View file

@ -327,7 +327,7 @@ pub static TLS_RELOAD_PENDING: AtomicBool = AtomicBool::new(false);
pub static CALL_RTP_ACTIVITY: OnceLock<Mutex<HashMap<CallId, (u64, Instant)>>> = OnceLock::new();
/// Event sender for timeout events (set during callback setup)
pub static TIMEOUT_EVENT_TX: OnceLock<Mutex<Option<Sender<super::super::SipEvent>>>> =
pub static TIMEOUT_EVENT_TX: OnceLock<Mutex<Option<Sender<crate::transport::sip::SipEvent>>>> =
OnceLock::new();
// Per-channel audio isolation statics

View file

@ -3,6 +3,7 @@ pub mod ffi;
mod audio_thread;
mod callbacks;
mod channel_audio;
pub mod error;
pub mod fork_group;
mod nat;
mod register_handler;
@ -24,7 +25,7 @@ pub use register_handler::{PendingRegisterTsx, set_register_event_sender, set_si
use crate::config::{SipConfig, TlsConfig};
use crate::transport::discord::send_audio_to_discord_direct;
use anyhow::Result;
use crate::transport::sip::error::{SipCallError, SipError, SipInitError};
use crossbeam_channel::{Receiver, Sender, bounded};
use dashmap::DashMap;
use parking_lot::RwLock;
@ -178,7 +179,7 @@ impl SipTransport {
}
/// Start the SIP transport
pub async fn run(&self) -> Result<()> {
pub async fn run(&self) -> Result<(), SipError> {
info!(
"Starting SIP server on {}:{}",
self.config.public_host, self.config.port
@ -206,7 +207,11 @@ impl SipTransport {
}
});
pjsua_handle.await?;
// JoinError -> log only; pjsua loop errors are already logged inside the
// spawned task.
if let Err(e) = pjsua_handle.await {
tracing::error!("pjsua event loop join error: {}", e);
}
Ok(())
}
}
@ -222,7 +227,7 @@ fn run_pjsua_loop(
event_tx: Sender<SipEvent>,
initialized: Arc<RwLock<bool>>,
command_rx: Receiver<SipCommand>,
) -> Result<()> {
) -> Result<(), SipInitError> {
// Initialize pjsua with optional TLS
init_pjsua(&config, tls_config.as_ref())?;
*initialized.write() = true;
@ -449,15 +454,27 @@ fn process_sip_command(cmd: SipCommand, calls: &Arc<DashMap<CallId, CallState>>)
}
if auth_ok {
use register_handler::append_tdata_hdr;
append_tdata_hdr(tdata, c"Expires", &pending.expires.to_string());
use self::ffi::pj_str::append_tdata_hdr;
if let Err(e) =
append_tdata_hdr(tdata, c"Expires", &pending.expires.to_string())
{
tracing::warn!(
"deferred REGISTER 200 OK: failed to append Expires header: {}",
e
);
}
// RFC 3261 §10.3: echo the client's binding back as Contact.
// Required for strict clients like 3CX to accept registration.
if let Some(ref uri) = pending.contact_uri {
append_tdata_hdr(
if let Some(ref uri) = pending.contact_uri
&& let Err(e) = append_tdata_hdr(
tdata,
c"Contact",
&format!("<{}>;expires={}", uri, pending.expires),
)
{
tracing::warn!(
"deferred REGISTER 200 OK: failed to append Contact header ({}); strict clients may reject",
e
);
}
} else {
@ -506,9 +523,16 @@ pub fn remove_outbound_tracking(call_id: CallId) -> Option<String> {
///
/// If `caller_display_name` is provided, it sets the From header display name
/// to show who initiated the call from Discord (e.g., "Discord: username").
fn make_outbound_call(sip_uri: &str, caller_display_name: Option<&str>) -> Result<CallId, String> {
fn make_outbound_call(
sip_uri: &str,
caller_display_name: Option<&str>,
) -> Result<CallId, SipCallError> {
unsafe {
let uri = std::ffi::CString::new(sip_uri).map_err(|e| e.to_string())?;
let uri =
std::ffi::CString::new(sip_uri).map_err(|source| SipCallError::InvalidString {
field: "sip_uri",
source,
})?;
let mut call_id: ::pjsua::pjsua_call_id = -1;
// Explicit call settings: audio only, no video, no T.140 text.
@ -551,7 +575,11 @@ fn make_outbound_call(sip_uri: &str, caller_display_name: Option<&str>) -> Resul
.take(64)
.collect();
let from_uri = format!("\"{}\" <{}>", sanitized, acc_uri);
from_uri_cstring = std::ffi::CString::new(from_uri).map_err(|e| e.to_string())?;
from_uri_cstring =
std::ffi::CString::new(from_uri).map_err(|source| SipCallError::InvalidString {
field: "caller_display_name",
source,
})?;
msg_data_ptr.local_uri =
::pjsua::pj_str(from_uri_cstring.as_ptr() as *mut std::os::raw::c_char);
}
@ -566,7 +594,7 @@ fn make_outbound_call(sip_uri: &str, caller_display_name: Option<&str>) -> Resul
);
if status != ::pjsua::pj_constants__PJ_SUCCESS as i32 {
return Err(format!("pjsua_call_make_call failed: {}", status));
return Err(SipCallError::MakeCall(status));
}
Ok(CallId::new(call_id))

View file

@ -7,12 +7,13 @@
use super::callbacks::{
extract_digest_auth_from_rdata, extract_source_ip, extract_user_agent, is_sipvicious_scanner,
};
use super::error::SipResponseError;
use super::ffi::pj_str::respond_stateless_with_headers;
use super::ffi::types::*;
use super::ffi::utils::pj_str_to_string;
use pjsua::*;
use std::ffi::{CStr, CString};
use std::ffi::CStr;
use std::net::SocketAddr;
use std::os::raw::c_char;
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};
@ -74,69 +75,19 @@ pub fn set_register_module_ptr(ptr: *mut pjsip_module) {
// Helpers
/// Initialize a pjsip_hdr as a list head (equivalent to pj_list_init C macro).
#[inline]
unsafe fn pj_list_init_hdr(hdr: *mut pjsip_hdr) {
/// Send a stateless SIP response with a status code and reason phrase but no
/// extra headers. Logs (and otherwise swallows) any pjsip failure — these
/// responses are best-effort from inside an FFI callback.
unsafe fn send_simple_response(rdata: *mut pjsip_rx_data, status_code: u16, reason: &CStr) {
unsafe {
(*hdr).next = hdr as *mut _;
(*hdr).prev = hdr as *mut _;
}
}
/// Create a generic string header in `pool`. Returns null on failure (alloc or
/// interior-NUL in `value`). pjsip duplicates name/value into `pool`, so the
/// caller's CStrings can be dropped immediately after this returns.
#[inline]
unsafe fn make_string_hdr(
pool: *mut pj_pool_t,
name: &CStr,
value: &str,
) -> *mut pjsip_generic_string_hdr {
unsafe {
let Ok(value_c) = CString::new(value) else {
return ptr::null_mut();
};
let name_pj = pj_str(name.as_ptr() as *mut c_char);
let value_pj = pj_str(value_c.as_ptr() as *mut c_char);
pjsip_generic_string_hdr_create(pool, &name_pj, &value_pj)
}
}
/// Append a generic string header onto the message buffer in `tdata`,
/// allocating from the tdata's own pool. Returns false on failure.
#[inline]
pub(super) unsafe fn append_tdata_hdr(
tdata: *mut pjsip_tx_data,
name: &CStr,
value: &str,
) -> bool {
unsafe {
let hdr = make_string_hdr((*tdata).pool, name, value);
if hdr.is_null() {
return false;
}
pj_list_insert_before(
&mut (*(*tdata).msg).hdr as *mut pjsip_hdr as *mut pj_list_type,
hdr as *mut pj_list_type,
);
true
}
}
/// Send a simple stateless SIP response (no custom headers).
unsafe fn send_simple_response(rdata: *mut pjsip_rx_data, status_code: u16, reason: &str) {
unsafe {
let endpt = pjsua_get_pjsip_endpt();
if !endpt.is_null() {
let reason_cstr = CString::new(reason).unwrap();
let reason_pj = pj_str(reason_cstr.as_ptr() as *mut c_char);
pjsip_endpt_respond_stateless(
endpt,
rdata,
status_code.into(),
&reason_pj,
ptr::null(),
ptr::null(),
if let Err(e) =
respond_stateless_with_headers(rdata, status_code, Some(reason), &[])
{
tracing::warn!(
"Failed to respond {} {:?} to SIP request: {}",
status_code,
reason,
e
);
}
}
@ -148,67 +99,30 @@ unsafe fn send_simple_response(rdata: *mut pjsip_rx_data, status_code: u16, reas
/// client's current bindings via Contact header(s). Strict clients like 3CX
/// interpret a Contact-less response as "forced unregister" and tear down the
/// trunk even though the binding was accepted server-side.
unsafe fn send_register_ok(rdata: *mut pjsip_rx_data, expires: u32, contact_uri: Option<&str>) {
unsafe fn send_register_ok(
rdata: *mut pjsip_rx_data,
expires: u32,
contact_uri: Option<&str>,
) -> Result<(), SipResponseError> {
unsafe {
let endpt = pjsua_get_pjsip_endpt();
if endpt.is_null() {
return;
}
let expires_str = expires.to_string();
let contact_str = contact_uri.map(|uri| format!("<{}>;expires={}", uri, expires));
let pool = pjsua_pool_create(c"register_ok".as_ptr(), 1024, 1024);
if !pool.is_null() {
let exp_hdr = make_string_hdr(pool, c"Expires", &expires.to_string());
let contact_hdr = match contact_uri {
Some(uri) => make_string_hdr(
pool,
c"Contact",
&format!("<{}>;expires={}", uri, expires),
),
None => ptr::null_mut(),
};
if !exp_hdr.is_null() {
let hdr_list =
pj_pool_alloc(pool, std::mem::size_of::<pjsip_hdr>()) as *mut pjsip_hdr;
if !hdr_list.is_null() {
pj_list_init_hdr(hdr_list);
pj_list_insert_before(
hdr_list as *mut pj_list_type,
exp_hdr as *mut pj_list_type,
);
if !contact_hdr.is_null() {
pj_list_insert_before(
hdr_list as *mut pj_list_type,
contact_hdr as *mut pj_list_type,
);
}
let status = pjsip_endpt_respond_stateless(
endpt,
rdata,
200,
ptr::null(),
hdr_list,
ptr::null(),
);
if status != pj_constants__PJ_SUCCESS as i32 {
tracing::warn!("Failed to respond 200 OK to REGISTER: {}", status);
}
// Release pool — pjsip_endpt_respond_stateless clones what it
// needs into rdata's pool, so our header pool can be freed now.
pj_pool_release(pool);
return;
}
}
// Header creation failed — release the pool before falling through
pj_pool_release(pool);
}
// Fallback: respond without extra headers
let status =
pjsip_endpt_respond_stateless(endpt, rdata, 200, ptr::null(), ptr::null(), ptr::null());
if status != pj_constants__PJ_SUCCESS as i32 {
tracing::warn!("Failed to respond 200 OK to REGISTER: {}", status);
// Two-header common case
if let Some(ref contact) = contact_str {
respond_stateless_with_headers(
rdata,
200,
None,
&[(c"Expires", expires_str.as_str()), (c"Contact", contact.as_str())],
)
} else {
respond_stateless_with_headers(
rdata,
200,
None,
&[(c"Expires", expires_str.as_str())],
)
}
}
}
@ -232,26 +146,27 @@ unsafe fn detect_transport(rdata: *mut pjsip_rx_data) -> crate::services::regist
}
/// Create a UAS transaction + pre-built response tdata for deferred REGISTER
/// responses. Returns `None` if transaction creation fails (caller should fall
/// back to stateless response).
/// responses. Caller falls back to a stateless 200 if this errors.
unsafe fn create_register_tsx(
rdata: *mut pjsip_rx_data,
expires: u32,
contact_uri: Option<String>,
) -> Option<PendingRegisterTsx> {
) -> Result<PendingRegisterTsx, SipResponseError> {
unsafe {
let endpt = pjsua_get_pjsip_endpt();
if endpt.is_null() {
return Err(SipResponseError::EndpointNull);
}
let module_ptr = REGISTER_MODULE_PTR.load(Ordering::Acquire);
if endpt.is_null() || module_ptr.is_null() {
return None;
if module_ptr.is_null() {
return Err(SipResponseError::EndpointNull);
}
// Create UAS transaction
let mut tsx: *mut pjsip_transaction = ptr::null_mut();
let status = pjsip_tsx_create_uas2(module_ptr, rdata, ptr::null_mut(), &mut tsx);
if status != pj_constants__PJ_SUCCESS as i32 || tsx.is_null() {
return None;
return Err(SipResponseError::TsxCreate(status));
}
// Feed the request to the transaction (starts Timer F, stores headers)
@ -263,10 +178,10 @@ unsafe fn create_register_tsx(
let status = pjsip_endpt_create_response(endpt, rdata, 200, ptr::null(), &mut tdata);
if status != pj_constants__PJ_SUCCESS as i32 || tdata.is_null() {
pjsip_tsx_terminate(tsx, 500);
return None;
return Err(SipResponseError::ResponseBuild(status));
}
Some(PendingRegisterTsx {
Ok(PendingRegisterTsx {
tsx: SendableTsx(tsx),
tdata: SendableTdata(tdata),
expires,
@ -326,7 +241,7 @@ pub unsafe extern "C" fn on_rx_request_cb(rdata: *mut pjsip_rx_data) -> pj_bool_
let result = ban_mgr.check_banned(&ip);
if result.is_banned {
tracing::debug!("Rejecting REGISTER from banned IP {}", ip);
send_simple_response(rdata, 403, "Forbidden");
send_simple_response(rdata, 403, c"Forbidden");
return pj_constants__PJ_TRUE as pj_bool_t;
}
}
@ -355,7 +270,7 @@ pub unsafe extern "C" fn on_rx_request_cb(rdata: *mut pjsip_rx_data) -> pj_bool_
user_agent
);
}
send_simple_response(rdata, 403, "Forbidden");
send_simple_response(rdata, 403, c"Forbidden");
return pj_constants__PJ_TRUE as pj_bool_t;
}
@ -367,7 +282,7 @@ pub unsafe extern "C" fn on_rx_request_cb(rdata: *mut pjsip_rx_data) -> pj_bool_
&& ban_mgr.record_register(ip)
{
tracing::debug!("Rejecting REGISTER from {} - rate limit exceeded", ip);
send_simple_response(rdata, 429, "Too Many Requests");
send_simple_response(rdata, 429, c"Too Many Requests");
return pj_constants__PJ_TRUE as pj_bool_t;
}
@ -387,7 +302,7 @@ pub unsafe extern "C" fn on_rx_request_cb(rdata: *mut pjsip_rx_data) -> pj_bool_
ip_str,
params.username
);
send_simple_response(rdata, 429, "Too Many Requests");
send_simple_response(rdata, 429, c"Too Many Requests");
return pj_constants__PJ_TRUE as pj_bool_t;
}
@ -408,7 +323,13 @@ pub unsafe extern "C" fn on_rx_request_cb(rdata: *mut pjsip_rx_data) -> pj_bool_
params.username,
ip_str
);
send_register_ok(rdata, expires, contact_uri.as_deref());
if let Err(e) = send_register_ok(rdata, expires, contact_uri.as_deref()) {
tracing::warn!(
"REGISTER 200 OK (cached) send failed for {}: {} — strict clients may reject",
params.username,
e
);
}
// Send to async handler for registrar update
if let Some(tx) = REGISTER_EVENT_TX.get() {
let _ = tx.try_send(RegisterRequest {
@ -429,7 +350,7 @@ pub unsafe extern "C" fn on_rx_request_cb(rdata: *mut pjsip_rx_data) -> pj_bool_
params.username,
ip_str
);
send_simple_response(rdata, 403, "Forbidden");
send_simple_response(rdata, 403, c"Forbidden");
// Send to async so API can re-verify (cache may be stale
// after a password change) and update failure counts
if let Some(tx) = REGISTER_EVENT_TX.get() {
@ -453,24 +374,29 @@ pub unsafe extern "C" fn on_rx_request_cb(rdata: *mut pjsip_rx_data) -> pj_bool_
params.username,
ip_str
);
if let Some(pending) = create_register_tsx(rdata, expires, contact_uri.clone()) {
if let Some(tx) = REGISTER_EVENT_TX.get() {
let _ = tx.try_send(RegisterRequest {
digest_auth: params,
contact_uri: contact_uri.unwrap_or_default(),
source_addr,
transport,
expires,
pending_tsx: Some(pending),
});
match create_register_tsx(rdata, expires, contact_uri.clone()) {
Ok(pending) => {
if let Some(tx) = REGISTER_EVENT_TX.get() {
let _ = tx.try_send(RegisterRequest {
digest_auth: params,
contact_uri: contact_uri.unwrap_or_default(),
source_addr,
transport,
expires,
pending_tsx: Some(pending),
});
}
return pj_constants__PJ_TRUE as pj_bool_t;
}
Err(e) => {
// Transaction creation failed — fall through to
// stateless 200 OK below.
tracing::warn!(
"Failed to create tsx for deferred REGISTER ({}), falling back to stateless 200",
e
);
}
return pj_constants__PJ_TRUE as pj_bool_t;
}
// Transaction creation failed — fall through to stateless
// 200 OK below (same behaviour as before this change).
tracing::warn!(
"Failed to create tsx for deferred REGISTER, falling back to stateless 200"
);
}
}
}
@ -483,6 +409,7 @@ pub unsafe extern "C" fn on_rx_request_cb(rdata: *mut pjsip_rx_data) -> pj_bool_
params.username
);
let contact_uri_for_response = contact_uri.clone();
let user_for_log = params.username.clone();
if let Some(tx) = REGISTER_EVENT_TX.get() {
let _ = tx.try_send(RegisterRequest {
digest_auth: params,
@ -493,7 +420,14 @@ pub unsafe extern "C" fn on_rx_request_cb(rdata: *mut pjsip_rx_data) -> pj_bool_
pending_tsx: None,
});
}
send_register_ok(rdata, expires, contact_uri_for_response.as_deref());
if let Err(e) = send_register_ok(rdata, expires, contact_uri_for_response.as_deref())
{
tracing::warn!(
"REGISTER 200 OK (stateless) send failed for {}: {} — strict clients may reject",
user_for_log,
e
);
}
} else {
// No Authorization header - send 401 challenge
tracing::debug!(
@ -501,63 +435,24 @@ pub unsafe extern "C" fn on_rx_request_cb(rdata: *mut pjsip_rx_data) -> pj_bool_
ip_str
);
let endpt = pjsua_get_pjsip_endpt();
if endpt.is_null() {
tracing::error!("Failed to get PJSIP endpoint for REGISTER 401 response");
return pj_constants__PJ_TRUE as pj_bool_t;
}
// Generate a cryptographically random nonce
let nonce = {
let nonce: String = {
let bytes: [u8; 16] = rand::random();
bytes
.iter()
.map(|b| format!("{:02x}", b))
.collect::<String>()
bytes.iter().map(|b| format!("{:02x}", b)).collect()
};
let www_auth = format!(
"Digest realm=\"{}\", nonce=\"{}\", algorithm=MD5, qop=\"auth\"",
SIP_REALM, nonce
);
// Create WWW-Authenticate header
let hdr_name = CString::new("WWW-Authenticate").unwrap();
let hdr_value = CString::new(www_auth).unwrap();
let pool = pjsua_pool_create(c"register_401".as_ptr(), 512, 512);
if pool.is_null() {
tracing::error!("Failed to create pool for REGISTER 401 response");
return pj_constants__PJ_TRUE as pj_bool_t;
if let Err(e) = respond_stateless_with_headers(
rdata,
401,
None,
&[(c"WWW-Authenticate", www_auth.as_str())],
) {
tracing::warn!("Failed to send 401 challenge to REGISTER: {}", e);
}
let name = pj_str(hdr_name.as_ptr() as *mut c_char);
let value = pj_str(hdr_value.as_ptr() as *mut c_char);
let hdr = pjsip_generic_string_hdr_create(pool, &name, &value);
if !hdr.is_null() {
let hdr_list =
pj_pool_alloc(pool, std::mem::size_of::<pjsip_hdr>()) as *mut pjsip_hdr;
if !hdr_list.is_null() {
pj_list_init_hdr(hdr_list);
pj_list_insert_before(hdr_list as *mut pj_list_type, hdr as *mut pj_list_type);
let status = pjsip_endpt_respond_stateless(
endpt,
rdata,
401,
ptr::null(),
hdr_list,
ptr::null(),
);
if status != pj_constants__PJ_SUCCESS as i32 {
tracing::warn!("Failed to respond 401 to REGISTER: {}", status);
}
}
}
// Release pool — pjsip_endpt_respond_stateless clones headers internally
pj_pool_release(pool);
}
// Return TRUE to indicate we handled this request