* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* This software is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
use std::{cell::RefCell, env, fs, mem, path::PathBuf, process, sync::Arc};
use anyhow::bail;
use crossbeam::queue::SegQueue;
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use tracing::debug;
use xgpu_common::ipc::bytewise::{
BytewiseBuffer, BytewiseReadOwned, BytewiseWrite, BytewiseWriter,
};
/// Length in bytes (100 MiB) of the scratch buffer that `serialize_request`
/// writes into before copying out the bytes actually produced.
const SERIALIZE_BUFFER_SIZE: usize = 100 * 1024 * 1024;
thread_local! {
// One scratch buffer per thread, created with `SERIALIZE_BUFFER_SIZE` bytes of
// capacity and wrapped in a `RefCell` so `serialize_request` can borrow it mutably.
static SERIALIZE_BUFFER: RefCell<Vec<u8>> = RefCell::new(Vec::with_capacity(SERIALIZE_BUFFER_SIZE));
}
use xgpu_journal::{RecorderConfig, RequestLogger, RequestPlayer};
/// Path of the request log file: `<pid>_rec` inside the current working
/// directory, or inside the system temp directory when the working directory
/// cannot be determined. Computed on first access.
static REQUEST_LOG_FILE_PATH: Lazy<PathBuf> = Lazy::new(|| {
env::current_dir()
.unwrap_or_else(|_| env::temp_dir())
.join(format!("{}_rec", process::id()))
});
/// Global request logger slot; `None` until `initialize` installs a logger and
/// again after `finalize` takes it out.
static REQUEST_LOGGER: Lazy<Mutex<Option<RequestLogger>>> = Lazy::new(|| Mutex::new(None));
/// Global request player slot; `None` until `request_player_init` installs a player.
static REQUEST_PLAYER: Lazy<Mutex<Option<RequestPlayer>>> = Lazy::new(|| Mutex::new(None));
/// Shared handle to the current minibatch queue of serialized requests. The
/// mutex protects the `Arc` handle itself, which `reset_minibatch_requests`
/// swaps for a fresh queue.
static MINIBATCH_REQUESTS: Lazy<Mutex<Arc<SegQueue<Vec<u8>>>>> =
Lazy::new(|| Mutex::new(Arc::new(SegQueue::new())));
/// Ensures the request log file exists and installs the global `RequestLogger`.
///
/// The log file is opened in create+append mode and the handle is dropped
/// right away, so the call only guarantees that the file exists. If a logger
/// is already installed the function returns without replacing it.
///
/// # Errors
///
/// Returns an error if the log file cannot be opened/created or if
/// `RequestLogger::new` fails.
pub fn initialize() -> anyhow::Result<()> {
    let log_path = REQUEST_LOG_FILE_PATH.as_path();

    // Touch the file; the temporary handle is closed at the end of the statement.
    fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(log_path)?;

    let config = RecorderConfig {
        output_path: log_path.to_owned(),
        block_size: 4096,
        ..Default::default()
    };

    let mut slot = REQUEST_LOGGER.lock();
    if slot.is_some() {
        // A logger is already installed; keep it.
        return Ok(());
    }

    *slot = Some(RequestLogger::new(config)?);
    debug!(
        "[Journal] Initializing RequestLogger for log file: {}",
        REQUEST_LOG_FILE_PATH.display()
    );
    Ok(())
}
/// Removes the global `RequestLogger`, if any, and closes it.
///
/// The result of `close()` is deliberately discarded (best-effort shutdown).
/// Afterwards the logger slot is `None`.
pub fn finalize() {
    let mut slot = REQUEST_LOGGER.lock();
    // `take` moves the logger out and leaves `None` in the slot.
    if let Some(active) = slot.take() {
        let _ = active.close();
    }
}
/// Serializes `obj` into a newly allocated byte vector.
///
/// The object is written through a `BytewiseBuffer` laid over this thread's
/// `SERIALIZE_BUFFER` (zero-extended to `SERIALIZE_BUFFER_SIZE` bytes when it
/// is shorter); only the bytes reported by `written_bytes()` are copied into
/// the returned vector.
///
/// # Errors
///
/// Returns an error (also logged at debug level) when `obj.write_to` fails.
pub fn serialize_request<T: BytewiseWrite>(obj: &T) -> anyhow::Result<Vec<u8>> {
    SERIALIZE_BUFFER.with(|buf_cell| {
        let mut buffer = buf_cell.borrow_mut();

        // `Vec::resize` grows the allocation on its own, so no separate
        // `reserve` is needed. (`reserve` counts additional elements from
        // `len`, not from `capacity`, so computing the argument from the
        // capacity shortfall would not guarantee the intended capacity.)
        if buffer.len() < SERIALIZE_BUFFER_SIZE {
            buffer.resize(SERIALIZE_BUFFER_SIZE, 0);
        }

        let mut writer = BytewiseBuffer::new(&mut buffer[..]);
        obj.write_to(&mut writer).map_err(|e| {
            debug!("Object serialization failed: {:?}", e);
            anyhow::anyhow!("Object serialization failed: {:?}", e)
        })?;

        // Copy out only the prefix that was actually written.
        let written_size = writer.written_bytes();
        Ok(buffer[..written_size].to_vec())
    })
}
/// Reconstructs a `T` from the serialized bytes in `data`.
///
/// `data` is wrapped in a `BytewiseBuffer` and handed to `T::read_from_mut`.
///
/// # Errors
///
/// Returns an error (also logged at debug level) when `T::read_from_mut` fails.
pub fn deserialize_request<T>(data: &mut [u8]) -> anyhow::Result<T>
where
    T: BytewiseReadOwned,
{
    let mut cursor = BytewiseBuffer::new(data);
    match T::read_from_mut(&mut cursor) {
        Ok(value) => Ok(value),
        Err(err) => {
            debug!("Object deserialization failed: {:?}", err);
            Err(anyhow::anyhow!("Object deserialization failed: {:?}", err))
        }
    }
}
/// Passes an already-serialized request to the global `RequestLogger`.
///
/// When no logger is installed the bytes are dropped and `Ok(())` is returned.
///
/// # Errors
///
/// Propagates any error from `log_request_serialized`.
pub fn dump_request(req_bytes: Vec<u8>) -> anyhow::Result<()> {
    let mut slot = REQUEST_LOGGER.lock();
    match slot.as_mut() {
        Some(active) => {
            active.log_request_serialized(req_bytes)?;
        }
        // No logger installed: nothing to record.
        None => {}
    }
    Ok(())
}
pub fn add_minibatch_request(req_bytes: Vec<u8>) -> anyhow::Result<()> {
let queue = {
let guard = MINIBATCH_REQUESTS.lock();
guard.clone()
};
queue.push(req_bytes);
Ok(())
}
pub fn fetch_minibatch_request() -> anyhow::Result<Option<Vec<u8>>> {
let queue = {
let guard = MINIBATCH_REQUESTS.lock();
guard.clone()
};
Ok(queue.pop())
}
/// Installs a fresh, empty minibatch queue and returns the handle to the
/// previous one.
///
/// Always returns `Ok`; the previous queue is handed back with whatever
/// entries it still holds.
pub fn reset_minibatch_requests() -> anyhow::Result<Arc<SegQueue<Vec<u8>>>> {
    let fresh = Arc::new(SegQueue::new());
    let mut current = MINIBATCH_REQUESTS.lock();
    Ok(mem::replace(&mut *current, fresh))
}
/// Opens the request log file with `RequestPlayer::from_file` and installs
/// the resulting player as the global `RequestPlayer`, replacing any
/// previous one.
///
/// # Errors
///
/// Propagates any error from `RequestPlayer::from_file`; in that case the
/// previously installed player (if any) is left in place.
pub fn request_player_init() -> anyhow::Result<()> {
    let mut slot = REQUEST_PLAYER.lock();
    let log_path = REQUEST_LOG_FILE_PATH.as_path();
    let opened = RequestPlayer::from_file(log_path)?;
    *slot = Some(opened);
    debug!(
        "[Journal] Initializing RequestPlayer for log file: {}",
        REQUEST_LOG_FILE_PATH.display()
    );
    Ok(())
}
/// Reads the next block from the global `RequestPlayer` and returns its
/// request bytes.
///
/// Returns `Ok(None)` when `next_block` yields no further block.
///
/// # Errors
///
/// Fails when the player has not been installed, when `next_block` returns an
/// error, or when the block carries no payload.
pub fn load_next_request() -> anyhow::Result<Option<Vec<u8>>> {
    let mut slot = REQUEST_PLAYER.lock();

    let active = match slot.as_mut() {
        Some(p) => p,
        None => bail!("RequestPlayer is not initialized"),
    };

    let block = match active.next_block()? {
        Some(b) => b,
        // The player has no further block to offer.
        None => return Ok(None),
    };

    match block.payload {
        Some(payload) => Ok(Some(payload.request_data)),
        None => bail!("Block payload is None"),
    }
}