diff --git a/src-tauri/src/cmd/profile.rs b/src-tauri/src/cmd/profile.rs
index 620bb0f4..36e0f101 100644
--- a/src-tauri/src/cmd/profile.rs
+++ b/src-tauri/src/cmd/profile.rs
@@ -19,6 +19,7 @@ use futures::FutureExt;
 use once_cell::sync::OnceCell;
 use smartstring::alias::String;
 use std::{
+    collections::VecDeque,
     panic::AssertUnwindSafe,
     sync::atomic::{AtomicBool, AtomicU64, Ordering},
     time::Duration,
@@ -26,14 +27,41 @@ use std::{
 use tokio::sync::{
     Mutex,
     mpsc::{self, error::TrySendError},
+    oneshot,
 };

 static SWITCH_MUTEX: OnceCell<Mutex<()>> = OnceCell::new();
-static SWITCH_QUEUE: OnceCell<mpsc::Sender<(String, bool)>> = OnceCell::new();
+static SWITCH_QUEUE: OnceCell<mpsc::Sender<SwitchDriverMessage>> = OnceCell::new();
 const SWITCH_QUEUE_CAPACITY: usize = 32;
 // Track global request sequence to avoid stale queued execution.
 static CURRENT_REQUEST_SEQUENCE: AtomicU64 = AtomicU64::new(0);
 static CURRENT_SWITCHING_PROFILE: AtomicBool = AtomicBool::new(false);
+static SWITCH_TASK_SEQUENCE: AtomicU64 = AtomicU64::new(0);
+
+#[derive(Debug, Clone)]
+struct SwitchRequest {
+    task_id: u64,
+    profile_id: String,
+    notify: bool,
+}
+
+#[derive(Debug, Default)]
+struct SwitchDriverState {
+    active: Option<SwitchRequest>,
+    queue: VecDeque<SwitchRequest>,
+}
+
+#[derive(Debug)]
+enum SwitchDriverMessage {
+    Request {
+        request: SwitchRequest,
+        respond_to: oneshot::Sender<bool>,
+    },
+    Completion {
+        request: SwitchRequest,
+        success: bool,
+    },
+}

 struct SwitchScope;

@@ -50,21 +78,77 @@ impl Drop for SwitchScope {
     }
 }

-fn switch_queue_sender() -> &'static mpsc::Sender<(String, bool)> {
+fn switch_driver_sender() -> &'static mpsc::Sender<SwitchDriverMessage> {
     SWITCH_QUEUE.get_or_init(|| {
-        let (tx, mut rx) = mpsc::channel::<(String, bool)>(SWITCH_QUEUE_CAPACITY);
+        let (tx, mut rx) = mpsc::channel::<SwitchDriverMessage>(SWITCH_QUEUE_CAPACITY);
+        let driver_tx = tx.clone();
         tokio::spawn(async move {
             let mutex = SWITCH_MUTEX.get_or_init(|| Mutex::new(()));
-            while let Some((profile, notify)) = rx.recv().await {
-                let _guard = mutex.lock().await;
-                if let Err(err) = process_switch_task(profile.clone(), notify).await {
-                    logging!(
-                        error,
-                        Type::Cmd,
-                        "Failed to process profile switch task ({}): {}",
-                        profile,
-                        err
-                    );
+            let mut state = SwitchDriverState::default();
+            while let Some(message) = rx.recv().await {
+                match message {
+                    SwitchDriverMessage::Request {
+                        request,
+                        respond_to,
+                    } => {
+                        let accepted = true;
+                        let mut responder = Some(respond_to);
+
+                        if let Some(active) = &mut state.active
+                            && active.profile_id == request.profile_id
+                        {
+                            active.notify |= request.notify;
+                            if let Some(sender) = responder.take() {
+                                let _ = sender.send(accepted);
+                            }
+                            continue;
+                        }
+
+                        if let Some(existing) = state
+                            .queue
+                            .iter_mut()
+                            .find(|queued| queued.profile_id == request.profile_id)
+                        {
+                            existing.notify |= request.notify;
+                            if let Some(sender) = responder.take() {
+                                let _ = sender.send(accepted);
+                            }
+                            continue;
+                        }
+
+                        if state.active.is_none() {
+                            state.active = Some(request.clone());
+                            if let Some(sender) = responder.take() {
+                                let _ = sender.send(accepted);
+                            }
+                            start_switch_job(driver_tx.clone(), mutex, request);
+                        } else {
+                            state.queue.push_back(request.clone());
+                            if let Some(sender) = responder.take() {
+                                let _ = sender.send(accepted);
+                            }
+                        }
+                    }
+                    SwitchDriverMessage::Completion { request, success } => {
+                        logging!(
+                            info,
+                            Type::Cmd,
+                            "Switch task {} completed (success={})",
+                            request.task_id,
+                            success
+                        );
+                        if let Some(active) = &state.active
+                            && active.task_id == request.task_id
+                        {
+                            state.active = None;
+                        }
+                        if state.active.is_none()
+                            && let Some(next) = state.queue.pop_front()
+                        {
+                            state.active = Some(next.clone());
+                            start_switch_job(driver_tx.clone(), mutex, next);
+                        }
+                    }
                 }
             }
         });
@@ -72,7 +156,55 @@ fn switch_queue_sender() -> &'static mpsc::Sender<(String, bool)> {
     })
 }

-async fn process_switch_task(profile_index: String, notify_success: bool) -> CmdResult<()> {
+fn start_switch_job(
+    driver_tx: mpsc::Sender<SwitchDriverMessage>,
+    mutex: &'static Mutex<()>,
+    request: SwitchRequest,
+) {
+    tokio::spawn(async move {
+        let profile_id = request.profile_id.clone();
+        let notify = request.notify;
+        logging!(
+            info,
+            Type::Cmd,
+            "Starting switch task {} for profile {} (notify={})",
+            request.task_id,
+            profile_id,
+            notify
+        );
+        let success = match process_switch_task_with_guard(
+            mutex,
+            request.task_id,
+            profile_id.clone(),
+            notify,
+        )
+        .await
+        {
+            Ok(result) => result,
+            Err(err) => {
+                logging!(
+                    error,
+                    Type::Cmd,
+                    "Switch task execution failed ({}): {}",
+                    profile_id,
+                    err
+                );
+                false
+            }
+        };
+
+        let _ = driver_tx
+            .send(SwitchDriverMessage::Completion { request, success })
+            .await;
+    });
+}
+
+async fn process_switch_task_with_guard(
+    mutex: &'static Mutex<()>,
+    task_id: u64,
+    profile_index: String,
+    notify_success: bool,
+) -> CmdResult<bool> {
     logging!(
         info,
         Type::Cmd,
@@ -81,6 +213,8 @@ async fn process_switch_task(profile_index: String, notify_success: bool) -> Cmd
         notify_success
     );

+    let _guard = mutex.lock().await;
+
     let switch_result = AssertUnwindSafe(patch_profiles_config_internal(IProfiles {
         current: Some(profile_index.clone()),
         items: None,
@@ -105,8 +239,9 @@ async fn process_switch_task(profile_index: String, notify_success: bool) -> Cmd
                 profile_index.clone(),
                 false,
                 notify_success,
+                task_id,
             );
-            return Ok(());
+            return Ok(false);
         }
     };

@@ -125,12 +260,18 @@ async fn process_switch_task(profile_index: String, notify_success: bool) -> Cmd
                 profile_index.clone(),
                 false,
                 notify_success,
+                task_id,
             );
-            return Ok(());
+            return Ok(false);
         }
     };

-    handle::Handle::notify_profile_switch_finished(profile_index.clone(), success, notify_success);
+    handle::Handle::notify_profile_switch_finished(
+        profile_index.clone(),
+        success,
+        notify_success,
+        task_id,
+    );

     if let Err(err) = handle::Handle::mihomo().await.close_all_connections().await {
         logging!(
@@ -154,7 +295,7 @@ async fn process_switch_task(profile_index: String, notify_success: bool) -> Cmd
         success
     );

-    Ok(())
+    Ok(success)
 }

 #[tauri::command]
@@ -773,30 +914,68 @@ pub async fn patch_profiles_config_by_profile_index(profile_index: String) -> Cm

 #[tauri::command]
 pub async fn switch_profile(profile_index: String, notify_success: bool) -> CmdResult<bool> {
+    let sender = switch_driver_sender();
+
+    let task_id = SWITCH_TASK_SEQUENCE.fetch_add(1, Ordering::SeqCst) + 1;
     logging!(
         info,
         Type::Cmd,
-        "Queue profile switch to: {}",
-        &profile_index
+        "Queue profile switch task {} -> {} (notify={})",
+        task_id,
+        profile_index,
+        notify_success
     );

-    let sender = switch_queue_sender();
-    match sender.try_send((profile_index.clone(), notify_success)) {
-        Ok(_) => Ok(true),
-        Err(TrySendError::Full(task)) => {
+    let request = SwitchRequest {
+        task_id,
+        profile_id: profile_index.clone(),
+        notify: notify_success,
+    };
+    let (tx, rx) = oneshot::channel();
+
+    match sender.try_send(SwitchDriverMessage::Request {
+        request,
+        respond_to: tx,
+    }) {
+        Ok(_) => match rx.await {
+            Ok(result) => Ok(result),
+            Err(err) => {
+                logging!(
+                    error,
+                    Type::Cmd,
+                    "Failed to receive enqueue result for profile {}: {}",
+                    profile_index,
+                    err
+                );
+                Err("switch profile queue unavailable".into())
+            }
+        },
+        Err(TrySendError::Full(msg)) => {
             logging!(
                 warn,
                 Type::Cmd,
-                "Profile switch queue is full, waiting for space: {}",
-                &profile_index
+                "Profile switch queue is full; waiting for space: {}",
+                profile_index
             );
-            match sender.send(task).await {
-                Ok(_) => Ok(true),
+            match sender.send(msg).await {
+                Ok(_) => match rx.await {
+                    Ok(result) => Ok(result),
+                    Err(err) => {
+                        logging!(
+                            error,
+                            Type::Cmd,
+                            "Failed to receive enqueue result after wait for {}: {}",
+                            profile_index,
+                            err
+                        );
+                        Err("switch profile queue unavailable".into())
+                    }
+                },
                 Err(err) => {
                     logging!(
                         error,
                         Type::Cmd,
                         "Profile switch queue closed while waiting ({}): {}",
-                        &profile_index,
+                        profile_index,
                         err
                     );
@@ -808,7 +987,7 @@ pub async fn switch_profile(profile_index: String, notify_success: bool) -> CmdR
                 error,
                 Type::Cmd,
                 "Profile switch queue is closed, cannot enqueue: {}",
-                &profile_index
+                profile_index
             );
             Err("switch profile queue unavailable".into())
         }
diff --git a/src-tauri/src/core/handle.rs b/src-tauri/src/core/handle.rs
index 79991cbd..30fc651a 100644
--- a/src-tauri/src/core/handle.rs
+++ b/src-tauri/src/core/handle.rs
@@ -100,11 +100,17 @@ impl Handle {
         });
     }

-    pub fn notify_profile_switch_finished(profile_id: String, success: bool, notify: bool) {
+    pub fn notify_profile_switch_finished(
+        profile_id: String,
+        success: bool,
+        notify: bool,
+        task_id: u64,
+    ) {
         Self::send_event(FrontendEvent::ProfileSwitchFinished {
             profile_id,
             success,
             notify,
+            task_id,
         });
     }

diff --git a/src-tauri/src/core/notification.rs b/src-tauri/src/core/notification.rs
index d7c3627d..3f39b29f 100644
--- a/src-tauri/src/core/notification.rs
+++ b/src-tauri/src/core/notification.rs
@@ -34,6 +34,7 @@ pub enum FrontendEvent {
         profile_id: String,
         success: bool,
         notify: bool,
+        task_id: u64,
     },
     TimerUpdated {
         profile_index: String,
@@ -193,9 +194,15 @@ impl NotificationSystem {
                 profile_id,
                 success,
                 notify,
+                task_id,
             } => (
                 "profile-switch-finished",
-                Ok(json!({ "profileId": profile_id, "success": success, "notify": notify })),
+                Ok(json!({
+                    "profileId": profile_id,
+                    "success": success,
+                    "notify": notify,
+                    "taskId": task_id
+                })),
             ),
             FrontendEvent::TimerUpdated { profile_index } => {
                 ("verge://timer-updated", Ok(json!(profile_index)))
diff --git a/src/pages/profiles.tsx b/src/pages/profiles.tsx
index 0225032d..e7c4d305 100644
--- a/src/pages/profiles.tsx
+++ b/src/pages/profiles.tsx
@@ -81,6 +81,7 @@ type ProfileSwitchFinishedPayload = {
   profileId: string;
   success: boolean;
   notify: boolean;
+  taskId: number;
 };

 type RustPanicPayload = {
@@ -695,7 +696,12 @@ const ProfilePage = () => {
         const payload = event.payload;
         if (!payload) return;

-        const { profileId, success, notify } = payload;
+        const { profileId, success, notify, taskId } = payload;
+        debugProfileSwitch("SWITCH_FINISHED", profileId, {
+          success,
+          taskId,
+          notify,
+        });

         setManualActivatings((prev) => prev.filter((id) => id !== profileId));
         if (success) {
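For orientation only, a minimal sketch of how a frontend caller might consume the new `taskId` field carried by the `profile-switch-finished` event. This is not part of the patch: the `switchAndWait` helper is hypothetical, the Tauri v2 import paths (`@tauri-apps/api/core`, `@tauri-apps/api/event`) are an assumption, and the camelCased invoke arguments (`profileIndex`, `notifySuccess`) assume Tauri's default argument renaming.

```ts
import { invoke } from "@tauri-apps/api/core";
import { listen } from "@tauri-apps/api/event";

// Mirrors ProfileSwitchFinishedPayload from src/pages/profiles.tsx.
type ProfileSwitchFinishedPayload = {
  profileId: string;
  success: boolean;
  notify: boolean;
  taskId: number;
};

// Hypothetical helper (not part of the patch): request a switch and resolve
// with the final result once the matching completion event arrives.
async function switchAndWait(profileIndex: string): Promise<boolean> {
  const finished = new Promise<boolean>((resolve) => {
    const unlistenPromise = listen<ProfileSwitchFinishedPayload>(
      "profile-switch-finished",
      (event) => {
        const { profileId, success, taskId } = event.payload;
        // The command itself does not return the task id, so correlate by
        // profile id and use taskId only for logging/debugging.
        if (profileId !== profileIndex) return;
        console.debug(`switch task ${taskId} finished, success=${success}`);
        unlistenPromise.then((unlisten) => unlisten());
        resolve(success);
      },
    );
  });

  // With this patch, `switch_profile` resolves as soon as the driver accepts
  // (or coalesces) the request, not when the switch actually completes.
  const accepted = await invoke<boolean>("switch_profile", {
    profileIndex,
    notifySuccess: false,
  });
  if (!accepted) return false;

  return finished;
}
```

Because the driver coalesces duplicate requests for the same profile, several callers can observe the same `taskId` in the completion event they receive.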