// SPDX-License-Identifier: Mulan PSL v2
/*
 * Copyright (c) 2025 Huawei Technologies Co., Ltd.
 * This software is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *         http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

//! New request player implementation (based on direct Request deserialization)

use std::collections::VecDeque;
use std::path::Path;

use crate::types::xgpu_recorder::v1::{
    RecordBlock, RecorderFooter, RecorderHeader, RECORD_FILE_MAGIC,
};

use super::format::{ProtobufReader, RecordReader};
use super::logger::create_reader;
use super::RecorderError;

/// Player statistics
#[derive(Debug, Clone, Copy)]
pub struct PlayerStats {
    /// Current read position (number of blocks read)
    pub current_position: u64,
    /// Number of blocks currently cached
    pub cached_blocks: usize,
    /// Maximum cache capacity
    pub cache_capacity: usize,
    /// Whether EOF has been reached
    pub eof_reached: bool,
    /// Whether checksum validation is enabled
    pub validate_checksums: bool,
    /// Auto-prefetch threshold
    pub prefetch_threshold: usize,
    /// Auto-prefetch count
    pub prefetch_count: usize,
}

impl PlayerStats {
    /// Get cache utilization percentage (0.0 to 1.0)
    pub fn cache_utilization(&self) -> f64 {
        if self.cache_capacity == 0 {
            0.0
        } else {
            self.cached_blocks as f64 / self.cache_capacity as f64
        }
    }

    /// Check if cache is full
    pub fn is_cache_full(&self) -> bool {
        self.cached_blocks >= self.cache_capacity
    }

    /// Check if cache is empty
    pub fn is_cache_empty(&self) -> bool {
        self.cached_blocks == 0
    }

    /// Check if auto-prefetch is enabled
    pub fn is_auto_prefetch_enabled(&self) -> bool {
        self.prefetch_threshold > 0
    }

    /// Check if auto-prefetch should trigger now
    pub fn should_prefetch(&self) -> bool {
        self.is_auto_prefetch_enabled() && self.cached_blocks <= self.prefetch_threshold
    }
}

/// New request player (simplified design)
pub struct RequestPlayer {
    file_reader: ProtobufReader,
    file_header: RecorderHeader,
    file_footer: RecorderFooter,
    current_position: u64,
    blocks_cache: VecDeque<RecordBlock>,
    eof_reached: bool,
    validate_checksums: bool,
    cache_capacity: usize,
    /// Threshold for auto-prefetch (when cache drops below this, trigger prefetch)
    prefetch_threshold: usize,
    /// Number of blocks to prefetch when threshold is reached
    prefetch_count: usize,
}

impl RequestPlayer {
    /// Create player from file with default settings
    pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self, RecorderError> {
        RequestPlayerBuilder::new()
            .file_path(path)
            .validate_checksums(false)
            .cache_capacity(1024)
            .build()
    }

    /// Create player from file with custom settings
    fn new<P: AsRef<Path>>(
        path: P,
        validate_checksums: bool,
        cache_capacity: usize,
    ) -> Result<Self, RecorderError> {
        let mut file_reader = create_reader(path)?;
        let file_header = file_reader.read_header()?;

        let file_footer = file_reader.read_footer_from_end()?;

        // Validate file header
        Self::validate_header(&file_header)?;

        // Auto-prefetch when cache drops to 20% or below
        let prefetch_threshold = (cache_capacity as f64 * 0.2).max(1.0) as usize;
        // Prefetch 50% of capacity or at least 10 blocks
        let prefetch_count = (cache_capacity as f64 * 0.5).max(10.0) as usize;

        Ok(Self {
            file_reader,
            file_header,
            file_footer,
            current_position: 0,
            blocks_cache: VecDeque::with_capacity(cache_capacity),
            eof_reached: false,
            validate_checksums,
            cache_capacity,
            prefetch_threshold,
            prefetch_count,
        })
    }

    /// Read next record block
    /// This method automatically triggers prefetch when cache is low,
    /// improving sequential read performance.
    pub fn next_block(&mut self) -> Result<Option<RecordBlock>, RecorderError> {
        // Auto-prefetch if cache is running low
        if self.eof_reached && self.blocks_cache.len() <= self.prefetch_threshold {
            // Try to prefetch more blocks in the background
            let _ = self.prefetch(self.prefetch_count);
        }

        // Check cache first
        if let Some(block) = self.blocks_cache.pop_front() {
            self.current_position += 1;
            return Ok(Some(block));
        }

        // Fallback: read directly from file if cache is empty
        match self.file_reader.read_block()? {
            Some(block) => {
                // Validate checksum if enabled
                if self.validate_checksums {
                    self.validate_block_checksum(&block)?;
                }
                self.current_position += 1;
                Ok(Some(block))
            }
            None => {
                self.eof_reached = true;
                Ok(None)
            }
        }
    }

    /// Validate block checksum
    fn validate_block_checksum(&self, block: &RecordBlock) -> Result<(), RecorderError> {
        if let Some(payload) = &block.payload {
            let expected_crc = payload.crc32_checksum;
            let actual_crc = crc32fast::hash(&payload.request_data);
            if actual_crc != expected_crc {
                return Err(RecorderError::Corruption {
                    details: format!(
                        "Checksum mismatch at position {}: expected 0x{:08x}, got 0x{:08x}",
                        self.current_position, expected_crc, actual_crc
                    ),
                });
            }
        }
        Ok(())
    }

    /// Prefetch multiple blocks into cache
    pub fn prefetch(&mut self, count: usize) -> Result<usize, RecorderError> {
        let mut fetched = 0;
        while fetched < count && self.blocks_cache.len() < self.cache_capacity {
            if self.eof_reached {
                break;
            }

            match self.file_reader.read_block()? {
                Some(block) => {
                    if self.validate_checksums {
                        self.validate_block_checksum(&block)?;
                    }
                    self.blocks_cache.push_back(block);
                    fetched += 1;
                }
                None => {
                    self.eof_reached = true;
                    break;
                }
            }
        }
        Ok(fetched)
    }

    /// Get number of cached blocks
    pub fn cached_blocks(&self) -> usize {
        self.blocks_cache.len()
    }

    /// Check if end of file is reached
    pub fn is_eof(&self) -> bool {
        self.eof_reached && self.blocks_cache.is_empty()
    }

    /// Reset to file beginning
    pub fn reset(&mut self) -> Result<(), RecorderError> {
        self.file_reader.seek(0)?;

        // Re-read file header
        self.file_header = self.file_reader.read_header()?;

        // Clear cache and status
        self.blocks_cache.clear();
        self.current_position = 0;
        self.eof_reached = false;

        Ok(())
    }

    /// Get file header information
    pub fn header(&self) -> &RecorderHeader {
        &self.file_header
    }

    /// Get file footer information
    pub fn footer(&self) -> &RecorderFooter {
        &self.file_footer
    }

    /// Get current position
    pub fn position(&self) -> u64 {
        self.current_position
    }

    /// Skip to specific position
    pub fn skip_to(&mut self, position: u64) -> Result<(), RecorderError> {
        if position < self.current_position {
            // Need to reset and skip forward
            self.reset()?;
        }

        let skip_count = position.saturating_sub(self.current_position);
        for _ in 0..skip_count {
            if self.next_block()?.is_none() {
                return Err(RecorderError::FormatError {
                    message: format!("Cannot skip to position {}: EOF reached", position),
                });
            }
        }

        Ok(())
    }

    /// Peek at the next block without consuming it
    pub fn peek(&mut self) -> Result<Option<&RecordBlock>, RecorderError> {
        if self.blocks_cache.is_empty() && !self.eof_reached {
            self.prefetch(1)?;
        }
        Ok(self.blocks_cache.front())
    }

    /// Get total estimated blocks (if available from footer)
    pub fn total_blocks(&mut self) -> u64 {
        self.file_footer.blocks
    }

    /// Collect all remaining blocks into a Vec
    pub fn collect_remaining(&mut self) -> Result<Vec<RecordBlock>, RecorderError> {
        let mut blocks = Vec::new();
        while let Some(block) = self.next_block()? {
            blocks.push(block);
        }
        Ok(blocks)
    }

    /// Collect a specific number of blocks
    pub fn collect_n(&mut self, count: usize) -> Result<Vec<RecordBlock>, RecorderError> {
        let mut blocks = Vec::with_capacity(count);
        for _ in 0..count {
            match self.next_block()? {
                Some(block) => blocks.push(block),
                None => break,
            }
        }
        Ok(blocks)
    }

    /// Filter blocks by thread ID
    pub fn filter_by_thread(
        &mut self,
        thread_id: u64,
    ) -> impl Iterator<Item = Result<RecordBlock, RecorderError>> + '_ {
        std::iter::from_fn(move || {
            loop {
                match self.next_block() {
                    Ok(Some(block)) if block.thread_id == thread_id => {
                        return Some(Ok(block));
                    }
                    Ok(Some(_)) => continue, // Skip blocks from other threads
                    Ok(None) => return None,
                    Err(e) => return Some(Err(e)),
                }
            }
        })
    }

    /// Get statistics about the player state
    pub fn stats(&self) -> PlayerStats {
        PlayerStats {
            current_position: self.current_position,
            cached_blocks: self.blocks_cache.len(),
            cache_capacity: self.cache_capacity,
            eof_reached: self.eof_reached,
            validate_checksums: self.validate_checksums,
            prefetch_threshold: self.prefetch_threshold,
            prefetch_count: self.prefetch_count,
        }
    }

    /// Validate file header
    fn validate_header(header: &RecorderHeader) -> Result<(), RecorderError> {
        // Check magic number
        if header.magic != RECORD_FILE_MAGIC {
            return Err(RecorderError::FormatError {
                message: format!("Invalid magic number: 0x{:08x}", header.magic),
            });
        }

        // Check version compatibility
        if let Some(version) = &header.version {
            if version.major != 1 {
                return Err(RecorderError::FormatError {
                    message: format!("Unsupported major version: {}", version.major),
                });
            }
        }

        Ok(())
    }
}

/// Implement Iterator trait to support iterating record blocks
impl Iterator for RequestPlayer {
    type Item = Result<RecordBlock, RecorderError>;

    fn next(&mut self) -> Option<Self::Item> {
        match self.next_block() {
            Ok(Some(block)) => Some(Ok(block)),
            Ok(None) => None,
            Err(e) => Some(Err(e)),
        }
    }
}

/// Player builder for flexible configuration
#[derive(Debug, Clone)]
pub struct RequestPlayerBuilder {
    file_path: Option<String>,
    validate_checksums: bool,
    cache_capacity: usize,
}

impl Default for RequestPlayerBuilder {
    fn default() -> Self {
        Self::new()
    }
}

impl RequestPlayerBuilder {
    /// Create a new builder with default settings
    pub fn new() -> Self {
        Self {
            file_path: None,
            validate_checksums: false,
            cache_capacity: 1024,
        }
    }

    /// Enable or disable checksum validation
    ///
    /// When enabled, each block's CRC32 checksum will be validated
    /// Default: true
    pub fn validate_checksums(mut self, validate: bool) -> Self {
        self.validate_checksums = validate;
        self
    }

    /// Set the cache capacity (maximum number of blocks to cache)
    ///
    /// Larger cache improves sequential read performance but uses more memory
    /// Default: 1024 blocks
    pub fn cache_capacity(mut self, capacity: usize) -> Self {
        self.cache_capacity = capacity;
        self
    }

    pub fn file_path<P: AsRef<Path>>(mut self, path: P) -> Self {
        self.file_path = Some(path.as_ref().to_string_lossy().to_string());
        self
    }

    /// Build the RequestPlayer with configured settings
    pub fn build(self) -> Result<RequestPlayer, RecorderError> {
        let file_path = self.file_path.ok_or_else(|| {
            RecorderError::Io(std::io::Error::new(
                std::io::ErrorKind::NotFound,
                "File path not found",
            ))
        })?;

        RequestPlayer::new(file_path, self.validate_checksums, self.cache_capacity)
    }
}