refactor: migrate backend queue to SwitchDriver actor

This commit is contained in:
Slinetrac
2025-10-26 15:04:03 +08:00
Unverified
parent de35fef450
commit ceec922887
4 changed files with 231 additions and 33 deletions

View File

@@ -19,6 +19,7 @@ use futures::FutureExt;
use once_cell::sync::OnceCell;
use smartstring::alias::String;
use std::{
collections::VecDeque,
panic::AssertUnwindSafe,
sync::atomic::{AtomicBool, AtomicU64, Ordering},
time::Duration,
@@ -26,14 +27,41 @@ use std::{
use tokio::sync::{
Mutex,
mpsc::{self, error::TrySendError},
oneshot,
};
static SWITCH_MUTEX: OnceCell<Mutex<()>> = OnceCell::new();
static SWITCH_QUEUE: OnceCell<mpsc::Sender<(String, bool)>> = OnceCell::new();
static SWITCH_QUEUE: OnceCell<mpsc::Sender<SwitchDriverMessage>> = OnceCell::new();
const SWITCH_QUEUE_CAPACITY: usize = 32;
// Track global request sequence to avoid stale queued execution.
static CURRENT_REQUEST_SEQUENCE: AtomicU64 = AtomicU64::new(0);
static CURRENT_SWITCHING_PROFILE: AtomicBool = AtomicBool::new(false);
static SWITCH_TASK_SEQUENCE: AtomicU64 = AtomicU64::new(0);
#[derive(Debug, Clone)]
struct SwitchRequest {
task_id: u64,
profile_id: String,
notify: bool,
}
#[derive(Debug, Default)]
struct SwitchDriverState {
active: Option<SwitchRequest>,
queue: VecDeque<SwitchRequest>,
}
#[derive(Debug)]
enum SwitchDriverMessage {
Request {
request: SwitchRequest,
respond_to: oneshot::Sender<bool>,
},
Completion {
request: SwitchRequest,
success: bool,
},
}
struct SwitchScope;
@@ -50,21 +78,77 @@ impl Drop for SwitchScope {
}
}
fn switch_queue_sender() -> &'static mpsc::Sender<(String, bool)> {
fn switch_driver_sender() -> &'static mpsc::Sender<SwitchDriverMessage> {
SWITCH_QUEUE.get_or_init(|| {
let (tx, mut rx) = mpsc::channel::<(String, bool)>(SWITCH_QUEUE_CAPACITY);
let (tx, mut rx) = mpsc::channel::<SwitchDriverMessage>(SWITCH_QUEUE_CAPACITY);
let driver_tx = tx.clone();
tokio::spawn(async move {
let mutex = SWITCH_MUTEX.get_or_init(|| Mutex::new(()));
while let Some((profile, notify)) = rx.recv().await {
let _guard = mutex.lock().await;
if let Err(err) = process_switch_task(profile.clone(), notify).await {
logging!(
error,
Type::Cmd,
"Failed to process profile switch task ({}): {}",
profile,
err
);
let mut state = SwitchDriverState::default();
while let Some(message) = rx.recv().await {
match message {
SwitchDriverMessage::Request {
request,
respond_to,
} => {
let accepted = true;
let mut responder = Some(respond_to);
if let Some(active) = &mut state.active
&& active.profile_id == request.profile_id
{
active.notify |= request.notify;
if let Some(sender) = responder.take() {
let _ = sender.send(accepted);
}
continue;
}
if let Some(existing) = state
.queue
.iter_mut()
.find(|queued| queued.profile_id == request.profile_id)
{
existing.notify |= request.notify;
if let Some(sender) = responder.take() {
let _ = sender.send(accepted);
}
continue;
}
if state.active.is_none() {
state.active = Some(request.clone());
if let Some(sender) = responder.take() {
let _ = sender.send(accepted);
}
start_switch_job(driver_tx.clone(), mutex, request);
} else {
state.queue.push_back(request.clone());
if let Some(sender) = responder.take() {
let _ = sender.send(accepted);
}
}
}
SwitchDriverMessage::Completion { request, success } => {
logging!(
info,
Type::Cmd,
"Switch task {} completed (success={})",
request.task_id,
success
);
if let Some(active) = &state.active
&& active.task_id == request.task_id
{
state.active = None;
}
if state.active.is_none()
&& let Some(next) = state.queue.pop_front()
{
state.active = Some(next.clone());
start_switch_job(driver_tx.clone(), mutex, next);
}
}
}
}
});
@@ -72,7 +156,55 @@ fn switch_queue_sender() -> &'static mpsc::Sender<(String, bool)> {
})
}
async fn process_switch_task(profile_index: String, notify_success: bool) -> CmdResult<()> {
fn start_switch_job(
driver_tx: mpsc::Sender<SwitchDriverMessage>,
mutex: &'static Mutex<()>,
request: SwitchRequest,
) {
tokio::spawn(async move {
let profile_id = request.profile_id.clone();
let notify = request.notify;
logging!(
info,
Type::Cmd,
"Starting switch task {} for profile {} (notify={})",
request.task_id,
profile_id,
notify
);
let success = match process_switch_task_with_guard(
mutex,
request.task_id,
profile_id.clone(),
notify,
)
.await
{
Ok(result) => result,
Err(err) => {
logging!(
error,
Type::Cmd,
"Switch task execution failed ({}): {}",
profile_id,
err
);
false
}
};
let _ = driver_tx
.send(SwitchDriverMessage::Completion { request, success })
.await;
});
}
async fn process_switch_task_with_guard(
mutex: &'static Mutex<()>,
task_id: u64,
profile_index: String,
notify_success: bool,
) -> CmdResult<bool> {
logging!(
info,
Type::Cmd,
@@ -81,6 +213,8 @@ async fn process_switch_task(profile_index: String, notify_success: bool) -> Cmd
notify_success
);
let _guard = mutex.lock().await;
let switch_result = AssertUnwindSafe(patch_profiles_config_internal(IProfiles {
current: Some(profile_index.clone()),
items: None,
@@ -105,8 +239,9 @@ async fn process_switch_task(profile_index: String, notify_success: bool) -> Cmd
profile_index.clone(),
false,
notify_success,
task_id,
);
return Ok(());
return Ok(false);
}
};
@@ -125,12 +260,18 @@ async fn process_switch_task(profile_index: String, notify_success: bool) -> Cmd
profile_index.clone(),
false,
notify_success,
task_id,
);
return Ok(());
return Ok(false);
}
};
handle::Handle::notify_profile_switch_finished(profile_index.clone(), success, notify_success);
handle::Handle::notify_profile_switch_finished(
profile_index.clone(),
success,
notify_success,
task_id,
);
if let Err(err) = handle::Handle::mihomo().await.close_all_connections().await {
logging!(
@@ -154,7 +295,7 @@ async fn process_switch_task(profile_index: String, notify_success: bool) -> Cmd
success
);
Ok(())
Ok(success)
}
#[tauri::command]
@@ -773,30 +914,68 @@ pub async fn patch_profiles_config_by_profile_index(profile_index: String) -> Cm
#[tauri::command]
pub async fn switch_profile(profile_index: String, notify_success: bool) -> CmdResult<bool> {
let sender = switch_driver_sender();
let task_id = SWITCH_TASK_SEQUENCE.fetch_add(1, Ordering::SeqCst) + 1;
logging!(
info,
Type::Cmd,
"Queue profile switch to: {}",
&profile_index
"Queue profile switch task {} -> {} (notify={})",
task_id,
profile_index,
notify_success
);
let sender = switch_queue_sender();
match sender.try_send((profile_index.clone(), notify_success)) {
Ok(_) => Ok(true),
Err(TrySendError::Full(task)) => {
let request = SwitchRequest {
task_id,
profile_id: profile_index.clone(),
notify: notify_success,
};
let (tx, rx) = oneshot::channel();
match sender.try_send(SwitchDriverMessage::Request {
request,
respond_to: tx,
}) {
Ok(_) => match rx.await {
Ok(result) => Ok(result),
Err(err) => {
logging!(
error,
Type::Cmd,
"Failed to receive enqueue result for profile {}: {}",
profile_index,
err
);
Err("switch profile queue unavailable".into())
}
},
Err(TrySendError::Full(msg)) => {
logging!(
warn,
Type::Cmd,
"Profile switch queue is full, waiting for space: {}",
&profile_index
"Profile switch queue is full; waiting for space: {}",
profile_index
);
match sender.send(task).await {
Ok(_) => Ok(true),
match sender.send(msg).await {
Ok(_) => match rx.await {
Ok(result) => Ok(result),
Err(err) => {
logging!(
error,
Type::Cmd,
"Failed to receive enqueue result after wait for {}: {}",
profile_index,
err
);
Err("switch profile queue unavailable".into())
}
},
Err(err) => {
logging!(
error,
Type::Cmd,
"Profile switch queue closed while waiting ({}): {}",
&profile_index,
profile_index,
err
);
Err("switch profile queue unavailable".into())
@@ -808,7 +987,7 @@ pub async fn switch_profile(profile_index: String, notify_success: bool) -> CmdR
error,
Type::Cmd,
"Profile switch queue is closed, cannot enqueue: {}",
&profile_index
profile_index
);
Err("switch profile queue unavailable".into())
}

View File

@@ -100,11 +100,17 @@ impl Handle {
});
}
pub fn notify_profile_switch_finished(profile_id: String, success: bool, notify: bool) {
pub fn notify_profile_switch_finished(
profile_id: String,
success: bool,
notify: bool,
task_id: u64,
) {
Self::send_event(FrontendEvent::ProfileSwitchFinished {
profile_id,
success,
notify,
task_id,
});
}

View File

@@ -34,6 +34,7 @@ pub enum FrontendEvent {
profile_id: String,
success: bool,
notify: bool,
task_id: u64,
},
TimerUpdated {
profile_index: String,
@@ -193,9 +194,15 @@ impl NotificationSystem {
profile_id,
success,
notify,
task_id,
} => (
"profile-switch-finished",
Ok(json!({ "profileId": profile_id, "success": success, "notify": notify })),
Ok(json!({
"profileId": profile_id,
"success": success,
"notify": notify,
"taskId": task_id
})),
),
FrontendEvent::TimerUpdated { profile_index } => {
("verge://timer-updated", Ok(json!(profile_index)))

View File

@@ -81,6 +81,7 @@ type ProfileSwitchFinishedPayload = {
profileId: string;
success: boolean;
notify: boolean;
taskId: number;
};
type RustPanicPayload = {
@@ -695,7 +696,12 @@ const ProfilePage = () => {
const payload = event.payload;
if (!payload) return;
const { profileId, success, notify } = payload;
const { profileId, success, notify, taskId } = payload;
debugProfileSwitch("SWITCH_FINISHED", profileId, {
success,
taskId,
notify,
});
setManualActivatings((prev) => prev.filter((id) => id !== profileId));
if (success) {