//! Blocking work scheduling for N-API exports.
//!
//! # Overview
//! Runs CPU-bound or blocking Rust work on libuv's thread pool via napi's
//! `Task` trait, with profiling and cancellation support.
//!
//! # Cancellation
//! Pass a `CancelToken` to blocking tasks. Work must check
//! `CancelToken::heartbeat()` periodically to respect cancellation.
//!
//! # Profiling
//! Samples are always collected into a circular buffer. Call
//! `get_work_profile()` to retrieve the last N seconds of data.
//!
//! # Usage
//! ```ignore
//! use crate::work::{blocking_task, CancelToken};
//!
//! #[napi]
//! fn my_heavy_work(signal: Option<AbortSignal>) -> AsyncTask<impl Task<...>> {
//! let ct = CancelToken::new(None, signal);
//! blocking_task("my_work", ct, |ct| {
//! ct.heartbeat()?;
//! // ... heavy computation ...
//! Ok(result)
//! })
//! }
//! ```
use std::future::Future;
use napi::{Env, Error, Result, Task, bindgen_prelude::*};
use pi_shell::cancel as core_cancel;
use crate::prof::profile_region;
// ─────────────────────────────────────────────────────────────────────────────
// Cancellation
// ─────────────────────────────────────────────────────────────────────────────
/// Reason for task abortion.
#[derive(Debug, Clone, Copy)]
pub enum AbortReason {
Unknown,
Timeout,
Signal,
User,
}
impl From<core_cancel::AbortReason> for AbortReason {
fn from(value: core_cancel::AbortReason) -> Self {
match value {
core_cancel::AbortReason::Unknown => Self::Unknown,
core_cancel::AbortReason::Timeout => Self::Timeout,
core_cancel::AbortReason::Signal => Self::Signal,
core_cancel::AbortReason::User => Self::User,
}
}
}
impl From<AbortReason> for core_cancel::AbortReason {
fn from(value: AbortReason) -> Self {
match value {
AbortReason::Unknown => Self::Unknown,
AbortReason::Timeout => Self::Timeout,
AbortReason::Signal => Self::Signal,
AbortReason::User => Self::User,
}
}
}
/// Token for cooperative cancellation of blocking work.
///
/// Call `heartbeat()` periodically inside long-running work to check for
/// cancellation requests from timeouts or abort signals.
#[derive(Clone, Default)]
pub struct CancelToken {
core: core_cancel::CancelToken,
}
impl From<()> for CancelToken {
fn from((): ()) -> Self {
Self::default()
}
}
impl CancelToken {
/// Create a new cancel token from optional timeout and abort signal.
pub fn new(timeout_ms: Option<u32>, signal: Option<Unknown>) -> Self {
let mut result = Self { core: core_cancel::CancelToken::new(timeout_ms) };
if let Some(signal) = signal.and_then(|value| AbortSignal::from_unknown(value).ok()) {
let abort_token = result.emplace_abort_token();
signal.on_abort(move || abort_token.abort(AbortReason::Signal));
}
result
}
/// Check if cancellation has been requested.
///
/// Returns `Ok(())` if work should continue, or an error if cancelled.
/// Call this periodically in long-running loops.
pub fn heartbeat(&self) -> Result<()> {
self
.core
.heartbeat()
.map_err(|err| Error::from_reason(err.to_string()))
}
/// Wait for the cancel token to be aborted.
pub async fn wait(&self) -> AbortReason {
self.core.wait().await.into()
}
/// Get an abort token for external cancellation.
pub fn abort_token(&self) -> AbortToken {
AbortToken(self.core.abort_token())
}
/// Emplaces a cancel token if there is none, returns the abort token.
pub fn emplace_abort_token(&mut self) -> AbortToken {
AbortToken(self.core.emplace_abort_token())
}
/// Check if already aborted (non-blocking).
pub fn aborted(&self) -> bool {
self.core.aborted()
}
pub fn into_core(self) -> core_cancel::CancelToken {
self.core
}
}
/// Token for requesting cancellation from outside the task.
#[derive(Clone, Default)]
pub struct AbortToken(core_cancel::AbortToken);
impl AbortToken {
/// Request cancellation of the associated task.
pub fn abort(&self, reason: AbortReason) {
self.0.abort(reason.into());
}
}
// ─────────────────────────────────────────────────────────────────────────────
// Blocking Task - libuv thread pool integration
// ─────────────────────────────────────────────────────────────────────────────
/// Task that runs blocking work on libuv's thread pool with profiling.
///
/// This implements napi's `Task` trait, running `compute()` on a libuv worker
/// thread and `resolve()` on the main JS thread.
pub struct Blocking<T>
where
T: Send + 'static,
{
tag: &'static str,
cancel_token: CancelToken,
work: Option<Box<dyn FnOnce(CancelToken) -> Result<T> + Send>>,
}
impl<T> Task for Blocking<T>
where
T: ToNapiValue + Send + 'static + TypeName,
{
type JsValue = T;
type Output = T;
fn compute(&mut self) -> Result<Self::Output> {
let _guard = profile_region(self.tag);
let work = self
.work
.take()
.ok_or_else(|| Error::from_reason("BlockingTask: work already consumed"))?;
work(self.cancel_token.clone())
}
fn resolve(&mut self, _env: Env, output: Self::Output) -> Result<Self::JsValue> {
Ok(output)
}
}
pub type Promise<T> = AsyncTask<Blocking<T>>;
/// Create an `AsyncTask` that runs blocking work on libuv's thread pool.
///
/// Returns `AsyncTask<BlockingTask<T>>` which can be returned directly from
/// `#[napi]` functions - it becomes `Promise<T>` on the JS side.
///
/// # Arguments
/// - `tag`: Profiling tag for this work (appears in flamegraphs)
/// - `cancel_token`: Token for cooperative cancellation
/// - `work`: Closure that performs the blocking work
///
/// # Example
/// ```ignore
/// #[napi]
/// fn heavy_computation(signal: Option<AbortSignal>) -> AsyncTask<impl Task<...>> {
/// let ct = CancelToken::new(None, signal);
/// blocking_task("heavy_computation", ct, |ct| {
/// for i in 0..1000 {
/// ct.heartbeat()?; // Check for cancellation
/// // ... do work ...
/// }
/// Ok(result)
/// })
/// }
/// ```
pub fn blocking<T, F>(
tag: &'static str,
cancel_token: impl Into<CancelToken>,
work: F,
) -> AsyncTask<Blocking<T>>
where
F: FnOnce(CancelToken) -> Result<T> + Send + 'static,
T: ToNapiValue + TypeName + Send + 'static,
{
AsyncTask::new(Blocking { tag, cancel_token: cancel_token.into(), work: Some(Box::new(work)) })
}
// ─────────────────────────────────────────────────────────────────────────────
// Async Task - Tokio runtime integration
// ─────────────────────────────────────────────────────────────────────────────
/// Run an async task on Tokio's runtime with profiling.
///
/// Use this for operations that need to `.await` (async I/O, `select!`, etc.).
/// For CPU-bound blocking work, use [`blocking_task`] instead.
///
/// # Arguments
/// - `env`: N-API environment (needed for `spawn_future`)
/// - `tag`: Profiling tag for this work
/// - `work`: Async closure that performs the work
///
/// # Example
/// ```ignore
/// #[napi]
/// fn run_async_io<'e>(env: &'e Env) -> Result<PromiseRaw<'e, String>> {
/// async_task(env, "async_io", async move {
/// let data = fetch_data().await?;
/// Ok(data)
/// })
/// }
/// ```
pub fn future<'env, T, Fut>(
env: &'env Env,
tag: &'static str,
work: Fut,
) -> Result<PromiseRaw<'env, T>>
where
Fut: Future<Output = Result<T>> + Send + 'static,
T: ToNapiValue + Send + 'static,
{
env.spawn_future(async move {
let _guard = profile_region(tag);
work.await
})
}