Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: introduce DialogScheduler as the unified dialog entry point
Route dialog and enhanced-message submissions through DialogScheduler and
wire coordinator outcome notifications to centralize turn scheduling.
  • Loading branch information
wsp1911 committed Mar 6, 2026
commit 8b7b092f2194d1458cca6bf09089251e27f222f7
7 changes: 4 additions & 3 deletions src/apps/desktop/src/api/agentic_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tauri::{AppHandle, State};

use crate::api::app_state::AppState;
use bitfun_core::agentic::tools::image_context::get_image_context;
use bitfun_core::agentic::coordination::{ConversationCoordinator, DialogTriggerSource};
use bitfun_core::agentic::coordination::{ConversationCoordinator, DialogScheduler, DialogTriggerSource};
use bitfun_core::agentic::core::*;
use bitfun_core::agentic::image_analysis::ImageContextData;
use bitfun_core::infrastructure::get_workspace_path;
Expand Down Expand Up @@ -186,6 +186,7 @@ pub async fn create_session(
pub async fn start_dialog_turn(
_app: AppHandle,
coordinator: State<'_, Arc<ConversationCoordinator>>,
scheduler: State<'_, Arc<DialogScheduler>>,
request: StartDialogTurnRequest,
) -> Result<StartDialogTurnResponse, String> {
let StartDialogTurnRequest {
Expand Down Expand Up @@ -214,8 +215,8 @@ pub async fn start_dialog_turn(
.await
.map_err(|e| format!("Failed to start dialog turn: {}", e))?;
} else {
coordinator
.start_dialog_turn(
scheduler
.submit(
session_id,
user_input,
turn_id,
Expand Down
15 changes: 9 additions & 6 deletions src/apps/desktop/src/api/image_analysis_api.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
//! Image Analysis API

use crate::api::app_state::AppState;
use bitfun_core::agentic::coordination::{ConversationCoordinator, DialogTriggerSource};
use bitfun_core::agentic::image_analysis::*;
use bitfun_core::agentic::coordination::{DialogScheduler, DialogTriggerSource};
use bitfun_core::agentic::image_analysis::{
resolve_vision_model_from_ai_config, AnalyzeImagesRequest, ImageAnalysisResult, ImageAnalyzer,
MessageEnhancer, SendEnhancedMessageRequest,
};
use log::error;
use std::sync::Arc;
use tauri::State;
Expand Down Expand Up @@ -56,7 +59,7 @@ pub async fn analyze_images(
#[tauri::command]
pub async fn send_enhanced_message(
request: SendEnhancedMessageRequest,
coordinator: State<'_, Arc<ConversationCoordinator>>,
scheduler: State<'_, Arc<DialogScheduler>>,
_state: State<'_, AppState>,
) -> Result<(), String> {
let enhanced_message = MessageEnhancer::enhance_with_image_analysis(
Expand All @@ -65,10 +68,10 @@ pub async fn send_enhanced_message(
&request.other_contexts,
);

let _stream = coordinator
.start_dialog_turn(
scheduler
.submit(
request.session_id.clone(),
enhanced_message.clone(),
enhanced_message,
Some(request.dialog_turn_id.clone()),
request.agent_type.clone(),
DialogTriggerSource::DesktopApi,
Expand Down
53 changes: 42 additions & 11 deletions src/apps/desktop/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ pub struct CoordinatorState {
pub coordinator: Arc<bitfun_core::agentic::coordination::ConversationCoordinator>,
}

/// Dialog scheduler state (primary entry point for user messages)
#[derive(Clone)]
pub struct SchedulerState {
pub scheduler: Arc<bitfun_core::agentic::coordination::DialogScheduler>,
}

/// Tauri application entry point
#[cfg_attr(mobile, tauri::mobile_entry_point)]
pub async fn run() {
Expand All @@ -72,7 +78,7 @@ pub async fn run() {
return;
}

let (coordinator, event_queue, event_router, ai_client_factory) =
let (coordinator, scheduler, event_queue, event_router, ai_client_factory) =
match init_agentic_system().await {
Ok(state) => state,
Err(e) => {
Expand All @@ -98,6 +104,10 @@ pub async fn run() {
coordinator: coordinator.clone(),
};

let scheduler_state = SchedulerState {
scheduler: scheduler.clone(),
};

let terminal_state = api::terminal_api::TerminalState::new();

let path_manager = get_path_manager_arc();
Expand Down Expand Up @@ -149,8 +159,10 @@ pub async fn run() {
})
.manage(app_state)
.manage(coordinator_state)
.manage(scheduler_state)
.manage(path_manager)
.manage(coordinator)
.manage(scheduler)
.manage(terminal_state)
.setup(move |app| {
logging::register_runtime_log_state(startup_log_level, session_log_dir.clone());
Expand All @@ -160,14 +172,13 @@ pub async fn run() {
// so the primary candidate is "mobile-web/dist". Additional fallbacks
// handle legacy or non-standard bundle layouts.
{
let candidates = [
"mobile-web/dist",
"mobile-web",
"dist",
];
let candidates = ["mobile-web/dist", "mobile-web", "dist"];
let mut found = false;
for candidate in &candidates {
if let Ok(p) = app.path().resolve(candidate, tauri::path::BaseDirectory::Resource) {
if let Ok(p) = app
.path()
.resolve(candidate, tauri::path::BaseDirectory::Resource)
{
if p.join("index.html").exists() {
log::info!("Found bundled mobile-web at: {}", p.display());
api::remote_connect_api::set_mobile_web_resource_path(p);
Expand All @@ -180,9 +191,16 @@ pub async fn run() {
// Last resort: scan the resource root for any index.html
if let Ok(res_dir) = app.path().resource_dir() {
for sub in &["mobile-web/dist", "mobile-web", "dist", ""] {
let p = if sub.is_empty() { res_dir.clone() } else { res_dir.join(sub) };
let p = if sub.is_empty() {
res_dir.clone()
} else {
res_dir.join(sub)
};
if p.join("index.html").exists() {
log::info!("Found mobile-web via resource root scan: {}", p.display());
log::info!(
"Found mobile-web via resource root scan: {}",
p.display()
);
api::remote_connect_api::set_mobile_web_resource_path(p);
break;
}
Expand Down Expand Up @@ -575,6 +593,7 @@ pub async fn run() {

async fn init_agentic_system() -> anyhow::Result<(
Arc<bitfun_core::agentic::coordination::ConversationCoordinator>,
Arc<bitfun_core::agentic::coordination::DialogScheduler>,
Arc<bitfun_core::agentic::events::EventQueue>,
Arc<bitfun_core::agentic::events::EventRouter>,
Arc<AIClientFactory>,
Expand Down Expand Up @@ -636,7 +655,7 @@ async fn init_agentic_system() -> anyhow::Result<(
));

let coordinator = Arc::new(coordination::ConversationCoordinator::new(
session_manager,
session_manager.clone(),
execution_engine,
tool_pipeline,
event_queue.clone(),
Expand All @@ -645,8 +664,20 @@ async fn init_agentic_system() -> anyhow::Result<(

coordination::ConversationCoordinator::set_global(coordinator.clone());

// Create the DialogScheduler and wire up the outcome notification channel
let scheduler =
coordination::DialogScheduler::new(coordinator.clone(), session_manager.clone());
coordinator.set_scheduler_notifier(scheduler.outcome_sender());
coordination::set_global_scheduler(scheduler.clone());

log::info!("Agentic system initialized");
Ok((coordinator, event_queue, event_router, ai_client_factory))
Ok((
coordinator,
scheduler,
event_queue,
event_router,
ai_client_factory,
))
}

async fn init_function_agents(ai_client_factory: Arc<AIClientFactory>) -> anyhow::Result<()> {
Expand Down
52 changes: 47 additions & 5 deletions src/crates/core/src/agentic/coordination/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::util::errors::{BitFunError, BitFunResult};
use log::{debug, error, info, warn};
use std::sync::Arc;
use std::sync::OnceLock;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

/// Subagent execution result
Expand Down Expand Up @@ -65,13 +66,26 @@ impl Drop for CancelTokenGuard {
}
}

/// Outcome of a completed dialog turn, used to notify DialogScheduler
#[derive(Debug, Clone)]
pub enum TurnOutcome {
/// Turn completed normally
Completed,
/// Turn was cancelled by user
Cancelled,
/// Turn failed with an error
Failed,
}

/// Conversation coordinator
pub struct ConversationCoordinator {
session_manager: Arc<SessionManager>,
execution_engine: Arc<ExecutionEngine>,
tool_pipeline: Arc<ToolPipeline>,
event_queue: Arc<EventQueue>,
event_router: Arc<EventRouter>,
/// Notifies DialogScheduler of turn outcomes; injected after construction
scheduler_notify_tx: OnceLock<mpsc::Sender<(String, TurnOutcome)>>,
}

impl ConversationCoordinator {
Expand All @@ -88,17 +102,25 @@ impl ConversationCoordinator {
tool_pipeline,
event_queue,
event_router,
scheduler_notify_tx: OnceLock::new(),
}
}

/// Inject the DialogScheduler notification channel after construction.
/// Called once during app initialization after the scheduler is created.
pub fn set_scheduler_notifier(&self, tx: mpsc::Sender<(String, TurnOutcome)>) {
let _ = self.scheduler_notify_tx.set(tx);
}

/// Create a new session
pub async fn create_session(
&self,
session_name: String,
agent_type: String,
config: SessionConfig,
) -> BitFunResult<Session> {
self.create_session_with_workspace(None, session_name, agent_type, config, None).await
self.create_session_with_workspace(None, session_name, agent_type, config, None)
.await
}

/// Create a new session with optional session ID
Expand All @@ -109,7 +131,8 @@ impl ConversationCoordinator {
agent_type: String,
config: SessionConfig,
) -> BitFunResult<Session> {
self.create_session_with_workspace(session_id, session_name, agent_type, config, None).await
self.create_session_with_workspace(session_id, session_name, agent_type, config, None)
.await
}

/// Create a new session with optional session ID and workspace binding.
Expand Down Expand Up @@ -561,6 +584,7 @@ impl ConversationCoordinator {
let turn_id_clone = turn_id.clone();
let session_workspace_path = session.config.workspace_path.clone();
let effective_agent_type_clone = effective_agent_type.clone();
let scheduler_notify_tx = self.scheduler_notify_tx.get().cloned();

tokio::spawn(async move {
// Note: Don't check cancellation here as cancel token hasn't been created yet
Expand Down Expand Up @@ -621,6 +645,10 @@ impl ConversationCoordinator {
let _ = session_manager
.update_session_state(&session_id_clone, SessionState::Idle)
.await;

if let Some(tx) = &scheduler_notify_tx {
let _ = tx.try_send((session_id_clone.clone(), TurnOutcome::Completed));
}
}
Err(e) => {
let is_cancellation = matches!(&e, BitFunError::Cancelled(_));
Expand All @@ -632,6 +660,10 @@ impl ConversationCoordinator {
let _ = session_manager
.update_session_state(&session_id_clone, SessionState::Idle)
.await;

if let Some(tx) = &scheduler_notify_tx {
let _ = tx.try_send((session_id_clone.clone(), TurnOutcome::Cancelled));
}
} else {
error!("Dialog turn execution failed: {}", e);

Expand Down Expand Up @@ -659,6 +691,10 @@ impl ConversationCoordinator {
},
)
.await;

if let Some(tx) = &scheduler_notify_tx {
let _ = tx.try_send((session_id_clone.clone(), TurnOutcome::Failed));
}
}
}
}
Expand Down Expand Up @@ -765,7 +801,9 @@ impl ConversationCoordinator {
limit: usize,
before_message_id: Option<&str>,
) -> BitFunResult<(Vec<Message>, bool)> {
self.session_manager.get_messages_paginated(session_id, limit, before_message_id).await
self.session_manager
.get_messages_paginated(session_id, limit, before_message_id)
.await
}

/// Subscribe to internal events
Expand Down Expand Up @@ -830,7 +868,9 @@ impl ConversationCoordinator {
if let Some(token) = cancel_token {
if token.is_cancelled() {
debug!("Subagent task cancelled before execution");
return Err(BitFunError::Cancelled("Subagent task has been cancelled".to_string()));
return Err(BitFunError::Cancelled(
"Subagent task has been cancelled".to_string(),
));
}
}

Expand All @@ -851,7 +891,9 @@ impl ConversationCoordinator {
if token.is_cancelled() {
debug!("Subagent task cancelled before AI call, cleaning up resources");
let _ = self.cleanup_subagent_resources(&session.session_id).await;
return Err(BitFunError::Cancelled("Subagent task has been cancelled".to_string()));
return Err(BitFunError::Cancelled(
"Subagent task has been cancelled".to_string(),
));
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/crates/core/src/agentic/coordination/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
//! Top-level component that integrates all subsystems

pub mod coordinator;
pub mod scheduler;
pub mod state_manager;

pub use coordinator::*;
pub use scheduler::*;
pub use state_manager::*;

pub use coordinator::get_global_coordinator;

pub use scheduler::get_global_scheduler;
Loading