dont wait for menu to end befor button press accepts

This commit is contained in:
legop3 2026-06-14 05:28:05 -04:00
parent 2630ca154f
commit 4d8dd36538
5 changed files with 109 additions and 25 deletions

View file

@ -1598,14 +1598,8 @@ async fn select_guild_from_menu(
page,
guilds.len(),
);
if let Err(e) = play_tts_prompt(call_id, &prompt, &ctx.sip_cmd_tx).await {
error!("Failed to play guild menu TTS for call {}: {}", call_id, e);
ctx.dtmf_waiters.remove(&call_id);
let _ = ctx.sip_cmd_tx.send(SipCommand::Hangup { call_id });
return None;
}
let digit = wait_for_menu_digit(call_id, menu, dtmf_rx, ctx).await?;
let digit =
play_tts_prompt_and_wait_for_digit(call_id, menu, &prompt, dtmf_rx, ctx).await?;
match digit {
'#' => continue,
'9' if has_next_page(guilds.len(), page) => {
@ -1659,14 +1653,8 @@ async fn select_channel_from_menu(
page,
channels.len(),
);
if let Err(e) = play_tts_prompt(call_id, &prompt, &ctx.sip_cmd_tx).await {
error!("Failed to play channel menu TTS for call {}: {}", call_id, e);
ctx.dtmf_waiters.remove(&call_id);
let _ = ctx.sip_cmd_tx.send(SipCommand::Hangup { call_id });
return None;
}
let digit = wait_for_menu_digit(call_id, menu, dtmf_rx, ctx).await?;
let digit =
play_tts_prompt_and_wait_for_digit(call_id, menu, &prompt, dtmf_rx, ctx).await?;
match digit {
'#' => continue,
'*' if page > 0 => {
@ -1782,9 +1770,10 @@ fn is_tts_skipped_symbol(ch: char) -> bool {
)
}
async fn wait_for_menu_digit(
async fn play_tts_prompt_and_wait_for_digit(
call_id: CallId,
menu: &MenuRoute,
text: &str,
dtmf_rx: &mut mpsc::UnboundedReceiver<char>,
ctx: &MenuCallContext,
) -> Option<char> {
@ -1793,13 +1782,28 @@ async fn wait_for_menu_digit(
return None;
}
match tokio::time::timeout(
Duration::from_secs(menu.timeout_seconds.max(1)),
dtmf_rx.recv(),
)
.await
{
Ok(Some(digit)) => Some(digit),
let samples = match synthesize_tts_samples(call_id, text).await {
Ok(samples) => samples,
Err(e) => {
error!("Failed to synthesize menu TTS for call {}: {}", call_id, e);
ctx.dtmf_waiters.remove(&call_id);
let _ = ctx.sip_cmd_tx.send(SipCommand::Hangup { call_id });
return None;
}
};
let duration_ms = (samples.len() as u64 * 1000) / CONF_SAMPLE_RATE as u64;
let _ = ctx
.sip_cmd_tx
.send(SipCommand::PlayDirectToCall { call_id, samples });
let wait_duration =
Duration::from_millis(duration_ms + 100) + Duration::from_secs(menu.timeout_seconds.max(1));
match tokio::time::timeout(wait_duration, dtmf_rx.recv()).await {
Ok(Some(digit)) => {
let _ = ctx.sip_cmd_tx.send(SipCommand::StopDirectToCall { call_id });
Some(digit)
}
Ok(None) => {
warn!("Menu {} DTMF channel closed for call {}", menu.id, call_id);
ctx.dtmf_waiters.remove(&call_id);

View file

@ -322,6 +322,9 @@ fn describe_op(op: &PendingPjsuaOp) -> String {
PendingPjsuaOp::PlayDirect { call_id, samples } => {
format!("PlayDirect {{ call_id: {}, samples: {} }}", call_id, samples.len())
}
PendingPjsuaOp::StopDirect { call_id } => {
format!("StopDirect {{ call_id: {} }}", call_id)
}
PendingPjsuaOp::StartLoop { call_id, samples } => {
format!("StartLoop {{ call_id: {}, samples: {} }}", call_id, samples.len())
}
@ -355,6 +358,7 @@ fn process_pending_pjsua_ops() {
// Validate that the call still exists before processing the op
let call_id = match &op {
PendingPjsuaOp::PlayDirect { call_id, .. } => Some(*call_id),
PendingPjsuaOp::StopDirect { call_id } => Some(*call_id),
PendingPjsuaOp::StartLoop { call_id, .. } => Some(*call_id),
PendingPjsuaOp::StartStreaming { call_id, .. } => Some(*call_id),
PendingPjsuaOp::StartTestTone { call_id } => Some(*call_id),
@ -387,6 +391,9 @@ fn process_pending_pjsua_ops() {
tracing::warn!("Failed to play direct audio to call {}: {}", call_id, e);
}
}
PendingPjsuaOp::StopDirect { call_id } => {
super::ffi::direct_player::stop_direct_audio_to_call_internal(call_id);
}
PendingPjsuaOp::StartStreaming {
call_id,
path,

View file

@ -7,7 +7,7 @@ use crate::transport::sip::error::SipAudioError;
use super::types::*;
use parking_lot::Mutex;
use pjsua::*;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
/// Custom get_frame callback for direct player ports
/// Returns samples from the player's buffer, advancing position each call
@ -73,6 +73,20 @@ pub unsafe extern "C" fn direct_player_on_destroy(this_port: *mut pjmedia_port)
if let Some(state) = DIRECT_PLAYER_STATE.get() {
state.lock().remove(&port_key);
}
let call_id = DIRECT_PLAYER_CALLS
.get()
.and_then(|calls| calls.lock().remove(&port_key));
if let Some(call_id) = call_id
&& let Some(ports) = DIRECT_PLAYER_PORTS.get()
{
let mut ports = ports.lock();
if let Some(call_ports) = ports.get_mut(&call_id) {
call_ports.remove(&port_key);
if call_ports.is_empty() {
ports.remove(&call_id);
}
}
}
tracing::debug!("Direct player port destroyed: {:p}", this_port);
}
pj_constants__PJ_SUCCESS as pj_status_t
@ -101,6 +115,43 @@ pub fn play_audio_to_call_direct(call_id: CallId, samples: &[i16]) -> Result<(),
Ok(())
}
/// Stop direct one-shot audio currently playing to a call.
pub fn stop_direct_audio_to_call(call_id: CallId) {
use super::types::{PendingPjsuaOp, queue_pjsua_op};
queue_pjsua_op(PendingPjsuaOp::StopDirect { call_id });
}
/// Internal implementation of direct audio stop, run on the audio thread.
pub fn stop_direct_audio_to_call_internal(call_id: CallId) {
let port_keys = DIRECT_PLAYER_PORTS
.get()
.and_then(|ports| ports.lock().remove(&call_id));
let Some(port_keys) = port_keys else {
return;
};
if let Some(state) = DIRECT_PLAYER_STATE.get() {
let mut state = state.lock();
for port_key in &port_keys {
state.remove(port_key);
}
}
if let Some(calls) = DIRECT_PLAYER_CALLS.get() {
let mut calls = calls.lock();
for port_key in &port_keys {
calls.remove(port_key);
}
}
tracing::debug!(
"Stopped {} direct player(s) for call {}",
port_keys.len(),
call_id
);
}
/// Internal implementation of play_audio_to_call_direct
/// Called from the audio thread to actually create and connect the player
pub fn play_audio_to_call_direct_internal(
@ -141,6 +192,14 @@ pub fn play_audio_to_call_direct_internal(
// Now store samples with the actual port key
let state = DIRECT_PLAYER_STATE.get_or_init(|| Mutex::new(HashMap::new()));
state.lock().insert(guard.port_key, (samples.to_vec(), 0));
let ports = DIRECT_PLAYER_PORTS.get_or_init(|| Mutex::new(HashMap::new()));
ports
.lock()
.entry(call_id)
.or_insert_with(HashSet::new)
.insert(guard.port_key);
let calls = DIRECT_PLAYER_CALLS.get_or_init(|| Mutex::new(HashMap::new()));
calls.lock().insert(guard.port_key, call_id);
tracing::debug!(
"Playing {} samples directly to call {} (player_slot={}, call_port={})",

View file

@ -283,6 +283,8 @@ pub enum PendingPjsuaOp {
/// Play samples directly to a call (for join sounds)
/// Note: This also stops any active looping player for the call first
PlayDirect { call_id: CallId, samples: Vec<i16> },
/// Stop one-shot direct players for a call.
StopDirect { call_id: CallId },
/// Start streaming audio from a file to a call (for large easter egg files)
/// Uses pull model for precise timing - audio thread pulls frames as needed
StartStreaming {
@ -378,6 +380,13 @@ pub static CHANNEL_DRAIN_CACHE: OnceLock<DashMap<Snowflake, DrainCacheEntry>> =
pub static DIRECT_PLAYER_STATE: OnceLock<Mutex<HashMap<usize, DirectPlayerEntry>>> =
OnceLock::new();
/// call_id -> direct player port keys.
pub static DIRECT_PLAYER_PORTS: OnceLock<Mutex<HashMap<CallId, HashSet<usize>>>> =
OnceLock::new();
/// direct player port key -> call_id.
pub static DIRECT_PLAYER_CALLS: OnceLock<Mutex<HashMap<usize, CallId>>> = OnceLock::new();
/// Memory pool for direct player ports
pub static DIRECT_PLAYER_POOL: OnceLock<Mutex<SendablePool>> = OnceLock::new();

View file

@ -93,6 +93,8 @@ pub enum SipCommand {
/// Play audio directly to a call (bypasses channel buffer)
/// Used for join sounds to avoid buffer overflow with Discord audio
PlayDirectToCall { call_id: CallId, samples: Vec<i16> },
/// Stop one-shot direct audio currently playing to a call.
StopDirectToCall { call_id: CallId },
/// Start a looping audio player for early media (183 Session Progress)
StartConnectingLoop { call_id: CallId, samples: Vec<i16> },
/// Hangup a call
@ -364,6 +366,9 @@ fn process_sip_command(cmd: SipCommand, calls: &Arc<DashMap<CallId, CallState>>)
tracing::error!("Failed to play direct audio to call {}: {}", call_id, e);
}
}
SipCommand::StopDirectToCall { call_id } => {
stop_direct_audio_to_call(call_id);
}
SipCommand::StartConnectingLoop { call_id, samples } => {
// Queue to audio thread to avoid race with pjmedia_port_get_frame
queue_pjsua_op(PendingPjsuaOp::StartLoop { call_id, samples });