* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* This software is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#![allow(dead_code)]
use std::collections::VecDeque;
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use crate::types::xgpu_recorder::v1::*;
use super::{
format::{ProtobufReader, ProtobufWriter, RecordWriter},
RecorderConfig, RecorderError, RecorderStats,
};
/// Returns a 64-bit identifier for the calling thread.
///
/// `ThreadId` exposes no stable integer accessor, so the id is fed through a
/// `DefaultHasher` built with `DefaultHasher::new()`. The value is consistent for
/// a given thread within one process and practically distinct across threads
/// (it is a hash, so uniqueness is not strictly guaranteed).
fn get_current_thread_id() -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let current = std::thread::current();
    let mut hasher_state = DefaultHasher::new();
    current.id().hash(&mut hasher_state);
    hasher_state.finish()
}
/// Constructs a [`ProtobufWriter`] for `path` by delegating to `ProtobufWriter::new`.
///
/// # Errors
///
/// Propagates any error returned by `ProtobufWriter::new`.
fn create_writer<P: AsRef<Path>>(path: P) -> Result<ProtobufWriter, RecorderError> {
    ProtobufWriter::new(path)
}
/// Constructs a [`ProtobufReader`] for `path` by delegating to `ProtobufReader::new`.
///
/// # Errors
///
/// Propagates any error returned by `ProtobufReader::new`.
pub fn create_reader<P: AsRef<Path>>(path: P) -> Result<ProtobufReader, RecorderError> {
    ProtobufReader::new(path)
}
/// Builds the file header used by new loggers; currently just `RecorderHeader::new()`.
fn create_default_header() -> RecorderHeader {
    RecorderHeader::new()
}
/// Writes a single record block through any [`RecordWriter`] implementation.
///
/// # Errors
///
/// Propagates the error from `RecordWriter::write_block`.
fn write_block(writer: &mut dyn RecordWriter, block: &RecordBlock) -> Result<(), RecorderError> {
    writer.write_block(block)
}
/// Writes the file footer through any [`RecordWriter`] implementation.
///
/// # Errors
///
/// Propagates the error from `RecordWriter::write_footer`.
fn write_footer(
    writer: &mut dyn RecordWriter,
    footer: &RecorderFooter,
) -> Result<(), RecorderError> {
    writer.write_footer(footer)
}
/// Unit of work sent from a [`RequestLogger`] to its background writer thread.
enum WriteTask {
    /// A record block to append. The worker buffers it and writes the buffer once it
    /// holds `block_size` entries or the sync interval has elapsed.
    WriteBlock(RecordBlock),
    /// Write out every buffered block and flush the writer now.
    Flush,
    /// Write out buffered blocks, then stop the worker (which then writes the footer and syncs).
    Shutdown,
}
/// Handles held by an async-mode [`RequestLogger`] for communicating with its writer thread.
struct AsyncWriter {
    /// FIFO of pending tasks shared with the worker, which pops from the front.
    queue: Arc<Mutex<VecDeque<WriteTask>>>,
    /// Join handle of the worker; its `Result` carries any I/O error that stopped it.
    /// Taken (left `None`) when the logger joins the worker.
    worker_thread: Option<JoinHandle<Result<(), RecorderError>>>,
    /// Raised during shutdown; the worker exits when it sees the flag and finds the queue empty.
    shutdown_flag: Arc<AtomicBool>,
}
/// Appends serialized requests as record blocks to an output file.
///
/// Writes happen either on the caller's thread (sync mode, `file_writer` is `Some`)
/// or on a background worker thread (async mode, `async_writer` is `Some`).
pub struct RequestLogger {
    /// Configuration this logger was created with (output path, batching parameters).
    config: RecorderConfig,
    /// Source of the `block_id` assigned to each logged block; incremented once per request.
    block_counter: AtomicU64,
    /// Counters shared with the async worker (when present) and returned by `stats()`.
    stats: Arc<Mutex<RecorderStats>>,
    /// Header written at the start of the output file.
    file_header: RecorderHeader,
    /// Async-mode channel to the worker thread; `None` in sync mode.
    async_writer: Option<AsyncWriter>,
    /// Sync-mode writer; `None` in async mode.
    file_writer: Option<ProtobufWriter>,
    /// Sync-mode staging buffer, emptied by every sync flush.
    pending_blocks: VecDeque<RecordBlock>,
    /// Time of the most recent sync-mode flush (updated in `flush_sync`).
    last_flush: Instant,
}
impl RequestLogger {
pub fn new(config: RecorderConfig) -> Result<Self, RecorderError> {
Self::new_with_async(config, true)
}
pub fn new_with_async(config: RecorderConfig, use_async: bool) -> Result<Self, RecorderError> {
let file_header = create_default_header();
let stats = Arc::new(Mutex::new(RecorderStats::default()));
if use_async {
let queue = Arc::new(Mutex::new(VecDeque::new()));
let shutdown_flag = Arc::new(AtomicBool::new(false));
let worker_thread = Self::spawn_writer_thread(
config.clone(),
queue.clone(),
shutdown_flag.clone(),
stats.clone(),
file_header.clone(),
)?;
Ok(Self {
config,
block_counter: AtomicU64::new(0),
stats,
file_header,
async_writer: Some(AsyncWriter {
queue,
worker_thread: Some(worker_thread),
shutdown_flag,
}),
file_writer: None,
pending_blocks: VecDeque::new(),
last_flush: Instant::now(),
})
} else {
let mut file_writer = create_writer(&config.output_path)?;
file_writer.write_header(&file_header)?;
Ok(Self {
config,
block_counter: AtomicU64::new(0),
stats,
file_header,
async_writer: None,
file_writer: Some(file_writer),
pending_blocks: VecDeque::new(),
last_flush: Instant::now(),
})
}
}
fn spawn_writer_thread(
config: RecorderConfig,
queue: Arc<Mutex<VecDeque<WriteTask>>>,
shutdown_flag: Arc<AtomicBool>,
stats: Arc<Mutex<RecorderStats>>,
file_header: RecorderHeader,
) -> Result<JoinHandle<Result<(), RecorderError>>, RecorderError> {
let mut file_writer = create_writer(&config.output_path)?;
file_writer.write_header(&file_header)?;
let handle = thread::spawn(move || -> Result<(), RecorderError> {
let mut pending_blocks = VecDeque::new();
let mut last_flush = Instant::now();
let flush_interval = Duration::from_secs(config.sync_interval_secs);
loop {
let task = {
let mut q = queue.lock().unwrap();
q.pop_front()
};
match task {
Some(WriteTask::WriteBlock(block)) => {
pending_blocks.push_back(block);
if pending_blocks.len() >= config.block_size
|| last_flush.elapsed() >= flush_interval
{
Self::flush_blocks(&mut file_writer, &mut pending_blocks, &stats)?;
last_flush = Instant::now();
}
}
Some(WriteTask::Flush) => {
Self::flush_blocks(&mut file_writer, &mut pending_blocks, &stats)?;
last_flush = Instant::now();
}
Some(WriteTask::Shutdown) => {
Self::flush_blocks(&mut file_writer, &mut pending_blocks, &stats)?;
break;
}
None => {
if shutdown_flag.load(Ordering::Relaxed) {
Self::flush_blocks(&mut file_writer, &mut pending_blocks, &stats)?;
break;
}
if !pending_blocks.is_empty() && last_flush.elapsed() >= flush_interval {
Self::flush_blocks(&mut file_writer, &mut pending_blocks, &stats)?;
last_flush = Instant::now();
}
thread::sleep(Duration::from_millis(1));
}
}
}
let total_requests = stats.lock().unwrap().total_requests;
let footer = RecorderFooter::new(total_requests, [0xff; 32]);
write_footer(&mut file_writer, &footer)?;
file_writer.sync()?;
Ok(())
});
Ok(handle)
}
fn flush_blocks(
writer: &mut ProtobufWriter,
blocks: &mut VecDeque<RecordBlock>,
stats: &Arc<Mutex<RecorderStats>>,
) -> Result<(), RecorderError> {
for block in blocks.drain(..) {
write_block(writer, &block)?;
let mut s = stats.lock().unwrap();
s.total_requests += 1;
}
writer.flush()?;
let mut s = stats.lock().unwrap();
s.flush_count += 1;
s.last_flush = Some(Instant::now());
Ok(())
}
pub fn log_request_serialized(&mut self, data: Vec<u8>) -> Result<(), RecorderError> {
let block_id = self.block_counter.fetch_add(1, Ordering::SeqCst);
let timestamp_ns = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as u64;
let thread_id = get_current_thread_id();
let crc32_checksum = crc32fast::hash(&data);
let payload = RecordPayload {
length: data.len() as u64,
crc32_checksum,
request_data: data,
};
let block = RecordBlock {
block_id,
timestamp_ns,
thread_id,
payload: Some(payload),
};
if let Some(ref async_writer) = self.async_writer {
let mut queue = async_writer.queue.lock().unwrap();
queue.push_back(WriteTask::WriteBlock(block));
Ok(())
} else {
self.pending_blocks.push_back(block);
self.flush_sync()
}
}
pub fn flush(&mut self) -> Result<(), RecorderError> {
if let Some(ref async_writer) = self.async_writer {
let mut queue = async_writer.queue.lock().unwrap();
queue.push_back(WriteTask::Flush);
Ok(())
} else {
self.flush_sync()
}
}
fn flush_sync(&mut self) -> Result<(), RecorderError> {
let writer = self
.file_writer
.as_mut()
.ok_or_else(|| RecorderError::FormatError {
message: "File writer not available".to_string(),
})?;
for block in self.pending_blocks.drain(..) {
write_block(writer, &block)?;
let mut stats = self.stats.lock().unwrap();
stats.total_requests += 1;
}
writer.flush()?;
let mut stats = self.stats.lock().unwrap();
stats.flush_count += 1;
stats.last_flush = Some(Instant::now());
self.last_flush = Instant::now();
Ok(())
}
pub fn close(mut self) -> Result<(), RecorderError> {
if let Some(mut async_writer) = self.async_writer.take() {
{
let mut queue = async_writer.queue.lock().unwrap();
queue.push_back(WriteTask::Shutdown);
}
async_writer.shutdown_flag.store(true, Ordering::Relaxed);
if let Some(handle) = async_writer.worker_thread.take() {
handle.join().map_err(|_| RecorderError::FormatError {
message: "Worker thread panicked".to_string(),
})??;
}
} else {
self.flush_sync()?;
let writer = self
.file_writer
.as_mut()
.ok_or_else(|| RecorderError::FormatError {
message: "File writer not available".to_string(),
})?;
let total_requests = self.stats.lock().unwrap().total_requests;
let footer: RecorderFooter = RecorderFooter::new(total_requests, [0xffu8; 32]);
write_footer(writer, &footer)?;
writer.sync()?;
}
Ok(())
}
pub fn stats(&self) -> RecorderStats {
self.stats.lock().unwrap().clone()
}
pub fn header(&self) -> &RecorderHeader {
&self.file_header
}
}
impl Drop for RequestLogger {
    /// Best-effort cleanup for a logger dropped without an explicit `close`.
    ///
    /// Async mode: signals the worker and waits for it to drain the queue, write the footer,
    /// and sync. Without the join, a process exiting right after the drop could kill the
    /// worker before the file is complete.
    ///
    /// Sync mode: only flushes pending blocks. No footer is written, so call `close` for a
    /// complete file.
    ///
    /// Errors cannot be reported from `drop` and are discarded. `close` takes `self` by value,
    /// so this also runs after `close`; by then the async writer has already been taken, and
    /// the sync-mode `flush_sync` call only has work to do if a writer is still present.
    fn drop(&mut self) {
        if let Some(mut async_writer) = self.async_writer.take() {
            async_writer.shutdown_flag.store(true, Ordering::Release);
            if let Ok(mut queue) = async_writer.queue.lock() {
                queue.push_back(WriteTask::Shutdown);
            }
            // The queue guard above is released before joining, so the worker can keep
            // popping tasks while we wait.
            if let Some(handle) = async_writer.worker_thread.take() {
                let _ = handle.join();
            }
        } else {
            let _ = self.flush_sync();
        }
    }
}