// SPDX-License-Identifier: Mulan PSL v2
/*
 * Copyright (c) 2025 Huawei Technologies Co., Ltd.
 * This software is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *         http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

//! New request recorder implementation (based on direct Request serialization)
#![allow(dead_code)]

use std::collections::VecDeque;
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use crate::types::xgpu_recorder::v1::*;

use super::{
    format::{ProtobufReader, ProtobufWriter, RecordWriter},
    RecorderConfig, RecorderError, RecorderStats,
};

/// Derive a numeric identifier for the calling thread.
///
/// `std::thread::ThreadId` has no stable integer accessor, so the ID is fed
/// through a `DefaultHasher` and the resulting 64-bit hash is returned. The
/// value is stable for a given thread within one process run; it is not the
/// OS thread ID and not the process PID.
fn get_current_thread_id() -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let mut state = DefaultHasher::new();
    std::thread::current().id().hash(&mut state);
    state.finish()
}

/// Create a [`ProtobufWriter`] for the file at `path`.
///
/// Thin wrapper around `ProtobufWriter::new`; whatever error that constructor
/// reports is returned unchanged.
fn create_writer<P: AsRef<Path>>(path: P) -> Result<ProtobufWriter, RecorderError> {
    ProtobufWriter::new(path)
}

/// Create a [`ProtobufReader`] for the file at `path`.
///
/// Thin public wrapper around `ProtobufReader::new`; whatever error that
/// constructor reports is returned unchanged.
pub fn create_reader<P: AsRef<Path>>(path: P) -> Result<ProtobufReader, RecorderError> {
    ProtobufReader::new(path)
}

/// Create the default file header.
///
/// Returns whatever `RecorderHeader::new()` produces; no fields are
/// customised here.
fn create_default_header() -> RecorderHeader {
    RecorderHeader::new()
}

/// Write one record block through a [`RecordWriter`] trait object.
///
/// Forwards to `RecordWriter::write_block` and returns its result unchanged.
/// Taking `&mut dyn RecordWriter` lets callers pass any writer implementation.
fn write_block(writer: &mut dyn RecordWriter, block: &RecordBlock) -> Result<(), RecorderError> {
    writer.write_block(block)
}

/// Write the file footer through a [`RecordWriter`] trait object.
///
/// Forwards to `RecordWriter::write_footer` and returns its result unchanged.
fn write_footer(
    writer: &mut dyn RecordWriter,
    footer: &RecorderFooter,
) -> Result<(), RecorderError> {
    writer.write_footer(footer)
}

/// Internal write task
///
/// Commands placed on the shared queue by `RequestLogger` and consumed by the
/// background writer thread.
enum WriteTask {
    /// Add this block to the writer thread's pending buffer; the buffer is
    /// written out once it is full or the flush interval has elapsed.
    WriteBlock(RecordBlock),
    /// Write out every block currently pending in the writer thread.
    Flush,
    /// Write out the remaining pending blocks, then leave the writer loop.
    Shutdown,
}

/// Async writer thread state
///
/// Handles kept by `RequestLogger` to talk to, and eventually stop, the
/// background writer thread.
struct AsyncWriter {
    /// Task queue shared with the writer thread; the logger pushes at the
    /// back, the thread pops from the front.
    queue: Arc<Mutex<VecDeque<WriteTask>>>,
    /// Join handle of the writer thread; `None` once it has been taken for
    /// joining. The thread's return value carries any write error.
    worker_thread: Option<JoinHandle<Result<(), RecorderError>>>,
    /// Set to `true` to ask the writer thread to exit once its queue is empty.
    shutdown_flag: Arc<AtomicBool>,
}

/// New request logger (with async write support)
///
/// Exactly one of `async_writer` / `file_writer` is populated at construction,
/// depending on the mode chosen in `new_with_async`.
pub struct RequestLogger {
    /// Recorder configuration supplied at construction.
    config: RecorderConfig,
    /// Source of `block_id` values; incremented once per logged request.
    block_counter: AtomicU64,
    /// Statistics shared with the writer thread (in async mode).
    stats: Arc<Mutex<RecorderStats>>,
    /// Header that was written at the start of the output file.
    file_header: RecorderHeader,
    /// Background writer handles; `Some` only in async mode.
    async_writer: Option<AsyncWriter>,
    // Synchronous mode fields (when async is disabled)
    /// Direct file writer; `Some` only in sync mode.
    file_writer: Option<ProtobufWriter>,
    /// Blocks waiting to be written by the next synchronous flush.
    pending_blocks: VecDeque<RecordBlock>,
    /// Time of construction or of the most recent synchronous flush.
    last_flush: Instant,
}

impl RequestLogger {
    /// Create a new logger in async mode.
    ///
    /// Shorthand for `Self::new_with_async(config, true)`.
    ///
    /// # Errors
    ///
    /// Returns the error from opening the writer or from writing the header.
    pub fn new(config: RecorderConfig) -> Result<Self, RecorderError> {
        Self::new_with_async(config, true)
    }

    /// Create a new logger, choosing between async and sync mode.
    ///
    /// * `use_async == true`: the output file is opened, the header is written
    ///   and a background thread is started that owns the file writer. Logged
    ///   requests are handed to it through a queue.
    /// * `use_async == false`: the logger owns the file writer itself and
    ///   writes on the caller's thread.
    ///
    /// # Errors
    ///
    /// Returns the error from opening the writer or from writing the header.
    pub fn new_with_async(config: RecorderConfig, use_async: bool) -> Result<Self, RecorderError> {
        let file_header = create_default_header();
        let stats = Arc::new(Mutex::new(RecorderStats::default()));

        // Build the mode-specific half first so the struct literal exists once.
        let (async_writer, file_writer) = if use_async {
            let queue = Arc::new(Mutex::new(VecDeque::new()));
            let shutdown_flag = Arc::new(AtomicBool::new(false));

            let worker_thread = Self::spawn_writer_thread(
                config.clone(),
                Arc::clone(&queue),
                Arc::clone(&shutdown_flag),
                Arc::clone(&stats),
                file_header.clone(),
            )?;

            let async_writer = AsyncWriter {
                queue,
                worker_thread: Some(worker_thread),
                shutdown_flag,
            };
            (Some(async_writer), None)
        } else {
            let mut file_writer = create_writer(&config.output_path)?;
            file_writer.write_header(&file_header)?;
            (None, Some(file_writer))
        };

        Ok(Self {
            config,
            block_counter: AtomicU64::new(0),
            stats,
            file_header,
            async_writer,
            file_writer,
            pending_blocks: VecDeque::new(),
            last_flush: Instant::now(),
        })
    }

    /// Open the output file, write the header and start the background writer.
    ///
    /// The file is opened and the header written on the calling thread, so
    /// those failures are returned directly. The spawned thread then loops:
    /// it pops one task at a time from `queue`, buffers blocks, and flushes
    /// them when `config.block_size` blocks are pending, when
    /// `config.sync_interval_secs` has elapsed since the last flush, or when a
    /// `Flush`/`Shutdown` task arrives. When the queue is empty it sleeps for
    /// 1 ms before polling again. On exit it writes the footer and syncs the
    /// writer.
    ///
    /// The thread's own result (returned through the `JoinHandle`) is the
    /// first write/flush error it hit, if any. In that case it stops
    /// immediately without writing the footer.
    fn spawn_writer_thread(
        config: RecorderConfig,
        queue: Arc<Mutex<VecDeque<WriteTask>>>,
        shutdown_flag: Arc<AtomicBool>,
        stats: Arc<Mutex<RecorderStats>>,
        file_header: RecorderHeader,
    ) -> Result<JoinHandle<Result<(), RecorderError>>, RecorderError> {
        let mut file_writer = create_writer(&config.output_path)?;
        file_writer.write_header(&file_header)?;

        let handle = thread::spawn(move || -> Result<(), RecorderError> {
            let mut pending_blocks = VecDeque::new();
            let mut last_flush = Instant::now();
            let flush_interval = Duration::from_secs(config.sync_interval_secs);

            loop {
                // Sample the flag BEFORE looking at the queue. The logger only
                // sets the flag after its last push, so "flag was already set
                // and the queue is now empty" proves nothing is left behind.
                // Checking the flag after an empty pop would leave a window in
                // which a block enqueued right before shutdown is never written.
                let shutdown_requested = shutdown_flag.load(Ordering::Acquire);

                // Hold the queue lock only for the pop, never during file I/O.
                let task = {
                    let mut q = queue.lock().unwrap();
                    q.pop_front()
                };

                match task {
                    Some(WriteTask::WriteBlock(block)) => {
                        pending_blocks.push_back(block);

                        // Auto flush if the buffer is full or the interval elapsed
                        if pending_blocks.len() >= config.block_size
                            || last_flush.elapsed() >= flush_interval
                        {
                            Self::flush_blocks(&mut file_writer, &mut pending_blocks, &stats)?;
                            last_flush = Instant::now();
                        }
                    }
                    Some(WriteTask::Flush) => {
                        Self::flush_blocks(&mut file_writer, &mut pending_blocks, &stats)?;
                        last_flush = Instant::now();
                    }
                    Some(WriteTask::Shutdown) => {
                        // Flush all remaining data and exit
                        Self::flush_blocks(&mut file_writer, &mut pending_blocks, &stats)?;
                        break;
                    }
                    None => {
                        if shutdown_requested {
                            // Queue drained after shutdown was requested:
                            // flush what is buffered and exit.
                            Self::flush_blocks(&mut file_writer, &mut pending_blocks, &stats)?;
                            break;
                        }

                        // Idle: honour the periodic flush for buffered blocks
                        if !pending_blocks.is_empty() && last_flush.elapsed() >= flush_interval {
                            Self::flush_blocks(&mut file_writer, &mut pending_blocks, &stats)?;
                            last_flush = Instant::now();
                        }
                        // Sleep briefly to avoid busy waiting
                        thread::sleep(Duration::from_millis(1));
                    }
                }
            }

            // Write footer before exit.
            // NOTE(review): the 32 bytes of 0xff look like a placeholder for the
            // second `RecorderFooter::new` argument; confirm its meaning.
            let total_requests = stats.lock().unwrap().total_requests;
            let footer = RecorderFooter::new(total_requests, [0xff; 32]);

            write_footer(&mut file_writer, &footer)?;
            file_writer.sync()?;

            Ok(())
        });

        Ok(handle)
    }

    /// Write every buffered block to `writer`, flush it, and update `stats`.
    ///
    /// `total_requests` is incremented after each block that was written
    /// successfully; `flush_count` and `last_flush` are updated only after the
    /// writer's flush succeeded.
    ///
    /// # Errors
    ///
    /// Returns the first error from writing a block or flushing. Because the
    /// blocks are taken with `drain(..)`, blocks that had not been written yet
    /// when the error occurred are dropped from `blocks`.
    fn flush_blocks(
        writer: &mut ProtobufWriter,
        blocks: &mut VecDeque<RecordBlock>,
        stats: &Arc<Mutex<RecorderStats>>,
    ) -> Result<(), RecorderError> {
        for block in blocks.drain(..) {
            write_block(writer, &block)?;
            // Lock only after the I/O so readers of the stats are not blocked by it.
            let mut s = stats.lock().unwrap();
            s.total_requests += 1;
        }
        writer.flush()?;
        let mut s = stats.lock().unwrap();
        s.flush_count += 1;
        s.last_flush = Some(Instant::now());
        Ok(())
    }

    /// Log a serialized request, taking ownership of `data` (no copy is made).
    ///
    /// The payload is wrapped in a `RecordBlock` carrying the next block ID,
    /// the current wall-clock time in nanoseconds since the Unix epoch (0 if
    /// the clock is before the epoch), a hash of the calling thread's ID and
    /// the CRC32 of `data`.
    ///
    /// In async mode the block is only enqueued for the writer thread and the
    /// call returns `Ok(())` immediately. In sync mode the block is written
    /// and flushed before returning.
    ///
    /// # Errors
    ///
    /// In sync mode, returns any write/flush error. Async mode never reports
    /// write errors here; they surface from `close`.
    pub fn log_request_serialized(&mut self, data: Vec<u8>) -> Result<(), RecorderError> {
        let block_id = self.block_counter.fetch_add(1, Ordering::SeqCst);
        let timestamp_ns = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos() as u64;

        let thread_id = get_current_thread_id();
        // Checksum must be computed before `data` is moved into the payload.
        let crc32_checksum = crc32fast::hash(&data);

        let payload = RecordPayload {
            length: data.len() as u64,
            crc32_checksum,
            request_data: data,
        };

        let block = RecordBlock {
            block_id,
            timestamp_ns,
            thread_id,
            payload: Some(payload),
        };

        if let Some(ref async_writer) = self.async_writer {
            // Async mode: push to queue
            let mut queue = async_writer.queue.lock().unwrap();
            queue.push_back(WriteTask::WriteBlock(block));

            Ok(())
        } else {
            // Sync mode
            self.pending_blocks.push_back(block);
            self.flush_sync()
        }
    }

    /// Manually flush buffered blocks.
    ///
    /// In async mode this only enqueues a `Flush` task; the flush itself
    /// happens later on the writer thread and its outcome is not reported
    /// here. In sync mode the flush happens before returning.
    ///
    /// # Errors
    ///
    /// In sync mode, returns any write/flush error.
    pub fn flush(&mut self) -> Result<(), RecorderError> {
        if let Some(ref async_writer) = self.async_writer {
            // Async mode: send flush command
            let mut queue = async_writer.queue.lock().unwrap();
            queue.push_back(WriteTask::Flush);
            Ok(())
        } else {
            // Sync mode
            self.flush_sync()
        }
    }

    /// Synchronous flush (for sync mode).
    ///
    /// Writes all pending blocks through the logger's own file writer using
    /// the same routine as the writer thread, then records the flush time.
    ///
    /// # Errors
    ///
    /// Returns `RecorderError::FormatError` when no file writer is present
    /// (i.e. the logger is in async mode), or the first write/flush error.
    fn flush_sync(&mut self) -> Result<(), RecorderError> {
        let writer = self
            .file_writer
            .as_mut()
            .ok_or_else(|| RecorderError::FormatError {
                message: "File writer not available".to_string(),
            })?;

        Self::flush_blocks(writer, &mut self.pending_blocks, &self.stats)?;
        self.last_flush = Instant::now();

        Ok(())
    }

    /// Close the logger and write the file footer.
    ///
    /// In async mode a `Shutdown` task is enqueued, the shutdown flag is set
    /// and the writer thread is joined; the thread writes the remaining
    /// blocks and the footer. In sync mode the pending blocks are flushed and
    /// the footer is written and synced on the calling thread.
    ///
    /// # Errors
    ///
    /// Returns the writer thread's error (or a `FormatError` if it panicked)
    /// in async mode, or the first write/flush/sync error in sync mode.
    pub fn close(mut self) -> Result<(), RecorderError> {
        if let Some(mut async_writer) = self.async_writer.take() {
            // Async mode: shutdown writer thread
            {
                let mut queue = async_writer.queue.lock().unwrap();
                queue.push_back(WriteTask::Shutdown);
            }
            // Set only after the final push; the worker relies on that order.
            async_writer.shutdown_flag.store(true, Ordering::Release);

            // Wait for worker thread to finish
            if let Some(handle) = async_writer.worker_thread.take() {
                // Outer `?`: thread panicked. Inner `?`: thread returned an error.
                handle.join().map_err(|_| RecorderError::FormatError {
                    message: "Worker thread panicked".to_string(),
                })??;
            }
        } else {
            // Sync mode: flush and write footer
            self.flush_sync()?;

            let writer = self
                .file_writer
                .as_mut()
                .ok_or_else(|| RecorderError::FormatError {
                    message: "File writer not available".to_string(),
                })?;

            // NOTE(review): the 32 bytes of 0xff look like a placeholder for the
            // second `RecorderFooter::new` argument; confirm its meaning.
            let total_requests = self.stats.lock().unwrap().total_requests;
            let footer: RecorderFooter = RecorderFooter::new(total_requests, [0xffu8; 32]);
            write_footer(writer, &footer)?;
            writer.sync()?;
        }

        Ok(())
    }

    /// Get recording statistics (returns a copy of the current values).
    pub fn stats(&self) -> RecorderStats {
        self.stats.lock().unwrap().clone()
    }

    /// Get file header information
    pub fn header(&self) -> &RecorderHeader {
        &self.file_header
    }
}

/// Best-effort cleanup for a logger that is dropped without calling `close`.
///
/// Errors cannot be reported from `drop`, so every failure here is ignored.
impl Drop for RequestLogger {
    fn drop(&mut self) {
        if let Some(ref mut async_writer) = self.async_writer {
            // Signal shutdown
            async_writer.shutdown_flag.store(true, Ordering::Release);

            // Also enqueue a shutdown task. Scoped so the queue lock is
            // released before joining; the worker needs it to drain the queue.
            // A poisoned lock is skipped: the flag alone is enough.
            {
                if let Ok(mut queue) = async_writer.queue.lock() {
                    queue.push_back(WriteTask::Shutdown);
                }
            }

            // Wait for the worker so buffered blocks and the footer reach the
            // file before the logger goes away. Without the join the thread is
            // detached and may be killed mid-write when the process exits.
            if let Some(handle) = async_writer.worker_thread.take() {
                let _ = handle.join();
            }
        } else {
            // Sync mode: try to flush. This also runs after `close`, where it
            // finds nothing pending (sync) or no file writer (async) and the
            // result is ignored either way.
            let _ = self.flush_sync();
        }
    }
}