Native Rust task execution and cancellation (pi-natives)
This document describes how crates/pi-natives schedules native work and how cancellation flows from JS options (timeoutMs, AbortSignal) into Rust execution.
Implementation files
- crates/pi-natives/src/task.rs
- crates/pi-natives/src/grep.rs
- crates/pi-natives/src/glob.rs
- crates/pi-natives/src/fd.rs
- crates/pi-natives/src/ast.rs
- crates/pi-natives/src/shell.rs
- crates/pi-natives/src/pty.rs
- crates/pi-natives/src/html.rs
- crates/pi-natives/src/sixel.rs
- crates/pi-natives/src/clipboard.rs
- crates/pi-natives/src/text.rs
- crates/pi-natives/src/ps.rs
Core primitives (task.rs)
task.rs defines:
- task::blocking(tag, cancel_token, work)
  - Wraps napi::AsyncTask/Task. compute() runs on libuv worker threads.
  - Returns a JS Promise<T> for exported functions.
  - Records a profiling sample through profile_region(tag).
- task::future(env, tag, work)
  - Wraps env.spawn_future(...).
  - Runs async work on Tokio's runtime.
  - Returns PromiseRaw<'env, T>.
  - Records a profiling sample through profile_region(tag).
- CancelToken / AbortToken / AbortReason
  - CancelToken::new(timeout_ms, signal) combines an optional deadline and an optional JS AbortSignal converted from Unknown.
  - CancelToken::heartbeat() is cooperative cancellation for blocking loops.
  - CancelToken::wait() asynchronously waits for signal or timeout.
  - CancelToken::emplace_abort_token() creates an abortable flag when AbortSignal, Shell.abort(), or an internal bridge needs one.
  - AbortToken::abort(reason) lets external code request abort.
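To make the token semantics above concrete, here is a minimal std-only sketch of a deadline plus a shared abort flag. It is not the code in task.rs: SketchToken, the u8 encoding of the flag, and the omission of the AbortSignal bridge and of wait() are all assumptions made for illustration. Only the two abort causes and the "Aborted: <reason>" error text are taken from this document.

```rust
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

/// The two abort causes named in this document.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AbortReason {
    Signal,
    Timeout,
}

// Hypothetical flag encoding; the real representation is not shown in this document.
const NOT_ABORTED: u8 = 0;
const SIGNAL: u8 = 1;
const TIMEOUT: u8 = 2;

/// Hypothetical stand-in for CancelToken: an optional deadline plus a shared flag.
#[derive(Clone)]
pub struct SketchToken {
    deadline: Option<Instant>,
    flag: Arc<AtomicU8>,
}

impl SketchToken {
    pub fn new(timeout_ms: Option<u64>) -> Self {
        Self {
            deadline: timeout_ms.map(|ms| Instant::now() + Duration::from_millis(ms)),
            flag: Arc::new(AtomicU8::new(NOT_ABORTED)),
        }
    }

    /// External abort request. Only the first recorded cause is kept.
    pub fn abort(&self, reason: AbortReason) {
        let code = match reason {
            AbortReason::Signal => SIGNAL,
            AbortReason::Timeout => TIMEOUT,
        };
        let _ = self
            .flag
            .compare_exchange(NOT_ABORTED, code, Ordering::SeqCst, Ordering::SeqCst);
    }

    /// Cooperative check for blocking loops: records a timeout if the deadline
    /// has passed, then reports whichever cause was stored first.
    pub fn heartbeat(&self) -> Result<(), String> {
        if let Some(deadline) = self.deadline {
            if Instant::now() >= deadline {
                self.abort(AbortReason::Timeout);
            }
        }
        match self.flag.load(Ordering::SeqCst) {
            SIGNAL => Err("Aborted: Signal".to_string()),
            TIMEOUT => Err("Aborted: Timeout".to_string()),
            _ => Ok(()),
        }
    }
}
```

Clones share the flag through the Arc, so an abort requested on one clone is visible to a worker holding another clone at its next heartbeat.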
blocking vs future: execution model and selection
Use task::blocking
Use when work is CPU-heavy or fundamentally synchronous/blocking:
- regex/file scanning (grep, glob, fuzzyFind)
- ast-grep search/edit worker work
- HTML conversion
- clipboard image read
Behavior:
- Work closure receives a cloned CancelToken.
- Cancellation is only observed where code checks ct.heartbeat()?.
- Closure Err(...) rejects the JS promise.
Use task::future
Use when work must await async operations:
- shell session orchestration (Shell.run, executeShell)
- PTY outer promise (PtySession.start) before it enters spawn_blocking
- async task orchestration that must bridge completion and cancellation
Behavior:
- Future code can race normal completion against ct.wait().
- On the cancel path, async implementations typically cancel subordinate machinery and may force-abort after a grace timeout.
JS API ↔ Rust export mapping (task/cancel relevant)
| JS-facing API | Rust export | Scheduler | Cancellation hookup |
|---|---|---|---|
| grep(options, onMatch?) | grep | task::blocking("grep", ct, ...) | CancelToken::new(options.timeoutMs, options.signal) + heartbeat checks |
| glob(options, onMatch?) | glob | task::blocking("glob", ct, ...) | CancelToken::new(...) + heartbeat checks |
| fuzzyFind(options) | fuzzy_find | task::blocking("fuzzy_find", ct, ...) | CancelToken::new(...) + heartbeat checks |
| astGrep(options) / astEdit(options) | ast exports | blocking worker path | timeout/signal fields are accepted by options and checked cooperatively in worker loops |
| Shell#run(options, onChunk?) | Shell::run | task::future(env, "shell.run", ...) | JS CancelToken is converted into pi_shell::cancel::CancelToken; shell races it against command completion and descendant cleanup |
| executeShell(options, onChunk?) | execute_shell | task::future(env, "shell.execute", ...) | same cancel race and 2s graceful window |
| PtySession#start(options, onChunk?) | PtySession::start | task::future(env, "pty.start", ...) + inner spawn_blocking | CancelToken checked in sync PTY loop via heartbeat() |
| htmlToMarkdown(html, options?) | html_to_markdown | task::blocking("html_to_markdown", (), ...) | none (() token) |
| encodeSixel(...) | encode_sixel | synchronous native function | none |
| readImageFromClipboard() | read_image_from_clipboard | task::blocking("clipboard.read_image", (), ...) | none (() token) |
text.rs, tokens.rs, keys.rs, most ps.rs functions, SIXEL encoding, and synchronous utility exports do not use task::blocking/task::future cancellation and therefore do not participate in this cancellation path.
Cancellation lifecycle and state transitions
CancelToken lifecycle
Created
├─ no signal + no timeout -> passive token
├─ signal registered -> AbortSignal callback can set AbortReason::Signal
└─ deadline set -> timeout check becomes active
Running
├─ heartbeat()/wait() sees signal -> AbortReason::Signal
├─ heartbeat()/wait() sees deadline -> AbortReason::Timeout
└─ no abort -> continue
Aborted
└─ flag stores first observed cause for waiters; heartbeat formats it as "Aborted: <reason>"
Before-start vs mid-execution cancellation
- Before start / before first cancellation check:
  - task::future users that race on ct.wait() can resolve cancellation once they enter select!.
  - task::blocking users only observe cancellation when closure code reaches heartbeat().
- Mid-execution:
  - blocking: next heartbeat() returns Err("Aborted: ...").
  - future: ct.wait() branch wins select!, then code cancels subordinate async machinery.
  - shell: cancellation triggers a Tokio cancellation token, sends descendant termination waves, waits up to 2 seconds for the command task, then aborts the task if needed.
  - PTY: heartbeat failure or kill() terminates PTY child/process targets and drains output briefly.
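The "request cancel, wait for a grace window, then force" shape described for the shell path can be sketched with std threads and a channel. This is only an illustration: it swaps std threads in for Tokio tasks and cancellation tokens, cannot actually abort a thread, and sends no termination signals. StopOutcome, cancel_with_grace, and the polling worker are hypothetical names, not part of shell.rs.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

#[derive(Debug, PartialEq, Eq)]
pub enum StopOutcome {
    /// The worker observed the cancel flag and finished inside the grace window.
    Graceful,
    /// The grace window elapsed first; a real implementation would force-abort here.
    Forced,
}

/// Spawns a hypothetical worker, requests cancellation, then waits up to `grace`
/// for the worker to report completion.
pub fn cancel_with_grace(work_step: Duration, grace: Duration) -> StopOutcome {
    let cancel = Arc::new(AtomicBool::new(false));
    let (done_tx, done_rx) = mpsc::channel::<()>();

    let worker_cancel = Arc::clone(&cancel);
    thread::spawn(move || {
        // Each iteration is one uninterruptible unit of work, followed by a
        // cooperative cancellation check.
        loop {
            thread::sleep(work_step);
            if worker_cancel.load(Ordering::SeqCst) {
                break;
            }
        }
        // The receiver may already be gone if the caller gave up waiting.
        let _ = done_tx.send(());
    });

    // Cooperative cancel request, then a bounded wait for acknowledgement.
    cancel.store(true, Ordering::SeqCst);
    match done_rx.recv_timeout(grace) {
        Ok(()) => StopOutcome::Graceful,
        Err(_) => StopOutcome::Forced,
    }
}
```

A worker whose work step is much shorter than the grace window stops gracefully; one stuck in a step longer than the window falls through to the forced branch, which is where the 2 second fallback described above would abort the task.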
Heartbeat expectations for long-running loops
heartbeat() must run at a predictable cadence in loops with unbounded or large work sets.
Observed patterns:
- glob filtering checks entries during scan/filter work.
- fd scoring checks scanned candidates.
- grep checks before/during expensive search and passes tokens into shared scan/cache helpers.
- run_pty_sync checks every loop tick with a maximum 16ms wait cadence.
Practical rule: no loop over externally sized input should run longer than a short, bounded interval without a heartbeat.
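One way to follow this rule is to check the token every fixed number of items. The sketch below assumes an AtomicBool in place of the real CancelToken; HEARTBEAT_EVERY, heartbeat, and count_matches are hypothetical names, and the interval of 64 is an arbitrary illustration, not a value taken from the crate.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical cadence: check for cancellation once per 64 items.
const HEARTBEAT_EVERY: usize = 64;

/// Stand-in for ct.heartbeat(): fails once the shared flag is set.
fn heartbeat(cancelled: &AtomicBool) -> Result<(), String> {
    if cancelled.load(Ordering::Relaxed) {
        Err("Aborted: Signal".to_string())
    } else {
        Ok(())
    }
}

/// Counts entries containing `needle`, with a heartbeat at the top of the loop
/// on every HEARTBEAT_EVERY-th item (including the first).
pub fn count_matches(
    entries: &[String],
    needle: &str,
    cancelled: &AtomicBool,
) -> Result<usize, String> {
    let mut hits = 0;
    for (index, entry) in entries.iter().enumerate() {
        if index % HEARTBEAT_EVERY == 0 {
            heartbeat(cancelled)?;
        }
        if entry.contains(needle) {
            hits += 1;
        }
    }
    Ok(hits)
}
```

Counting items bounds cancellation latency only when per-item cost is roughly uniform; if a single item can be expensive, a heartbeat before that step is the safer choice.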
Failure behavior and error propagation to JS
Blocking tasks
Error path:
- Closure returns Err(napi::Error) (including heartbeat() abort).
- Task::compute() returns Err.
- AsyncTask rejects JS promise.
Typical error strings:
- Aborted: Timeout
- Aborted: Signal
- domain errors (Failed to decode image: ..., Conversion error: ..., etc.)
Future tasks
Error path:
- Async body returns Err(napi::Error) or join failure is mapped (... task failed: {err}).
- task::future-spawned promise rejects.
- Shell and PTY command APIs model cancellation as structured results instead of rejection when the cancellation path wins: exitCode omitted, cancelled or timedOut set.
Cancellation reporting split
- Abort as error: blocking exports using heartbeat()?.
- Abort as typed result: shell/PTY command APIs that model cancellation in result structs.
Choose one model per API and document it explicitly.
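The two reporting models can be contrasted in a small std-only sketch. CommandResult and both functions are hypothetical; the field names follow the exitCode, cancelled, and timedOut fields mentioned above, and the mapping of a signal to cancelled and a timeout to timedOut is an assumption rather than something this document states.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AbortReason {
    Signal,
    Timeout,
}

/// Hypothetical typed result for the shell/PTY style of reporting.
#[derive(Debug, Default, PartialEq, Eq)]
pub struct CommandResult {
    pub exit_code: Option<i32>,
    pub cancelled: bool,
    pub timed_out: bool,
}

/// Abort as error: the caller sees a failed result (a rejected promise in JS).
pub fn abort_as_error(abort: Option<AbortReason>, value: usize) -> Result<usize, String> {
    match abort {
        Some(reason) => Err(format!("Aborted: {:?}", reason)),
        None => Ok(value),
    }
}

/// Abort as typed result: the call still succeeds, with flags describing why
/// no exit code is present.
pub fn abort_as_result(abort: Option<AbortReason>, exit_code: i32) -> CommandResult {
    match abort {
        // Assumed mapping: signal sets `cancelled`, deadline sets `timed_out`.
        Some(AbortReason::Signal) => CommandResult { cancelled: true, ..Default::default() },
        Some(AbortReason::Timeout) => CommandResult { timed_out: true, ..Default::default() },
        None => CommandResult { exit_code: Some(exit_code), ..Default::default() },
    }
}
```

With the error model a caller handles cancellation in its failure path; with the typed model it must inspect the flags before trusting the exit code, which is why mixing both within one API family is confusing.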
Common pitfalls
- Missing heartbeat in blocking loops
  - Symptom: timeout/signal appears ignored until loop ends.
  - Fix: add ct.heartbeat()? at loop top and before expensive per-item steps.
- Long uncancelable sections
  - Symptom: cancellation latency spikes during single large call (decode, sort, compression, parser invocation, etc.).
  - Fix: split work into chunks with heartbeat boundaries; if impossible, document latency.
- Blocking async executor
  - Symptom: async API stalls when sync-heavy code runs directly in future.
  - Fix: move CPU/sync blocks to task::blocking or tokio::task::spawn_blocking.
- Inconsistent cancel semantics
  - Symptom: one API rejects on cancel, another resolves with flags, confusing callers.
  - Fix: standardize per domain and keep docs aligned.
- Forgetting cancellation bridge in nested async tasks
  - Symptom: outer token is cancelled but inner readers/subprocess tasks keep running.
  - Fix: bridge cancellation to inner token/signal and enforce grace timeout + forced abort fallback.
Checklist for new cancellable exports
- Classify work correctly:
  - CPU-bound or sync blocking -> task::blocking.
  - async I/O / await orchestration -> task::future.
- Expose cancel inputs when needed:
  - include timeoutMs and signal in #[napi(object)] options,
  - create the token with let ct = task::CancelToken::new(timeout_ms, signal);
- Wire cancellation through all layers:
  - blocking loops: ct.heartbeat()? at stable intervals,
  - async orchestration: race with ct.wait() and cancel sub-tasks/tokens.
- Decide cancellation contract:
  - reject promise with abort error, or
  - resolve typed { cancelled, timedOut, ... },
  - keep this contract consistent for the API family.
- Propagate failures with context:
  - map errors via Error::from_reason(format!("...: {err}")),
  - include stage-specific prefixes (spawn, decode, wait, etc.).
- Handle before-start and mid-flight cancellation:
  - cancellation check/await must happen before expensive body and during long execution.
- Validate no executor misuse:
  - no long sync work directly inside async futures without spawn_blocking/blocking task wrapper.
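For the "propagate failures with context" item, the stage-prefix idea can be shown with plain String errors. decode_frame and its two stages are hypothetical; in a real export the formatted string would be wrapped with Error::from_reason as the checklist describes, which is left out here so the sketch compiles without napi.

```rust
/// Hypothetical two-stage operation: parse a length header, then take that many
/// bytes from the payload. Each stage prefixes its own errors.
pub fn decode_frame(header: &str, payload: &[u8]) -> Result<Vec<u8>, String> {
    // Stage 1: the underlying parse error is kept, with a stage prefix added.
    let len: usize = header
        .trim()
        .parse()
        .map_err(|err| format!("decode header: {}", err))?;

    // Stage 2: a different prefix makes the failing step obvious to the caller.
    let body = payload.get(..len).ok_or_else(|| {
        format!("read payload: expected {} bytes, got {}", len, payload.len())
    })?;

    Ok(body.to_vec())
}
```

A caller that sees "read payload: ..." knows the header parsed correctly, which is the practical benefit of per-stage prefixes over a single generic message.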