mod daemon_config;
mod daemon_runtime;
mod lsp_support;

use crate::daemon_config::{resolve_config_path, DaemonConfig};
use crate::daemon_runtime::ConfiguredRuntimeResolver;
use anyhow::{bail, Context, Result};
use futures_util::future::BoxFuture;
use operation_backend::process_group::ProcessGroupCleanupGuard;
use std::env;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use tracing_subscriber::EnvFilter;
use xiaoo_app::channels::{
    build_feishu_runtime, build_telegram_runtime, FeishuEventTransport,
    FeishuWebsocketMessageHandler, FeishuWebsocketService, TelegramPollingMessageHandler,
    TelegramPollingService,
};
use xiaoo_app::gateway::{
    backend::ExternalBackendManager, AppBootstrap, InMemorySessionStore, SessionStore,
};
use xiaoo_app::httpserver::{
    create_router_with_channel_runtimes_control_plane_and_timeout_and_auth,
    create_router_with_control_plane_and_auth, ChannelRuntimeProcessor, HttpBearerAuthConfig,
};

#[tokio::main]
async fn main() -> Result<()> {
    let _cleanup_guard = ProcessGroupCleanupGuard;

    init_tracing();
    let cli = Cli::parse(env::args().skip(1))?;
    match cli.command {
        Command::Daemon { config, host, port } => run_daemon(config, host, port).await,
    }
}

async fn run_daemon(config_path: Option<PathBuf>, host: String, port: u16) -> Result<()> {
    let config_path = resolve_config_path(config_path)?;
    let config = DaemonConfig::load_from(&config_path)?;
    let hooker_config = config.app.hooker.clone();
    let bearer_auth = config.http_bearer_token()?.map(HttpBearerAuthConfig::new);
    let rate_limit = config.app.http.rate_limit.clone();
    let resolver = Arc::new(ConfiguredRuntimeResolver::from_config(&config).await?);
    let session_store: Arc<dyn SessionStore> = Arc::new(InMemorySessionStore::default());
    let backend_manager = Arc::new(ExternalBackendManager::new());
    let app = AppBootstrap::from_session_components_with_hooks_and_backend_manager(
        session_store,
        resolver,
        hooker_config,
        backend_manager.clone(),
    )?;
    let interaction_timeout_secs = config.interaction_timeout_secs();
    let session_service = app.session_service.clone();
    let session_control_plane = app.session_control_plane.clone();

    if let Some(telegram_config) = config.telegram_polling_config()? {
        spawn_telegram_polling_service(
            telegram_config,
            session_service.clone(),
            interaction_timeout_secs,
        )
        .context("failed to start telegram polling service")?;
    }

    if let Some(feishu_config) = config.feishu_config()? {
        if feishu_config.event_transport == FeishuEventTransport::Websocket {
            spawn_feishu_websocket_service(
                feishu_config,
                session_service.clone(),
                interaction_timeout_secs,
            )
            .context("failed to start Feishu websocket service")?;
        }
    }

    let channel_runtimes = config.channel_runtimes()?;
    let router = if channel_runtimes.is_empty() {
        create_router_with_control_plane_and_auth(
            session_service,
            session_control_plane,
            bearer_auth,
            rate_limit,
        )
    } else {
        create_router_with_channel_runtimes_control_plane_and_timeout_and_auth(
            session_service,
            session_control_plane,
            channel_runtimes,
            interaction_timeout_secs,
            bearer_auth,
            rate_limit,
        )
        .map_err(anyhow::Error::new)
        .context("failed to create router with channel runtimes")?
    };

    let addr: SocketAddr = format!("{host}:{port}")
        .parse()
        .with_context(|| format!("invalid listen address {host}:{port}"))?;
    let listener = tokio::net::TcpListener::bind(addr)
        .await
        .with_context(|| format!("failed to bind {addr}"))?;
    tracing::info!(config = %config_path.display(), %addr, "starting rebuild daemon");
    let serve_result = axum::serve(listener, router)
        .await
        .context("axum server exited unexpectedly");
    if let Err(error) = backend_manager.shutdown_all().await {
        tracing::warn!(error = %error, "failed to shutdown daemon backend manager");
    }
    serve_result
}

fn spawn_feishu_websocket_service(
    feishu_config: xiaoo_app::channels::FeishuConfig,
    session_service: Arc<dyn xiaoo_app::gateway::SessionService>,
    interaction_timeout_secs: u64,
) -> Result<()> {
    let runtime = build_feishu_runtime(feishu_config.clone()).map_err(anyhow::Error::new)?;
    let processor =
        ChannelRuntimeProcessor::with_timeout(session_service, interaction_timeout_secs);
    let service = FeishuWebsocketService::new(feishu_config).map_err(anyhow::Error::new)?;
    let handler: FeishuWebsocketMessageHandler = Arc::new(move |message| {
        let processor = processor.clone();
        let runtime = runtime.clone();
        Box::pin(async move {
            if let Err(error) = processor.process_message(runtime, message).await {
                tracing::warn!("failed to process Feishu websocket message: {error}");
            }
        }) as BoxFuture<'static, ()>
    });
    tracing::info!("starting Feishu websocket transport");
    tokio::spawn(async move {
        service.run_forever(handler).await;
    });
    Ok(())
}

fn spawn_telegram_polling_service(
    telegram_config: xiaoo_app::channels::TelegramConfig,
    session_service: Arc<dyn xiaoo_app::gateway::SessionService>,
    interaction_timeout_secs: u64,
) -> Result<()> {
    let runtime = build_telegram_runtime(telegram_config.clone()).map_err(anyhow::Error::new)?;
    let processor =
        ChannelRuntimeProcessor::with_timeout(session_service, interaction_timeout_secs);
    let service = TelegramPollingService::new(telegram_config).map_err(anyhow::Error::new)?;
    let handler: TelegramPollingMessageHandler = Arc::new(move |message| {
        let processor = processor.clone();
        let runtime = runtime.clone();
        Box::pin(async move {
            if let Err(error) = processor.process_message(runtime, message).await {
                tracing::warn!("failed to process telegram polling message: {error}");
            }
        }) as BoxFuture<'static, ()>
    });
    tracing::info!("starting telegram polling transport");
    tokio::spawn(async move {
        service.run_forever(handler).await;
    });
    Ok(())
}

fn init_tracing() {
    let filter = EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| EnvFilter::new("info,xiaoo_app=debug"));
    let _ = tracing_subscriber::fmt()
        .with_env_filter(filter)
        .with_target(false)
        .try_init();
}

struct Cli {
    command: Command,
}

enum Command {
    Daemon {
        config: Option<PathBuf>,
        host: String,
        port: u16,
    },
}

impl Cli {
    fn parse<I>(args: I) -> Result<Self>
    where
        I: IntoIterator<Item = String>,
    {
        let mut args = args.into_iter();
        let Some(command) = args.next() else {
            bail!("missing command: expected `daemon`");
        };
        match command.as_str() {
            "daemon" => {
                let mut config = None;
                let mut host = "0.0.0.0".to_string();
                let mut port = 18080_u16;
                let remaining = args.collect::<Vec<_>>();
                let mut index = 0;
                while index < remaining.len() {
                    match remaining[index].as_str() {
                        "--config" => {
                            index += 1;
                            let value =
                                remaining.get(index).context("missing value for --config")?;
                            config = Some(PathBuf::from(value));
                        }
                        "--host" => {
                            index += 1;
                            let value = remaining.get(index).context("missing value for --host")?;
                            host = value.clone();
                        }
                        "--port" => {
                            index += 1;
                            let value = remaining.get(index).context("missing value for --port")?;
                            port = value
                                .parse()
                                .with_context(|| format!("invalid port `{value}`"))?;
                        }
                        other => bail!("unknown argument `{other}` for daemon"),
                    }
                    index += 1;
                }
                Ok(Self {
                    command: Command::Daemon { config, host, port },
                })
            }
            other => bail!("unknown command `{other}`"),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::Cli;

    #[test]
    fn parses_daemon_arguments() {
        let cli = Cli::parse(
            [
                "daemon",
                "--config",
                "/tmp/demo.toml",
                "--host",
                "127.0.0.1",
                "--port",
                "18080",
            ]
            .into_iter()
            .map(str::to_string),
        )
        .expect("cli should parse");

        assert!(matches!(cli.command, super::Command::Daemon { .. }));
    }

    #[test]
    fn daemon_defaults_to_port_18080() {
        let cli = Cli::parse(["daemon"].into_iter().map(str::to_string))
            .expect("cli should parse with defaults");

        match cli.command {
            super::Command::Daemon { host, port, .. } => {
                assert_eq!(host, "0.0.0.0");
                assert_eq!(port, 18080);
            }
        }
    }
}