use std::sync::Arc;
use tokio::sync::mpsc;
use atomcode_core::agent::AgentEvent;
use atomcode_core::live::{LiveEvent, LiveSession};
use atomcode_core::turn::event::TurnEvent;
use super::bg_runtime::{RuntimeEvent, RuntimeId};
pub(crate) fn turn_to_agent_event(te: TurnEvent) -> Option<AgentEvent> {
Some(match te {
TurnEvent::TextDelta(s) => AgentEvent::TextDelta(s),
TurnEvent::ReasoningDelta(s) => AgentEvent::ReasoningDelta(s),
TurnEvent::ToolCallStarted { id, name, arguments } =>
AgentEvent::ToolCallStarted { id, name, arguments },
TurnEvent::ToolOutputChunk { call_id, chunk } =>
AgentEvent::ToolOutputChunk { call_id, chunk },
TurnEvent::ToolCallResult { call_id, name, output, success, duration } =>
AgentEvent::ToolCallResult { call_id, name, output, success, duration },
TurnEvent::TokenUsage { prompt_tokens, completion_tokens, cached_tokens, .. } =>
AgentEvent::TokenUsage(atomcode_core::stream::TokenUsage {
prompt_tokens,
completion_tokens,
cached_tokens,
}),
TurnEvent::Error(e) => AgentEvent::Error { error: e, messages: Vec::new() },
TurnEvent::Warning(w) => AgentEvent::Warning(w),
TurnEvent::ApprovalRequested { tool_name, reason, call, messages } =>
AgentEvent::ApprovalNeeded { tool_name, reason, call, messages },
TurnEvent::ToolCallStreaming { .. }
| TurnEvent::ToolBatchStarted { .. }
| TurnEvent::ToolBatchCompleted { .. }
| TurnEvent::ContextStats { .. }
| TurnEvent::WorkingDirChanged(_) => return None,
})
}
pub(crate) fn spawn_live_forwarder(
session: Arc<LiveSession>,
runtime_id: RuntimeId,
fan_tx: mpsc::UnboundedSender<RuntimeEvent>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let (_snapshot, mut rx) = session.join().await;
loop {
match rx.recv().await {
Ok(LiveEvent::Turn(te)) => {
if let Some(ae) = turn_to_agent_event(te) {
if fan_tx.send(RuntimeEvent { runtime_id, event: ae }).is_err() {
break;
}
}
}
Ok(LiveEvent::UserMessage { text, .. }) => {
if fan_tx
.send(RuntimeEvent { runtime_id, event: AgentEvent::UserEcho(text) })
.is_err()
{
break;
}
}
Ok(LiveEvent::StateChanged(st)) => {
let running = matches!(st, atomcode_core::live::TurnState::Running);
if fan_tx
.send(RuntimeEvent { runtime_id, event: AgentEvent::PeerBusy(running) })
.is_err()
{
break;
}
}
Ok(LiveEvent::ProviderChanged(provider)) => {
if fan_tx
.send(RuntimeEvent { runtime_id, event: AgentEvent::ProviderChanged(provider) })
.is_err()
{
break;
}
}
Ok(LiveEvent::WorkingDirChanged(dir)) => {
if fan_tx
.send(RuntimeEvent { runtime_id, event: AgentEvent::ProjectSwitched(dir) })
.is_err()
{
break;
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
Err(_) => break,
}
}
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn maps_text_delta() {
assert!(
matches!(
turn_to_agent_event(TurnEvent::TextDelta("hi".into())),
Some(AgentEvent::TextDelta(s)) if s == "hi"
)
);
}
#[test]
fn ignores_context_stats() {
assert!(turn_to_agent_event(TurnEvent::ContextStats {
system_tokens: 0,
sent_tokens: 0,
dropped_tokens: 0,
working_set_tokens: 0,
total_messages: 0,
})
.is_none());
}
}