* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* This software is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
use std::collections::VecDeque;
use std::path::Path;
use crate::types::xgpu_recorder::v1::{
RecordBlock, RecorderFooter, RecorderHeader, RECORD_FILE_MAGIC,
};
use super::format::{ProtobufReader, RecordReader};
use super::logger::create_reader;
use super::RecorderError;
/// Point-in-time snapshot of a player's playback position and read-ahead
/// cache state.
#[derive(Debug, Clone, Copy)]
pub struct PlayerStats {
    /// Number of blocks the player had handed out when the snapshot was taken.
    pub current_position: u64,
    /// Number of blocks sitting in the read-ahead cache.
    pub cached_blocks: usize,
    /// Maximum number of blocks the read-ahead cache is allowed to hold.
    pub cache_capacity: usize,
    /// Whether the underlying reader had already reported end of file.
    pub eof_reached: bool,
    /// Whether block checksums are verified as blocks are read.
    pub validate_checksums: bool,
    /// Cache level at or below which read-ahead is triggered.
    pub prefetch_threshold: usize,
    /// Maximum number of blocks requested per read-ahead.
    pub prefetch_count: usize,
}

impl PlayerStats {
    /// Fraction of the cache in use, as `cached_blocks / cache_capacity`.
    ///
    /// Returns `0.0` for a zero-capacity cache instead of dividing by zero.
    pub fn cache_utilization(&self) -> f64 {
        if self.cache_capacity == 0 {
            0.0
        } else {
            self.cached_blocks as f64 / self.cache_capacity as f64
        }
    }

    /// `true` when the cache holds at least `cache_capacity` blocks.
    pub fn is_cache_full(&self) -> bool {
        self.cached_blocks >= self.cache_capacity
    }

    /// `true` when no blocks are cached.
    pub fn is_cache_empty(&self) -> bool {
        self.cached_blocks == 0
    }

    /// `true` when a non-zero prefetch threshold is configured.
    pub fn is_auto_prefetch_enabled(&self) -> bool {
        self.prefetch_threshold > 0
    }

    /// `true` when a read-ahead would be worthwhile: auto-prefetch is enabled,
    /// the cache has drained to the threshold, and the file is not exhausted.
    ///
    /// Once `eof_reached` is set there is nothing left to read ahead, so this
    /// returns `false` regardless of the cache level.
    pub fn should_prefetch(&self) -> bool {
        !self.eof_reached
            && self.is_auto_prefetch_enabled()
            && self.cached_blocks <= self.prefetch_threshold
    }
}
/// Sequential reader over a recorded request file with a bounded read-ahead
/// cache of blocks.
pub struct RequestPlayer {
/// Reader over the record file, obtained from `create_reader`.
file_reader: ProtobufReader,
/// Header read when the player was opened; refreshed by `reset`.
file_header: RecorderHeader,
/// Footer read when the player was opened.
file_footer: RecorderFooter,
/// Number of blocks handed out by `next_block` since open or the last `reset`.
current_position: u64,
/// Blocks that have been read ahead but not yet handed out.
blocks_cache: VecDeque<RecordBlock>,
/// Set once the reader returns no further block.
eof_reached: bool,
/// When true, each block read is checked against its stored CRC32.
validate_checksums: bool,
/// Upper bound on the number of blocks `prefetch` keeps in `blocks_cache`.
cache_capacity: usize,
/// Cache level at or below which `next_block` reads ahead (20% of capacity, at least 1).
prefetch_threshold: usize,
/// Number of blocks requested per automatic read-ahead (50% of capacity, at least 10).
prefetch_count: usize,
}
impl RequestPlayer {
pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self, RecorderError> {
RequestPlayerBuilder::new()
.file_path(path)
.validate_checksums(false)
.cache_capacity(1024)
.build()
}
/// Opens `path`, reads the header and footer, validates the header and
/// builds a player with an empty read-ahead cache.
///
/// The prefetch threshold is 20% of `cache_capacity` (at least 1) and the
/// prefetch batch size is 50% of `cache_capacity` (at least 10).
///
/// NOTE(review): the footer is read before any block; this presumably leaves
/// the reader positioned for the first block — confirm against
/// `read_footer_from_end`.
///
/// # Errors
/// Propagates failures from `create_reader`, `read_header`,
/// `read_footer_from_end` and `validate_header`, in that order.
fn new<P: AsRef<Path>>(
    path: P,
    validate_checksums: bool,
    cache_capacity: usize,
) -> Result<Self, RecorderError> {
    let mut reader = create_reader(path)?;
    let header = reader.read_header()?;
    let footer = reader.read_footer_from_end()?;
    Self::validate_header(&header)?;

    // Scale the capacity by `ratio`, never going below `floor`.
    let scaled = |ratio: f64, floor: f64| (cache_capacity as f64 * ratio).max(floor) as usize;

    let player = Self {
        file_reader: reader,
        file_header: header,
        file_footer: footer,
        current_position: 0,
        blocks_cache: VecDeque::with_capacity(cache_capacity),
        eof_reached: false,
        validate_checksums,
        cache_capacity,
        prefetch_threshold: scaled(0.2, 1.0),
        prefetch_count: scaled(0.5, 10.0),
    };
    Ok(player)
}
/// Returns the next block in file order, or `Ok(None)` once the file is
/// exhausted and the cache is empty.
///
/// While the file still has unread data and the cache holds no more than
/// `prefetch_threshold` blocks, up to `prefetch_count` blocks are read ahead
/// before one is handed out. Each returned block advances the position by one.
///
/// # Errors
/// Propagates read errors and, when checksum validation is enabled,
/// checksum mismatches. A failure during read-ahead is reported immediately,
/// even if earlier blocks are still cached, so that a bad block is never
/// skipped silently.
pub fn next_block(&mut self) -> Result<Option<RecordBlock>, RecorderError> {
    // Read ahead only while the file still has data: once EOF has been seen,
    // `prefetch` is a no-op.
    if !self.eof_reached && self.blocks_cache.len() <= self.prefetch_threshold {
        self.prefetch(self.prefetch_count)?;
    }

    if let Some(block) = self.blocks_cache.pop_front() {
        self.current_position += 1;
        return Ok(Some(block));
    }

    // Cache is empty and the reader already reported EOF: do not read again.
    if self.eof_reached {
        return Ok(None);
    }

    // Reached only when read-ahead could not cache anything (for example a
    // zero-capacity cache); fall back to an uncached read.
    match self.file_reader.read_block()? {
        Some(block) => {
            if self.validate_checksums {
                self.validate_block_checksum(&block)?;
            }
            self.current_position += 1;
            Ok(Some(block))
        }
        None => {
            self.eof_reached = true;
            Ok(None)
        }
    }
}
/// Verifies that the CRC32 of the block's request data matches the checksum
/// stored in its payload.
///
/// Blocks without a payload are accepted as-is. The position quoted in the
/// error is the player's current position (blocks already handed out), which
/// for a block being read ahead is not necessarily that block's own index.
///
/// # Errors
/// Returns `RecorderError::Corruption` when the checksums differ.
fn validate_block_checksum(&self, block: &RecordBlock) -> Result<(), RecorderError> {
    let payload = match block.payload.as_ref() {
        Some(payload) => payload,
        None => return Ok(()),
    };

    let stored = payload.crc32_checksum;
    let computed = crc32fast::hash(&payload.request_data);
    if computed == stored {
        return Ok(());
    }

    Err(RecorderError::Corruption {
        details: format!(
            "Checksum mismatch at position {}: expected 0x{:08x}, got 0x{:08x}",
            self.current_position, stored, computed
        ),
    })
}
/// Reads up to `count` blocks ahead into the cache and returns how many were
/// actually added.
///
/// Reading stops early when the cache reaches `cache_capacity` or the reader
/// reports end of file (which also sets the EOF flag). Blocks are
/// checksum-validated before caching when validation is enabled.
///
/// # Errors
/// Propagates read errors and checksum mismatches; blocks cached before the
/// failure stay in the cache.
pub fn prefetch(&mut self, count: usize) -> Result<usize, RecorderError> {
    let mut loaded = 0;
    while !self.eof_reached && loaded < count && self.blocks_cache.len() < self.cache_capacity {
        let block = match self.file_reader.read_block()? {
            Some(block) => block,
            None => {
                self.eof_reached = true;
                break;
            }
        };
        if self.validate_checksums {
            self.validate_block_checksum(&block)?;
        }
        self.blocks_cache.push_back(block);
        loaded += 1;
    }
    Ok(loaded)
}
/// Number of blocks currently held in the read-ahead cache.
pub fn cached_blocks(&self) -> usize {
self.blocks_cache.len()
}
/// `true` once nothing is left to return: the cache is empty and the reader
/// has already reported end of file.
pub fn is_eof(&self) -> bool {
    self.blocks_cache.is_empty() && self.eof_reached
}
/// Rewinds the player to the beginning of the file.
///
/// Seeks the reader to offset 0, re-reads the header, then drops all cached
/// blocks and clears the position and EOF flag. The footer is not re-read.
///
/// # Errors
/// Propagates seek or header-read failures; in that case the cache, position
/// and EOF flag are left untouched.
pub fn reset(&mut self) -> Result<(), RecorderError> {
    self.file_reader.seek(0)?;
    let header = self.file_reader.read_header()?;

    self.file_header = header;
    self.eof_reached = false;
    self.current_position = 0;
    self.blocks_cache.clear();
    Ok(())
}
/// The file header as read when the player was opened or last reset.
pub fn header(&self) -> &RecorderHeader {
&self.file_header
}
/// The file footer obtained via `read_footer_from_end` when the player was
/// opened; `reset` does not re-read it.
pub fn footer(&self) -> &RecorderFooter {
&self.file_footer
}
/// Number of blocks handed out by `next_block` since the player was opened
/// or last reset.
pub fn position(&self) -> u64 {
self.current_position
}
/// Advances (or rewinds, then advances) so that `position` blocks have been
/// consumed.
///
/// A target behind the current position triggers a `reset` first; the blocks
/// in between are then read and discarded one at a time.
///
/// # Errors
/// Returns `RecorderError::FormatError` if the file ends before `position`
/// is reached, and propagates any error from `reset` or `next_block`.
pub fn skip_to(&mut self, position: u64) -> Result<(), RecorderError> {
    if position < self.current_position {
        self.reset()?;
    }

    let mut remaining = position.saturating_sub(self.current_position);
    while remaining > 0 {
        if self.next_block()?.is_none() {
            return Err(RecorderError::FormatError {
                message: format!("Cannot skip to position {}: EOF reached", position),
            });
        }
        remaining -= 1;
    }
    Ok(())
}
/// Returns a reference to the block that the next `next_block` call will
/// yield, without consuming it, or `Ok(None)` at end of file.
///
/// When the cache is empty, one block is read and parked in the cache. This
/// lookahead read bypasses `cache_capacity` on purpose: going through
/// `prefetch` would cache nothing for a zero-capacity cache and make `peek`
/// report `None` while blocks remain.
///
/// # Errors
/// Propagates read errors and, when checksum validation is enabled,
/// checksum mismatches.
pub fn peek(&mut self) -> Result<Option<&RecordBlock>, RecorderError> {
    if self.blocks_cache.is_empty() && !self.eof_reached {
        match self.file_reader.read_block()? {
            Some(block) => {
                if self.validate_checksums {
                    self.validate_block_checksum(&block)?;
                }
                self.blocks_cache.push_back(block);
            }
            None => self.eof_reached = true,
        }
    }
    Ok(self.blocks_cache.front())
}
/// Block count recorded in the file footer.
///
/// Takes `&self`: this only reads the footer, so no exclusive borrow is
/// needed.
pub fn total_blocks(&self) -> u64 {
    self.file_footer.blocks
}
/// Drains every remaining block into a vector, in file order.
///
/// # Errors
/// Stops at the first error from `next_block` and returns it; blocks gathered
/// up to that point are dropped.
pub fn collect_remaining(&mut self) -> Result<Vec<RecordBlock>, RecorderError> {
    let mut remaining = Vec::new();
    loop {
        match self.next_block()? {
            Some(block) => remaining.push(block),
            None => return Ok(remaining),
        }
    }
}
/// Collects up to `count` blocks, stopping early at end of file.
///
/// The up-front reservation is capped at `cache_capacity`, so a very large
/// `count` (for example `usize::MAX` meaning "as many as there are") does not
/// trigger a huge allocation or a capacity-overflow panic. The vector still
/// grows on demand beyond that.
///
/// # Errors
/// Stops at the first error from `next_block` and returns it.
pub fn collect_n(&mut self, count: usize) -> Result<Vec<RecordBlock>, RecorderError> {
    let mut blocks = Vec::with_capacity(count.min(self.cache_capacity));
    for _ in 0..count {
        match self.next_block()? {
            Some(block) => blocks.push(block),
            None => break,
        }
    }
    Ok(blocks)
}
/// Lazily yields the remaining blocks whose `thread_id` equals `thread_id`.
///
/// Non-matching blocks are consumed and discarded. Errors from `next_block`
/// are passed through as `Err` items; the iterator ends when `next_block`
/// returns `Ok(None)`.
pub fn filter_by_thread(
    &mut self,
    thread_id: u64,
) -> impl Iterator<Item = Result<RecordBlock, RecorderError>> + '_ {
    std::iter::from_fn(move || loop {
        match self.next_block() {
            Err(err) => break Some(Err(err)),
            Ok(None) => break None,
            Ok(Some(block)) => {
                if block.thread_id == thread_id {
                    break Some(Ok(block));
                }
            }
        }
    })
}
/// Captures the player's current position, cache level and configuration in
/// a `PlayerStats` snapshot.
pub fn stats(&self) -> PlayerStats {
    let cached_blocks = self.blocks_cache.len();
    PlayerStats {
        cached_blocks,
        cache_capacity: self.cache_capacity,
        current_position: self.current_position,
        eof_reached: self.eof_reached,
        prefetch_count: self.prefetch_count,
        prefetch_threshold: self.prefetch_threshold,
        validate_checksums: self.validate_checksums,
    }
}
/// Checks that `header` carries the expected magic number and, if a version
/// is present, that its major component is 1.
///
/// A header without a version is accepted.
///
/// # Errors
/// Returns `RecorderError::FormatError` for a wrong magic number or an
/// unsupported major version.
fn validate_header(header: &RecorderHeader) -> Result<(), RecorderError> {
    if header.magic != RECORD_FILE_MAGIC {
        return Err(RecorderError::FormatError {
            message: format!("Invalid magic number: 0x{:08x}", header.magic),
        });
    }

    match header.version.as_ref() {
        Some(version) if version.major != 1 => Err(RecorderError::FormatError {
            message: format!("Unsupported major version: {}", version.major),
        }),
        _ => Ok(()),
    }
}
}
/// Iterates over the remaining blocks: each item is the result of one
/// `next_block` call, and iteration ends when it returns `Ok(None)`.
impl Iterator for RequestPlayer {
    type Item = Result<RecordBlock, RecorderError>;

    fn next(&mut self) -> Option<Self::Item> {
        // `Result<Option<T>, E>` -> `Option<Result<T, E>>`.
        self.next_block().transpose()
    }
}
/// Builder for [`RequestPlayer`]: collects the file path and playback options
/// before the file is opened.
#[derive(Debug, Clone)]
pub struct RequestPlayerBuilder {
    /// Path of the record file. Kept as a `PathBuf` so that paths which are
    /// not valid UTF-8 reach the reader unchanged.
    file_path: Option<std::path::PathBuf>,
    /// Whether block checksums are verified while reading.
    validate_checksums: bool,
    /// Maximum number of blocks in the read-ahead cache.
    cache_capacity: usize,
}

impl Default for RequestPlayerBuilder {
    fn default() -> Self {
        Self::new()
    }
}

impl RequestPlayerBuilder {
    /// Creates a builder with no file path, checksum validation disabled and
    /// a cache capacity of 1024 blocks.
    pub fn new() -> Self {
        Self {
            file_path: None,
            validate_checksums: false,
            cache_capacity: 1024,
        }
    }

    /// Enables or disables checksum validation of every block read.
    pub fn validate_checksums(mut self, validate: bool) -> Self {
        self.validate_checksums = validate;
        self
    }

    /// Sets the maximum number of blocks held in the read-ahead cache.
    pub fn cache_capacity(mut self, capacity: usize) -> Self {
        self.cache_capacity = capacity;
        self
    }

    /// Sets the record file to open.
    pub fn file_path<P: AsRef<Path>>(mut self, path: P) -> Self {
        // Store the path itself: a lossy string conversion would replace
        // non-UTF-8 bytes and point at a different (or missing) file.
        self.file_path = Some(path.as_ref().to_path_buf());
        self
    }

    /// Opens the configured file and returns the player.
    ///
    /// # Errors
    /// Returns `RecorderError::Io` with kind `NotFound` when no file path was
    /// set, and propagates any error from opening or validating the file.
    pub fn build(self) -> Result<RequestPlayer, RecorderError> {
        let file_path = self.file_path.ok_or_else(|| {
            RecorderError::Io(std::io::Error::new(
                std::io::ErrorKind::NotFound,
                "File path not found",
            ))
        })?;
        RequestPlayer::new(file_path, self.validate_checksums, self.cache_capacity)
    }
}