// SPDX-License-Identifier: Mulan PSL v2
/*
 * Copyright (c) 2025 Huawei Technologies Co., Ltd.
 * This software is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *         http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

use std::{cell::RefCell, env, fs, mem, path::PathBuf, process, sync::Arc};

use anyhow::bail;
use crossbeam::queue::SegQueue;
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use tracing::debug;

use xgpu_common::ipc::bytewise::{
    BytewiseBuffer, BytewiseReadOwned, BytewiseWrite, BytewiseWriter,
};

/// Size in bytes (100 MiB) of the per-thread scratch buffer that
/// `serialize_request` writes into before copying out the serialized bytes.
const SERIALIZE_BUFFER_SIZE: usize = 100 * 1024 * 1024;

thread_local! {
    // Reusable per-thread scratch buffer. It is created with
    // `SERIALIZE_BUFFER_SIZE` bytes of capacity so that serialization calls on
    // the same thread share one allocation instead of allocating each time.
    static SERIALIZE_BUFFER: RefCell<Vec<u8>> = RefCell::new(Vec::with_capacity(SERIALIZE_BUFFER_SIZE));
}
use xgpu_journal::{RecorderConfig, RequestLogger, RequestPlayer};

/// Path of the request journal file: `<pid>_rec` inside the current working
/// directory, or inside the system temp directory when the working directory
/// cannot be determined. Computed once on first access.
static REQUEST_LOG_FILE_PATH: Lazy<PathBuf> = Lazy::new(|| {
    env::current_dir()
        .unwrap_or_else(|_| env::temp_dir())
        .join(format!("{}_rec", process::id()))
});
/// Process-wide request logger; `None` until `initialize` installs one and
/// again after `finalize` takes it out.
static REQUEST_LOGGER: Lazy<Mutex<Option<RequestLogger>>> = Lazy::new(|| Mutex::new(None));
/// Process-wide request player; `None` until `request_player_init` installs one.
static REQUEST_PLAYER: Lazy<Mutex<Option<RequestPlayer>>> = Lazy::new(|| Mutex::new(None));

/// Queue of serialized minibatch requests. The queue lives behind an `Arc` so
/// callers can clone the handle and release the mutex before pushing/popping;
/// `reset_minibatch_requests` swaps in a fresh queue and hands back the old one.
static MINIBATCH_REQUESTS: Lazy<Mutex<Arc<SegQueue<Vec<u8>>>>> =
    Lazy::new(|| Mutex::new(Arc::new(SegQueue::new())));

/// Sets up the process-wide request logger if it has not been set up yet.
///
/// The journal file at `REQUEST_LOG_FILE_PATH` is created when missing (the
/// handle is dropped right away), then a `RequestLogger` writing to that path
/// is stored in `REQUEST_LOGGER`. When a logger is already installed it is
/// left untouched.
///
/// # Errors
///
/// Returns an error if the journal file cannot be opened/created or if
/// `RequestLogger::new` fails.
pub fn initialize() -> anyhow::Result<()> {
    let log_path = REQUEST_LOG_FILE_PATH.as_path();

    // Make sure the journal file exists; the file handle is not kept.
    fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(log_path)?;

    let recorder_config = RecorderConfig {
        output_path: log_path.to_owned(),
        block_size: 4096,
        ..Default::default()
    };

    let mut slot = REQUEST_LOGGER.lock();
    if slot.is_some() {
        // A logger is already installed; nothing more to do.
        return Ok(());
    }

    *slot = Some(RequestLogger::new(recorder_config)?);
    debug!(
        "[Journal] Initializing RequestLogger for log file: {}",
        log_path.display()
    );

    Ok(())
}

/// Shuts down the process-wide request logger, if one is installed.
///
/// The logger is taken out of `REQUEST_LOGGER` (leaving `None` behind) and
/// closed while the lock is still held. Whatever `close` returns is
/// deliberately discarded.
pub fn finalize() {
    let mut slot = REQUEST_LOGGER.lock();

    // `take()` already resets the slot to `None`.
    let previous = slot.take();
    if let Some(active) = previous {
        let _ = active.close();
    }

    // NOTE: the journal file at REQUEST_LOG_FILE_PATH is not removed here.
}

/// Serializes any object implementing `BytewiseWrite` into a new byte vector.
///
/// The object is first written into the calling thread's `SERIALIZE_BUFFER`
/// scratch space (`SERIALIZE_BUFFER_SIZE` bytes). Only the bytes actually
/// written are then copied into the returned `Vec`, so the large scratch
/// buffer is reused across calls and the only per-call allocation is the
/// result itself.
///
/// # Errors
///
/// Returns an error when `obj.write_to` fails; the failure is also logged at
/// debug level. An object that does not fit into the scratch buffer presumably
/// surfaces this way as well (depends on `BytewiseBuffer` — confirm).
pub fn serialize_request<T: BytewiseWrite>(obj: &T) -> anyhow::Result<Vec<u8>> {
    SERIALIZE_BUFFER.with(|buf_cell| {
        let mut buffer = buf_cell.borrow_mut();

        // Make sure the scratch space spans the full SERIALIZE_BUFFER_SIZE
        // bytes. `resize` grows the allocation on its own when needed, so no
        // separate `reserve` call is required (and `reserve` counts from the
        // current length, not the current capacity). Normally this only runs
        // on a thread's first call.
        if buffer.len() < SERIALIZE_BUFFER_SIZE {
            buffer.resize(SERIALIZE_BUFFER_SIZE, 0);
        }

        let mut writer = BytewiseBuffer::new(&mut buffer[..]);

        obj.write_to(&mut writer).map_err(|e| {
            debug!("Object serialization failed: {:?}", e);
            anyhow::anyhow!("Object serialization failed: {:?}", e)
        })?;

        let written_size = writer.written_bytes();

        // Copy out just the serialized prefix; the remainder of the scratch
        // buffer is padding or leftovers from earlier calls.
        Ok(buffer[..written_size].to_vec())
    })
}

/// Decodes a value of type `T` from the serialized bytes in `data`.
///
/// `data` is wrapped in a `BytewiseBuffer` and handed to `T::read_from_mut`.
///
/// # Errors
///
/// Returns an error when `T::read_from_mut` fails; the failure is also logged
/// at debug level.
pub fn deserialize_request<T: BytewiseReadOwned>(data: &mut [u8]) -> anyhow::Result<T> {
    let mut reader = BytewiseBuffer::new(data);

    match T::read_from_mut(&mut reader) {
        Ok(value) => Ok(value),
        Err(e) => {
            debug!("Object deserialization failed: {:?}", e);
            Err(anyhow::anyhow!("Object deserialization failed: {:?}", e))
        }
    }
}

/// Appends an already-serialized request to the journal.
///
/// When no logger is installed in `REQUEST_LOGGER` the bytes are dropped and
/// the call succeeds without doing anything.
///
/// # Errors
///
/// Propagates any error returned by `log_request_serialized`.
pub fn dump_request(req_bytes: Vec<u8>) -> anyhow::Result<()> {
    let mut slot = REQUEST_LOGGER.lock();

    match slot.as_mut() {
        Some(active) => {
            active.log_request_serialized(req_bytes)?;
            Ok(())
        }
        // No logger installed: silently skip.
        None => Ok(()),
    }
}

pub fn add_minibatch_request(req_bytes: Vec<u8>) -> anyhow::Result<()> {
    let queue = {
        let guard = MINIBATCH_REQUESTS.lock();
        guard.clone()
    };
    queue.push(req_bytes);

    Ok(())
}

pub fn fetch_minibatch_request() -> anyhow::Result<Option<Vec<u8>>> {
    let queue = {
        let guard = MINIBATCH_REQUESTS.lock();
        guard.clone()
    };

    Ok(queue.pop())
}

/// Installs a fresh, empty minibatch queue and returns the previous one.
///
/// Handles cloned from the old queue before the swap keep pointing at the
/// returned queue, not at the new one.
pub fn reset_minibatch_requests() -> anyhow::Result<Arc<SegQueue<Vec<u8>>>> {
    let fresh = Arc::new(SegQueue::new());
    let previous = mem::replace(&mut *MINIBATCH_REQUESTS.lock(), fresh);

    Ok(previous)
}

/// Opens the journal file for replay and installs the resulting player.
///
/// Any player previously stored in `REQUEST_PLAYER` is replaced. On failure
/// the stored player is left as it was.
///
/// # Errors
///
/// Propagates any error returned by `RequestPlayer::from_file`.
pub fn request_player_init() -> anyhow::Result<()> {
    let mut slot = REQUEST_PLAYER.lock();

    let log_path = REQUEST_LOG_FILE_PATH.as_path();
    let loaded = RequestPlayer::from_file(log_path)?;
    *slot = Some(loaded);

    debug!(
        "[Journal] Initializing RequestPlayer for log file: {}",
        log_path.display()
    );

    Ok(())
}

/// Reads the next recorded request from the installed player.
///
/// Returns `Ok(Some(bytes))` with the request data of the next block, or
/// `Ok(None)` when the player has no further blocks.
///
/// # Errors
///
/// Fails when no player is installed, when `next_block` returns an error, or
/// when the block that was read carries no payload.
pub fn load_next_request() -> anyhow::Result<Option<Vec<u8>>> {
    let mut slot = REQUEST_PLAYER.lock();

    let active = match slot.as_mut() {
        Some(p) => p,
        None => bail!("RequestPlayer is not initialized"),
    };

    let block = match active.next_block()? {
        Some(b) => b,
        // End of the journal.
        None => return Ok(None),
    };

    match block.payload {
        Some(payload) => Ok(Some(payload.request_data)),
        None => bail!("Block payload is None"),
    }
}