"""
Comprehensive Async Callback Framework
Main framework class for production-ready async callback management.
"""
import asyncio
import inspect
import json
import logging
import time
from collections import (
defaultdict,
deque,
)
from datetime import datetime
from typing import (
Any,
AsyncIterator,
Awaitable,
Callable,
Dict,
List,
Literal,
Optional,
Set,
)
import anyio
from openjiuwen.core.runner.callback.chain import CallbackChain
from openjiuwen.core.runner.callback.decorator import (
_TRANSFORM_NOOP,
create_emit_after_decorator,
create_emit_around_decorator,
create_emit_before_decorator,
create_on_decorator,
create_on_wrap_decorator,
create_transform_io_by_events_decorator,
create_transform_io_decorator,
create_wrap_by_event_decorator,
InputTransform,
OutputTransform,
WrapHandler,
)
from openjiuwen.core.runner.callback.enums import (
ChainAction,
FilterAction,
HookType,
)
from openjiuwen.core.runner.callback.errors import AbortError
from openjiuwen.core.runner.callback.filters import (
CircuitBreakerFilter,
EventFilter,
)
from openjiuwen.core.runner.callback.models import (
CallbackInfo,
CallbackMetrics,
ChainContext,
ChainResult,
FilterResult,
)
def _narrow_call(
func: Callable,
args: tuple,
kwargs: dict[str, Any],
) -> tuple[tuple, dict[str, Any]]:
"""Trim *args* and *kwargs* to only what *func* actually accepts.
Positional arguments are truncated to the number of positional-or-keyword /
positional-only parameters the function declares (unless it has *args).
Keyword arguments are filtered to declared parameter names (unless it has
**kwargs).
"""
try:
sig = inspect.signature(func)
except (ValueError, TypeError):
return args, kwargs
has_var_positional = False
has_var_keyword = False
positional_count = 0
for param in sig.parameters.values():
if param.kind == inspect.Parameter.VAR_POSITIONAL:
has_var_positional = True
elif param.kind == inspect.Parameter.VAR_KEYWORD:
has_var_keyword = True
elif param.kind in (
inspect.Parameter.POSITIONAL_ONLY,
inspect.Parameter.POSITIONAL_OR_KEYWORD,
):
positional_count += 1
narrowed_args = args if has_var_positional else args[:positional_count]
if has_var_keyword:
narrowed_kwargs = kwargs
else:
skip_kinds = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
accepted: set[str] = set()
for name, p in sig.parameters.items():
if p.kind not in skip_kinds:
accepted.add(name)
narrowed_kwargs = {k: v for k, v in kwargs.items() if k in accepted}
return narrowed_args, narrowed_kwargs
def _inject_session_if_needed(callback, narrowed_args, narrowed_kwargs):
"""
Inject session parameter into narrowed_kwargs if the callback needs it
Args:
callback: Target function
narrowed_args: Positional arguments
narrowed_kwargs: Keyword arguments
"""
need_session = False
try:
sig = inspect.signature(callback)
if 'session' in sig.parameters:
need_session = True
elif any(p.kind == inspect.Parameter.VAR_KEYWORD for p in sig.parameters.values()):
need_session = True
except (ValueError, TypeError):
need_session = False
if need_session:
session = narrowed_kwargs.get("session", None)
if not session:
from openjiuwen.core.session import get_current_session
narrowed_kwargs['session'] = get_current_session()
class AsyncCallbackFramework:
"""Production-ready async callback framework.
A comprehensive event-driven framework with support for:
- Priority-based callback execution
- Filtering and validation
- Callback chains with rollback
- Performance metrics
- Lifecycle hooks
- Circuit breakers
- Rate limiting
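Event triggering and the ``register``/``unregister*`` coroutines are async and designed for asyncio environments; ``register_sync`` and ``unregister_sync`` are their synchronous counterparts.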
Attributes:
enable_metrics: Whether to collect performance metrics
enable_logging: Whether to enable logging
logger: Logger instance
"""
def __init__(
    self,
    enable_metrics: bool = True,
    enable_logging: bool = True
):
    """Create an empty framework with no callbacks, chains, filters or hooks.

    Args:
        enable_metrics: Stored on ``self.enable_metrics``; gates metric
            collection
        enable_logging: Stored on ``self.enable_logging``; gates log output
    """
    def _new_hook_table():
        # One hook-type -> handlers table, created lazily per event.
        return defaultdict(list)

    # Behaviour switches and logger.
    self.enable_metrics = enable_metrics
    self.enable_logging = enable_logging
    self.logger = logging.getLogger(__name__)

    # Callback registry and chain registry, both keyed by event name.
    self._callbacks: Dict[str, List[CallbackInfo]] = defaultdict(list)
    self._chains: Dict[str, CallbackChain] = {}

    # Filters: per event, global, and per callback (keyed by the callable).
    self._filters: Dict[str, List[EventFilter]] = defaultdict(list)
    self._global_filters: List[EventFilter] = []
    self._callback_filters: Dict[Callable, List[EventFilter]] = {}

    # Lifecycle hooks: event -> hook type -> handlers.
    self._hooks: Dict[str, Dict[HookType, List[Callable]]] = defaultdict(
        _new_hook_table
    )

    # Metrics and circuit breakers.
    self._metrics: Dict[str, CallbackMetrics] = defaultdict(CallbackMetrics)
    self._circuit_breakers: Dict[str, CircuitBreakerFilter] = {}

    # Bounded event history (last 1000 entries); recording is off by default.
    self._event_history: deque = deque(maxlen=1000)
    self._enable_history = False

    # Named locks, created on first use.
    self._locks: Dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
@property
def callbacks(self):
return self._callbacks
@property
def chains(self):
return self._chains
@property
def circuit_breakers(self):
return self._circuit_breakers
@property
def callback_filters(self):
return self._callback_filters
def on(
    self,
    event: str,
    *,
    priority: int = 0,
    once: bool = False,
    namespace: str = "default",
    tags: Optional[Set[str]] = None,
    filters: Optional[List[EventFilter]] = None,
    max_retries: int = 0,
    retry_delay: float = 0.0,
    timeout: Optional[float] = None,
    callback_type: str = "",
):
    """Build a decorator that registers an async callback for *event*.

    Rollback and error handlers cannot be configured here: this method
    always forwards ``rollback_handler=None`` and ``error_handler=None``.
    Use :meth:`on_chain` for callbacks that need them.

    Example:
        >>> framework = AsyncCallbackFramework()
        >>> @framework.on("user_login", priority=10)
        ... async def handle_login(username: str):
        ...     print(f"User logged in: {username}")

    Args:
        event: Event name to listen for
        priority: Execution priority (higher first)
        once: Execute only once then disable
        namespace: Namespace for grouping
        tags: Set of tags for filtering
        filters: List of filters to apply
        max_retries: Maximum retry attempts
        retry_delay: Delay between retries in seconds
        timeout: Execution timeout in seconds
        callback_type: Semantic type marker, e.g. "transform"

    Returns:
        Decorator function
    """
    registration_options = dict(
        priority=priority,
        once=once,
        namespace=namespace,
        tags=tags,
        filters=filters,
        # Chain-only features are never enabled through on().
        rollback_handler=None,
        error_handler=None,
        max_retries=max_retries,
        retry_delay=retry_delay,
        timeout=timeout,
        callback_type=callback_type,
    )
    return create_on_decorator(self, event, **registration_options)
def on_chain(
    self,
    event: str,
    *,
    priority: int = 0,
    once: bool = False,
    namespace: str = "default",
    tags: Optional[Set[str]] = None,
    rollback_handler: Optional[Callable] = None,
    error_handler: Optional[Callable] = None,
    max_retries: int = 0,
    retry_delay: float = 0.0,
    timeout: Optional[float] = None,
    callback_type: str = "",
):
    """Build a decorator that registers an async callback for chain use.

    The counterpart of :meth:`on` for callback chains: it accepts the
    chain-related ``rollback_handler`` and ``error_handler`` options, and
    always forwards ``filters=None``.

    Example:
        >>> framework = AsyncCallbackFramework()
        >>> @framework.on_chain("order_process", rollback_handler=rollback_order)
        ... async def process_payment(order_id: str):
        ...     print(f"Processing payment for order: {order_id}")

    Args:
        event: Event name to listen for
        priority: Execution priority (higher first)
        once: Execute only once then disable
        namespace: Namespace for grouping
        tags: Set of tags for filtering
        rollback_handler: Function to call on rollback
        error_handler: Function to call on error
        max_retries: Maximum retry attempts
        retry_delay: Delay between retries in seconds
        timeout: Execution timeout in seconds
        callback_type: Semantic type marker, e.g. "transform"

    Returns:
        Decorator function
    """
    registration_options = dict(
        priority=priority,
        once=once,
        namespace=namespace,
        tags=tags,
        # Per-callback filters are not offered on the chain variant.
        filters=None,
        rollback_handler=rollback_handler,
        error_handler=error_handler,
        max_retries=max_retries,
        retry_delay=retry_delay,
        timeout=timeout,
        callback_type=callback_type,
    )
    return create_on_decorator(self, event, **registration_options)
def register_sync(
    self,
    event: str,
    callback: Callable[..., Awaitable[Any]],
    *,
    priority: int = 0,
    once: bool = False,
    namespace: str = "default",
    tags: Optional[Set[str]] = None,
    filters: Optional[List[EventFilter]] = None,
    rollback_handler: Optional[Callable] = None,
    error_handler: Optional[Callable] = None,
    max_retries: int = 0,
    retry_delay: float = 0.0,
    timeout: Optional[float] = None,
    callback_type: str = "",
) -> 'CallbackInfo':
    """Synchronous registration method for decorator use.

    Used by the decorators to register callbacks synchronously (e.g. while
    a module is being imported); :meth:`register` wraps it in a lock.

    Args:
        event: Event name to listen for
        callback: Async callback function
        priority: Execution priority (higher first)
        once: Execute only once then disable
        namespace: Namespace for grouping
        tags: Set of tags for filtering; ``None`` becomes an empty set
        filters: Filters stored for this callable.  They are keyed by the
            callable alone, so a later registration of the same callable
            with filters replaces the earlier list.
        rollback_handler: Function to call on rollback
        error_handler: Function to call on error
        max_retries: Maximum retry attempts
        retry_delay: Delay between retries in seconds
        timeout: Execution timeout in seconds
        callback_type: Semantic type marker, e.g. "transform"

    Returns:
        The registered CallbackInfo instance
    """
    callback_info = CallbackInfo(
        callback=callback,
        priority=priority,
        once=once,
        namespace=namespace,
        tags=tags or set(),
        max_retries=max_retries,
        retry_delay=retry_delay,
        timeout=timeout,
        callback_type=callback_type,
    )

    registered = self._callbacks[event]
    registered.append(callback_info)
    # list.sort is stable, so callbacks of equal priority keep the order in
    # which they were registered.
    registered.sort(key=lambda info: info.priority, reverse=True)

    if filters:
        self._callback_filters[callback] = filters

    # Only callbacks carrying a rollback/error handler join the event chain.
    if rollback_handler or error_handler:
        if event not in self._chains:
            self._chains[event] = CallbackChain(event)
        self._chains[event].add(
            callback_info,
            rollback_handler,
            error_handler
        )

    if self.enable_logging:
        # functools.partial objects and callable instances have no __name__;
        # registration must not fail on them after state was already updated.
        callback_name = getattr(callback, '__name__', 'unknown')
        self.logger.info(f"Registered callback: {event} -> {callback_name}")

    return callback_info
def emit_before(
    self,
    event: str,
    *,
    pass_args: bool = True,
    extra_kwargs: Optional[dict[str, Any]] = None,
):
    """Build a decorator that fires *event* BEFORE the wrapped function runs.

    Works with async functions and async generators; sync functions and
    sync generators are promoted to their async equivalents.

    Example (async):
        >>> @framework.emit_before("processing_started")
        ... async def process_data(data):
        ...     return {"processed": data}

    Example (sync, promoted to async):
        >>> @framework.emit_before("processing_started")
        ... def process_data(data):
        ...     return {"processed": data}
        >>> # Must be awaited: result = await process_data(data)

    Args:
        event: Event name to trigger
        pass_args: Whether to pass function arguments to callbacks
        extra_kwargs: Additional keyword arguments merged into every trigger call

    Returns:
        Decorator function
    """
    decorator = create_emit_before_decorator(
        self,
        event,
        pass_args=pass_args,
        extra_kwargs=extra_kwargs,
    )
    return decorator
def emit_after(
    self,
    event: str,
    *,
    result_key: str = "result",
    item_key: str = "item",
    pass_args: bool = False,
    stream_mode: Literal["per_item", "once"] = "per_item",
    extra_kwargs: Optional[dict[str, Any]] = None,
):
    """Build a decorator that fires *event* AFTER the wrapped function completes.

    Regular functions trigger the event once with their result.  Generators
    trigger it for every yielded item (``"per_item"``) or a single time with
    all collected items (``"once"``).

    Works with async functions and async generators; sync functions and
    sync generators are promoted to their async equivalents.

    Example (regular function):
        >>> @framework.emit_after("data_processed")
        ... async def process_data(data):
        ...     return {"processed": data}
        >>> # Callbacks receive: result={"processed": data}

    Example (stream per_item):
        >>> @framework.on("chunk_ready")
        ... async def save_chunk(item: dict):
        ...     print(f"Saving: {item}")
        >>> @framework.emit_after("chunk_ready")
        ... async def process_file(filepath: str):
        ...     with open(filepath, 'r') as f:
        ...         for line in f:
        ...             await asyncio.sleep(0.01)
        ...             yield {"line": line.strip()}
        >>> async for chunk in process_file("data.txt"):
        ...     pass  # chunk_ready event triggered for each chunk

    Example (stream once):
        >>> @framework.emit_after("all_chunks_done", stream_mode="once")
        ... async def process_file(filepath: str):
        ...     for line in open(filepath):
        ...         yield {"line": line.strip()}
        >>> # Callbacks receive: result=[{"line": ...}, {"line": ...}, ...]

    Args:
        event: Event name to trigger
        result_key: Keyword argument name for the result (regular functions)
        item_key: Keyword argument name for each yielded item (generators)
        pass_args: Whether to include original args in event
        stream_mode: For generators - "per_item" triggers per item, "once"
            triggers after all items are yielded with collected results
        extra_kwargs: Additional keyword arguments merged into every trigger call

    Returns:
        Decorator function
    """
    decorator_options = dict(
        result_key=result_key,
        item_key=item_key,
        pass_args=pass_args,
        stream_mode=stream_mode,
        extra_kwargs=extra_kwargs,
    )
    return create_emit_after_decorator(self, event, **decorator_options)
def emit_around(
    self,
    before_event: str,
    after_event: str,
    pass_args: bool = True,
    pass_result: bool = True,
    on_error_event: Optional[str] = None
):
    """Build a decorator that fires events around the wrapped function.

    ``before_event`` is triggered before execution and ``after_event`` after
    successful completion.  When ``on_error_event`` is given it is triggered
    if an exception occurs.

    Works with async functions and async generators; sync functions and
    sync generators are promoted to their async equivalents.

    Example (async):
        >>> @framework.emit_around("process_start", "process_end",
        ...                        on_error_event="process_error")
        ... async def process_data(data):
        ...     return {"processed": data}

    Example (sync, promoted to async):
        >>> @framework.emit_around("start", "end")
        ... def compute(x):
        ...     return x * 2
        >>> # Must be awaited: result = await compute(5)

    Args:
        before_event: Event to trigger before execution
        after_event: Event to trigger after successful execution
        pass_args: Whether to pass function arguments to events
        pass_result: Whether to pass result to after_event
        on_error_event: Optional event to trigger on error

    Returns:
        Decorator function
    """
    decorator_options = dict(
        pass_args=pass_args,
        pass_result=pass_result,
        on_error_event=on_error_event,
    )
    return create_emit_around_decorator(
        self, before_event, after_event, **decorator_options
    )
def transform_io(
    self,
    *,
    input_event: Optional[str] = None,
    output_event: Optional[str] = None,
    result_key: str = "result",
    input_transform: Optional[InputTransform] = None,
    output_transform: Optional[OutputTransform] = None,
    output_mode: Literal["frame", "generator"] = "frame",
):
    """Build a decorator that transforms a function's inputs and outputs.

    Two modes exist.  If ``input_event`` or ``output_event`` is given, the
    event-based decorator is used: the framework triggers those events and
    takes the last callback's return value as the transformed input or
    output.  In that mode ``input_transform`` / ``output_transform`` are not
    forwarded.  If neither event is given, the direct callables are used.

    ``output_mode`` controls how the output transform is applied to
    generator functions:

    * ``'frame'`` (default): the output transform is applied once per
      yielded item.  In event mode, ``output_event`` fires per item.
    * ``'generator'``: the output transform receives the entire source
      async iterator.  In direct-callable mode ``output_transform`` must be
      an async generator function.  In event mode ``output_event`` fires
      once with the source and the callback should return an async iterable.

    Sync generator functions are automatically promoted to async.

    Event-based example:
        >>> @framework.on("transform_input")
        ... async def normalize(*args, **kwargs):
        ...     return (args, {**kwargs, "limit": kwargs.get("limit", 10)})
        >>> @framework.on("transform_output")
        ... async def serialize(result):
        ...     return json.dumps(result) if isinstance(result, dict) else result
        >>> @framework.transform_io(
        ...     input_event="transform_input",
        ...     output_event="transform_output",
        ... )
        ... async def fetch_data(limit: int):
        ...     return {"count": limit}

    Direct callable example:
        >>> @framework.transform_io(
        ...     input_transform=normalize_input,
        ...     output_transform=serialize_output,
        ... )
        ... async def fetch_data(limit: int):
        ...     return {"count": limit}

    Args:
        input_event: Optional event name for input transform. When
            triggered, callbacks receive (*args, **kwargs) and must
            return (new_args, new_kwargs). Last result is used.
        output_event: Optional event name for output transform. When
            triggered, callbacks receive result_key=<value> and must
            return the transformed value. Last result is used.
        result_key: Keyword for output_event payload (default: "result").
        input_transform: Optional direct callback (args, kwargs) ->
            (new_args, new_kwargs). Used when input_event is not set.
        output_transform: Optional direct callback value -> new_value.
            Used when output_event is not set.
        output_mode: ``'frame'`` (default) or ``'generator'``. Controls
            how output transform is applied to generator functions.
            Raises ``ValueError`` if used with non-generator functions.

    Returns:
        Decorator that applies input/output transformation via
        events or callbacks.
    """
    no_events_requested = input_event is None and output_event is None
    if no_events_requested:
        return create_transform_io_decorator(
            input_transform=input_transform,
            output_transform=output_transform,
            output_mode=output_mode,
        )

    return create_transform_io_by_events_decorator(
        self,
        input_event=input_event,
        output_event=output_event,
        result_key=result_key,
        output_mode=output_mode,
    )
def on_wrap(
    self,
    event: str,
    *,
    priority: int = 0,
) -> Callable[[WrapHandler], WrapHandler]:
    """Build a decorator that registers a WrapHandler for *event*.

    WrapHandlers form an explicit call chain around the function decorated
    with :meth:`wrap`.  Every handler gets ``call_next`` as its first
    argument and decides when (and whether) to run the rest of the chain.

    Handlers live in the framework's standard ``_callbacks`` registry under
    ``"__wrap__:{event}"``, so they share the usual priority sorting and can
    be removed via :meth:`unregister` / :meth:`unregister_event`.

    For regular (non-generator) functions::

        @framework.on_wrap("fetch")
        async def auth_handler(call_next, *args, **kwargs):
            if not kwargs.get("token"):
                raise PermissionError("missing token")
            return await call_next(*args, **kwargs)

    For async/sync generator functions the handler must itself be an async
    generator that iterates ``call_next``::

        @framework.on_wrap("stream_data")
        async def log_handler(call_next, *args, **kwargs):
            async for item in call_next(*args, **kwargs):
                print("item:", item)
                yield item

    Args:
        event: Logical event name shared with the matching :meth:`wrap`
            decorator.
        priority: Higher-priority handlers are outermost (called first)
            in the chain.

    Returns:
        Decorator that registers the function and returns it unchanged.
    """
    registrar = create_on_wrap_decorator(self, event, priority=priority)
    return registrar
def wrap(
    self,
    event: str,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Build a decorator that runs a function inside its WrapHandler chain.

    On every call the framework looks up all enabled WrapHandlers registered
    for *event* through :meth:`on_wrap` and executes them as a chain around
    the decorated function.  The lookup happens at call time, so handlers
    registered after decoration are picked up automatically.

    Works with async functions and async generators; sync functions and
    sync generators are promoted to their async equivalents.

    Example::

        @framework.on_wrap("fetch", priority=10)
        async def log_handler(call_next, *args, **kwargs):
            print("before fetch")
            result = await call_next(*args, **kwargs)
            print("after fetch")
            return result

        @framework.wrap("fetch")
        async def fetch_data(url: str) -> dict:
            ...

    For a static chain (handlers fixed at decoration time, no framework
    involvement) use :func:`create_wrap_decorator` directly.

    Args:
        event: Logical event name shared with the matching :meth:`on_wrap`
            decorators.

    Returns:
        Decorator that wraps the function with the dynamic handler chain.
    """
    wrapping_decorator = create_wrap_by_event_decorator(self, event)
    return wrapping_decorator
def on_transform(
self,
event: str,
*,
priority: int = 0,
):
"""Convenience decorator that registers a transform-type callback.
Equivalent to ``on(event, callback_type="transform", priority=priority)``.
Transform callbacks are exclusively invoked by :meth:`trigger_transform`
and are used by :meth:`transform_io` to modify inputs and outputs.
Example::
@framework.on_transform(ToolCallEvents.TOOL_INVOKE_INPUT)
async def preprocess(inputs, **kwargs):
return ((modified_inputs,), kwargs)
Args:
event: Event name to listen for.
priority: Execution priority (higher first).
Returns:
Decorator function.
"""
return self.on(event, callback_type="transform", priority=priority)
async def trigger_transform(self, event: str, *args, **kwargs) -> Any:
    """Trigger only transform-type callbacks and return the last result.

    Unlike :meth:`trigger`, this method runs only enabled callbacks that were
    registered with ``callback_type="transform"``.  Each one is awaited in
    registry order with the same original ``args`` / ``kwargs`` (narrowed to
    its signature); results are not fed from one callback into the next.
    When no such callbacks exist, the sentinel :data:`_TRANSFORM_NOOP` is
    returned so callers can detect a no-op and pass the original value
    through unchanged.

    Callbacks registered with ``once=True`` are disabled after they return,
    matching the other trigger methods.  Exceptions raised by a callback
    propagate to the caller.

    Args:
        event: Event name to trigger.
        *args: Positional arguments forwarded to callbacks.
        **kwargs: Keyword arguments forwarded to callbacks.

    Returns:
        The return value of the last transform callback, or
        ``_TRANSFORM_NOOP`` when no transform callbacks are registered.
    """
    # Snapshot so registrations made while awaiting do not affect this run.
    transforms = [
        info for info in self._callbacks.get(event, [])
        if info.enabled and info.callback_type == "transform"
    ]

    result: Any = _TRANSFORM_NOOP
    for info in transforms:
        narrowed_args, narrowed_kwargs = _narrow_call(info.callback, args, kwargs)
        # Private copy: an injected session must not leak into the kwargs
        # shared with the remaining callbacks.
        narrowed_kwargs = dict(narrowed_kwargs)
        _inject_session_if_needed(info.callback, narrowed_args, narrowed_kwargs)
        result = await info.callback(*narrowed_args, **narrowed_kwargs)
        if info.once:
            info.enabled = False
    return result
async def register(
self,
event: str,
callback: Callable[..., Awaitable[Any]],
*,
priority: int = 0,
once: bool = False,
namespace: str = "default",
tags: Optional[Set[str]] = None,
filters: Optional[List[EventFilter]] = None,
rollback_handler: Optional[Callable] = None,
error_handler: Optional[Callable] = None,
max_retries: int = 0,
retry_delay: float = 0.0,
timeout: Optional[float] = None,
callback_type: str = "",
) -> CallbackInfo:
"""Register a callback for an event.
Args:
event: Event name to listen for
callback: Async callback function
priority: Execution priority (higher first)
once: Execute only once then disable
namespace: Namespace for grouping
tags: Set of tags for filtering
filters: List of filters to apply
rollback_handler: Function to call on rollback
error_handler: Function to call on error
max_retries: Maximum retry attempts
retry_delay: Delay between retries in seconds
timeout: Execution timeout in seconds
callback_type:
"""
async with self._locks["registration"]:
return self.register_sync(
event,
callback,
priority=priority,
once=once,
namespace=namespace,
tags=tags,
filters=filters,
rollback_handler=rollback_handler,
error_handler=error_handler,
max_retries=max_retries,
retry_delay=retry_delay,
timeout=timeout,
callback_type=callback_type,
)
def unregister_sync(self, event: str, callback: Callable) -> None:
"""Synchronous unregistration method for internal use.
Supports unregistering by either the original callback function or
the wrapper function returned by the @framework.on decorator.
Args:
event: Event name
callback: Callback to remove (can be original function or decorator wrapper)
"""
if event not in self._callbacks:
return
callback_to_remove = None
for callback_info in self._callbacks[event]:
if callback_info.callback == callback:
callback_to_remove = callback_info.callback
break
elif callback_info.wrapper == callback:
callback_to_remove = callback_info.callback
break
elif hasattr(callback, '__wrapped__'):
wrapped_func = getattr(callback, '__wrapped__', None)
if wrapped_func is not None and callback_info.callback == wrapped_func:
callback_to_remove = callback_info.callback
break
if callback_to_remove is not None:
self._callbacks[event] = [
ci for ci in self._callbacks[event] if ci.callback != callback_to_remove
]
self._callback_filters.pop(callback_to_remove, None)
if event in self._chains:
self._chains[event].remove(callback_to_remove)
if self.enable_logging:
callback_name = getattr(callback_to_remove, '__name__', 'unknown')
self.logger.info(f"Unregistered callback: {event} -> {callback_name}")
async def unregister(self, event: str, callback: Callable) -> None:
"""Unregister a callback from an event.
Supports unregistering by either the original callback function or
the wrapper function returned by the @framework.on decorator.
Args:
event: Event name
callback: Callback to remove (can be original function or decorator wrapper)
"""
async with self._locks["registration"]:
self.unregister_sync(event, callback)
async def unregister_namespace(self, namespace: str) -> None:
"""Unregister all callbacks in a namespace.
Args:
namespace: Namespace to clear
"""
async with self._locks["registration"]:
for event in list(self._callbacks.keys()):
self._callbacks[event] = [
ci for ci in self._callbacks[event]
if ci.namespace != namespace
]
async def unregister_by_tags(self, tags: Set[str]) -> None:
"""Unregister callbacks matching any of the given tags.
Args:
tags: Set of tags to match
"""
async with self._locks["registration"]:
for event in list(self._callbacks.keys()):
self._callbacks[event] = [
ci for ci in self._callbacks[event]
if not ci.tags.intersection(tags)
]
async def unregister_event(self, event: str) -> None:
"""Unregister all callbacks for a specific event.
Removes all callbacks, filters, chains, hooks, and circuit breakers
associated with the event.
Args:
event: Event name to clear
"""
async with self._locks["registration"]:
if event in self._callbacks:
for callback_info in self._callbacks[event]:
callback = callback_info.callback
self._callback_filters.pop(callback, None)
cb_key = f"{event}:{callback.__name__}"
self._circuit_breakers.pop(cb_key, None)
del self._callbacks[event]
if event in self._chains:
del self._chains[event]
if event in self._hooks:
del self._hooks[event]
if event in self._filters:
del self._filters[event]
if self.enable_logging:
self.logger.info(f"Unregistered all callbacks for event: {event}")
async def trigger(self, event: str, *args, **kwargs) -> List[Any]:
    """Trigger an event and execute all registered callbacks.

    Runs the event's BEFORE hooks, then every enabled non-transform callback
    in registry (priority) order, then the AFTER hooks.  For each callback
    the filters are applied first; a STOP result ends processing of the
    remaining callbacks and a SKIP result moves on to the next one.
    Arguments are narrowed to what the callback's signature accepts and a
    session is injected when the callback can take one.  A callback that
    returns an async generator contributes the generator object itself;
    any other return value is awaited.

    Error handling:
        * ``AbortError``: metrics / circuit breaker are updated, ERROR hooks
          run, and then the error's ``cause`` (or the ``AbortError`` itself
          when it has none) is raised.  AFTER hooks do not run.
        * Any other ``Exception``: metrics / circuit breaker are updated,
          ERROR hooks run, the failure is logged, and processing continues
          with the next callback.

    Example:
        >>> await framework.trigger("user_login", username="alice")

    Args:
        event: Event name to trigger
        *args: Positional arguments for callbacks
        **kwargs: Keyword arguments for callbacks

    Returns:
        List of results from all executed callbacks
    """
    results: List[Any] = []

    if self._enable_history:
        self._event_history.append({
            "event": event,
            "args": args,
            "kwargs": kwargs,
            "timestamp": time.time()
        })

    def name_of(callback) -> str:
        # partial objects / callable instances have no __name__.
        return getattr(callback, '__name__', type(callback).__name__)

    async def record_outcome(callback, started_at: float, failed: bool) -> None:
        """Update metrics and the circuit breaker (if any) for one invocation."""
        execution_time = time.perf_counter() - started_at
        key = f"{event}:{name_of(callback)}"
        if self.enable_metrics:
            self._metrics[key].update(execution_time, is_error=failed)
        if key in self._circuit_breakers:
            breaker = self._circuit_breakers[key]
            if failed:
                await breaker.record_failure(event, callback)
            else:
                await breaker.record_success(event, callback)

    await self._execute_hooks(event, HookType.BEFORE, *args, **kwargs)

    # Iterate over a snapshot: registrations made while a callback is being
    # awaited would otherwise append to and re-sort the list mid-iteration.
    for callback_info in list(self._callbacks.get(event, [])):
        if not callback_info.enabled:
            continue
        if callback_info.callback_type == "transform":
            continue

        callback = callback_info.callback
        # Bound before the try block so the error handlers below always have
        # a start time for this callback, even if filtering itself raises.
        started_at = time.perf_counter()
        try:
            filter_result = await self._apply_filters(
                event, callback, args, kwargs
            )
            if filter_result.action == FilterAction.STOP:
                if self.enable_logging:
                    self.logger.info(f"Filter stopped event processing: {event}")
                break
            if filter_result.action == FilterAction.SKIP:
                if self.enable_logging:
                    self.logger.debug(
                        f"Filter skipped callback {name_of(callback)}: "
                        f"{filter_result.reason}"
                    )
                continue

            # NOTE(review): `or` also falls back to the originals when a
            # filter returns an empty tuple/dict — confirm that is intended.
            final_args = filter_result.modified_args or args
            final_kwargs = filter_result.modified_kwargs or kwargs

            # Restart the clock so time spent in filters is not measured.
            started_at = time.perf_counter()
            narrowed_args, narrowed_kwargs = _narrow_call(callback, final_args, final_kwargs)
            # Private copy: an injected session must not leak into the
            # caller's kwargs, which later callbacks and hooks also receive.
            narrowed_kwargs = dict(narrowed_kwargs)
            _inject_session_if_needed(callback, narrowed_args, narrowed_kwargs)

            callback_result = callback(*narrowed_args, **narrowed_kwargs)
            if inspect.isasyncgen(callback_result):
                result = callback_result
            else:
                result = await callback_result

            await record_outcome(callback, started_at, failed=False)
            results.append(result)
            if callback_info.once:
                callback_info.enabled = False

        except AbortError as e:
            await record_outcome(callback, started_at, failed=True)
            await self._execute_hooks(event, HookType.ERROR, e.cause or e, *args, **kwargs)
            if self.enable_logging:
                if e.cause:
                    self.logger.error(
                        f"Callback execution aborted: {name_of(callback)} - {e.reason} "
                        f"(caused by {type(e.cause).__name__}: {e.cause})"
                    )
                else:
                    self.logger.error(
                        f"Callback execution aborted: {name_of(callback)} - {e.reason}"
                    )
            if e.cause is not None:
                raise e.cause
            raise

        except Exception as e:
            await record_outcome(callback, started_at, failed=True)
            await self._execute_hooks(event, HookType.ERROR, e, *args, **kwargs)
            if self.enable_logging:
                self.logger.error(
                    f"Callback execution failed: {name_of(callback)} - {e}",
                    exc_info=True
                )

    await self._execute_hooks(event, HookType.AFTER, results, *args, **kwargs)
    return results
async def trigger_delayed(
self,
event: str,
delay: float,
*args,
**kwargs
) -> List[Any]:
"""Trigger an event after a delay.
Args:
event: Event name
delay: Delay in seconds
*args: Positional arguments
**kwargs: Keyword arguments
Returns:
List of callback results
"""
await asyncio.sleep(delay)
return await self.trigger(event, *args, **kwargs)
async def trigger_chain(self, event: str, *args, **kwargs) -> ChainResult:
    """Run the callbacks of *event* as a chain and return the chain outcome.

    Callbacks execute sequentially with each result passed on to the next,
    and rollback is supported on failure.  If a chain is already stored for
    the event it is used as-is; otherwise a temporary chain is assembled
    from every callback currently registered for the event (it is not
    stored).

    Example:
        >>> result = await framework.trigger_chain(
        ...     "order_process",
        ...     order_id="123"
        ... )
        >>> if result.action == ChainAction.ROLLBACK:
        ...     print("Order processing failed and was rolled back")

    Args:
        event: Event name
        *args: Initial positional arguments
        **kwargs: Initial keyword arguments

    Returns:
        ChainResult with execution outcome
    """
    if event in self._chains:
        chain = self._chains[event]
    else:
        # No stored chain: build a throwaway one from the plain registry.
        chain = CallbackChain(event)
        for registered in self._callbacks.get(event, []):
            chain.add(registered)

    context = ChainContext(
        event=event,
        initial_args=args,
        initial_kwargs=kwargs
    )
    outcome = await chain.execute(context)
    return outcome
async def trigger_parallel(self, event: str, *args, **kwargs) -> List[Any]:
    """Trigger callbacks in parallel (concurrent execution).

    Every enabled, non-transform callback of the event is run concurrently
    through ``asyncio.gather``.  Each callback has the filters applied,
    receives arguments narrowed to its own signature (with its own copy of
    the keyword arguments, so session injection for one callback cannot
    affect another), and honours its registered ``timeout`` if set.

    A callback that fails, times out, or is stopped/skipped by a filter
    contributes nothing to the result.  Note that a callback which
    legitimately returns ``None`` is also omitted from the result list.

    Example:
        >>> # All three callbacks run concurrently
        >>> results = await framework.trigger_parallel(
        ...     "fetch_data",
        ...     url="https://api.example.com"
        ... )

    Args:
        event: Event name to trigger
        *args: Positional arguments for callbacks
        **kwargs: Keyword arguments for callbacks

    Returns:
        List of results from all callbacks (excluding failed ones)
    """
    # Transform callbacks are reserved for trigger_transform.
    runnable = [
        info for info in self._callbacks.get(event, [])
        if info.enabled and info.callback_type != "transform"
    ]
    if not runnable:
        return []

    def name_of(callback) -> str:
        # partial objects / callable instances have no __name__.
        return getattr(callback, '__name__', type(callback).__name__)

    async def execute_callback(cb_info):
        # `cb` is taken from this coroutine's own argument; nothing here may
        # depend on a variable of the scheduling loop below.
        cb = cb_info.callback
        try:
            filter_result = await self._apply_filters(
                event, cb, args, kwargs
            )
            if filter_result.action == FilterAction.STOP:
                if self.enable_logging:
                    self.logger.info(f"Filter stopped processing for {event}")
                return None
            if filter_result.action == FilterAction.SKIP:
                if self.enable_logging:
                    self.logger.debug(
                        f"Filter skipped {name_of(cb)}: "
                        f"{filter_result.reason}"
                    )
                return None

            final_args = filter_result.modified_args or args
            final_kwargs = filter_result.modified_kwargs or kwargs

            call_args, call_kwargs = _narrow_call(cb, final_args, final_kwargs)
            # Private copy: the kwargs dict is shared by all concurrently
            # running callbacks and must not be mutated.
            call_kwargs = dict(call_kwargs)
            _inject_session_if_needed(cb, call_args, call_kwargs)

            if cb_info.timeout:
                with anyio.fail_after(cb_info.timeout):
                    result = await cb(*call_args, **call_kwargs)
            else:
                result = await cb(*call_args, **call_kwargs)

            if cb_info.once:
                cb_info.enabled = False
            return result
        except Exception as e:
            if self.enable_logging:
                self.logger.error(
                    f"Callback {name_of(cb)} failed in parallel execution: {e}"
                )
            return None

    results = await asyncio.gather(
        *(execute_callback(info) for info in runnable),
        return_exceptions=True,
    )

    clean_results = []
    for result in results:
        # BaseException, not Exception: with return_exceptions=True gather
        # can also hand back a CancelledError, which is not a result.
        if isinstance(result, BaseException):
            if self.enable_logging:
                self.logger.error(f"Parallel execution exception: {result}")
        elif result is not None:
            clean_results.append(result)
    return clean_results
async def trigger_until(
    self,
    event: str,
    condition: Callable[[Any], bool],
    *args,
    **kwargs
) -> Optional[Any]:
    """Await callbacks one by one until a result satisfies ``condition``.

    Enabled callbacks registered for ``event`` are processed in the order
    they are stored. Each callback first goes through the filter pipeline:
    a STOP result ends the search, a SKIP result moves on to the next
    callback. Any exception raised while handling a callback (including
    one raised by ``condition``) is logged when logging is enabled and the
    search continues with the next callback.

    Example:
        >>> # Stop at first result > 10
        >>> result = await framework.trigger_until(
        >>>     "compute",
        >>>     lambda x: x > 10,
        >>>     value=5
        >>> )

    Args:
        event: Event name to trigger
        condition: Predicate applied to every callback result
        *args: Positional arguments for callbacks
        **kwargs: Keyword arguments for callbacks

    Returns:
        The first result accepted by ``condition``, or None when no
        callback produced one.
    """
    for entry in self._callbacks.get(event, []):
        if not entry.enabled:
            continue
        handler = entry.callback
        try:
            verdict = await self._apply_filters(event, handler, args, kwargs)
            if verdict.action == FilterAction.STOP:
                break
            if verdict.action == FilterAction.SKIP:
                continue
            # Filter-modified arguments win over the caller's when present
            call_args, call_kwargs = _narrow_call(
                handler,
                verdict.modified_args or args,
                verdict.modified_kwargs or kwargs,
            )
            _inject_session_if_needed(handler, call_args, call_kwargs)
            outcome = await handler(*call_args, **call_kwargs)
            satisfied = bool(condition(outcome))
            if satisfied and self.enable_logging:
                self.logger.info(
                    f"Condition satisfied by {handler.__name__}: {outcome}"
                )
            # A one-shot callback is retired after any completed call,
            # whether or not its result satisfied the condition
            if entry.once:
                entry.enabled = False
            if satisfied:
                return outcome
        except Exception as exc:
            if self.enable_logging:
                self.logger.error(
                    f"Callback {handler.__name__} failed in trigger_until: {exc}"
                )
    return None
async def trigger_with_timeout(
    self,
    event: str,
    timeout: float,
    *args,
    **kwargs
) -> List[Any]:
    """Run ``trigger`` for an event under an overall deadline.

    The whole ``self.trigger(...)`` call is wrapped in
    ``anyio.fail_after(timeout)``. If the deadline expires, a warning is
    logged (when logging is enabled) and an empty list is returned.

    Example:
        >>> results = await framework.trigger_with_timeout(
        >>>     "slow_task",
        >>>     timeout=5.0,  # 5 seconds max
        >>>     data="process_me"
        >>> )

    Args:
        event: Event name to trigger
        timeout: Maximum execution time in seconds
        *args: Positional arguments for callbacks
        **kwargs: Keyword arguments for callbacks

    Returns:
        Whatever ``trigger`` returned, or an empty list when the deadline
        was exceeded.
    """
    try:
        with anyio.fail_after(timeout):
            outcome = await self.trigger(event, *args, **kwargs)
    except TimeoutError:
        if self.enable_logging:
            self.logger.warning(
                f"Event '{event}' execution timeout after {timeout}s"
            )
        outcome = []
    return outcome
async def trigger_stream(
    self,
    event: str,
    input_stream: AsyncIterator[Any],
    *args,
    **kwargs
) -> AsyncIterator[Any]:
    """Feed every item of an async stream through ``trigger`` and yield the results.

    For each item, ``self.trigger(event, item, *args, **kwargs)`` is awaited
    and its results are yielded in order. A result that is an async
    generator is drained and its items are yielded individually. Any
    exception is logged (when logging is enabled) and then re-raised.

    Example:
        >>> async def data_source():
        >>>     for i in range(5):
        >>>         await asyncio.sleep(0.1)
        >>>         yield {"value": i}
        >>>
        >>> async for result in framework.trigger_stream(
        >>>     "process_item",
        >>>     data_source()
        >>> ):
        >>>     print(result)

    Args:
        event: Event name to trigger
        input_stream: Async iterator providing input items
        *args: Additional positional arguments for callbacks
        **kwargs: Additional keyword arguments for callbacks

    Yields:
        Results from callback executions for each stream item
    """
    try:
        async for element in input_stream:
            for produced in await self.trigger(event, element, *args, **kwargs):
                if not inspect.isasyncgen(produced):
                    yield produced
                    continue
                # Flatten async-generator results into the output stream
                async for nested in produced:
                    yield nested
    except Exception as exc:
        if self.enable_logging:
            self.logger.error(f"Stream processing error for event '{event}': {exc}")
        raise
async def trigger_generator(
    self,
    event: str,
    *args,
    **kwargs
) -> AsyncIterator[Any]:
    """Trigger event and aggregate async generator outputs from callbacks.

    Runs the BEFORE hooks, then every enabled callback of ``event`` in
    stored order. Each callback is passed through the filter pipeline
    first (STOP ends processing, SKIP moves to the next callback). The
    callback's return value is yielded as follows:

    * async generator: each of its items is yielded;
    * coroutine: it is awaited, and the awaited value is yielded (or
      drained, if it is itself an async generator);
    * anything else: yielded as-is.

    A failing callback is recorded in metrics, logged, reported to the
    ERROR hooks (with the exception as first argument) and does not stop
    the remaining callbacks. AFTER hooks run once the loop finishes;
    CLEANUP hooks always run, from a ``finally`` block.

    Example:
        >>> @framework.on("data_stream")
        >>> async def streaming_callback():
        >>>     for i in range(10):
        >>>         await asyncio.sleep(0.05)
        >>>         yield {"data": i}
        >>>
        >>> async for item in framework.trigger_generator("data_stream"):
        >>>     print(item)

    Args:
        event: Event name to trigger
        *args: Positional arguments for callbacks
        **kwargs: Keyword arguments for callbacks

    Yields:
        All items from callbacks, including async generator outputs
    """
    await self._execute_hooks(event, HookType.BEFORE, *args, **kwargs)
    callbacks = self._callbacks.get(event, [])
    try:
        for callback_info in callbacks:
            if not callback_info.enabled:
                continue
            callback = callback_info.callback
            # Bind the timer before anything can raise: the except branch
            # below reads start_time, and a failure inside _apply_filters()
            # would otherwise hit an unbound name (first iteration) or a
            # stale value left over from the previous callback.
            start_time = time.time()
            try:
                filter_result = await self._apply_filters(
                    event, callback, args, kwargs
                )
                if filter_result.action == FilterAction.STOP:
                    if self.enable_logging:
                        self.logger.info(f"Filter stopped event processing: {event}")
                    break
                elif filter_result.action == FilterAction.SKIP:
                    if self.enable_logging:
                        self.logger.debug(
                            f"Filter skipped callback {callback.__name__}: "
                            f"{filter_result.reason}"
                        )
                    continue
                final_args = filter_result.modified_args or args
                final_kwargs = filter_result.modified_kwargs or kwargs
                # Restart the timer so success metrics exclude filter time
                start_time = time.time()
                narrowed_args, narrowed_kwargs = _narrow_call(callback, final_args, final_kwargs)
                _inject_session_if_needed(callback, narrowed_args, narrowed_kwargs)
                # Not awaited here: the callback may be an async generator
                # function, a coroutine function, or a plain function
                result = callback(*narrowed_args, **narrowed_kwargs)
                if inspect.isasyncgen(result):
                    async for item in result:
                        yield item
                elif inspect.iscoroutine(result):
                    awaited_result = await result
                    if inspect.isasyncgen(awaited_result):
                        async for item in awaited_result:
                            yield item
                    else:
                        yield awaited_result
                else:
                    yield result
                # NOTE: the measured span covers the yields above, so it
                # includes time the consumer keeps the generator suspended
                execution_time = time.time() - start_time
                if self.enable_metrics:
                    key = f"{event}:{callback.__name__}"
                    self._metrics[key].update(execution_time, is_error=False)
                if callback_info.once:
                    callback_info.enabled = False
            except Exception as e:
                execution_time = time.time() - start_time
                if self.enable_metrics:
                    key = f"{event}:{callback.__name__}"
                    self._metrics[key].update(execution_time, is_error=True)
                if self.enable_logging:
                    self.logger.error(
                        f"Callback {callback.__name__} failed in generator mode: {e}"
                    )
                await self._execute_hooks(event, HookType.ERROR, e, *args, **kwargs)
        await self._execute_hooks(event, HookType.AFTER, *args, **kwargs)
    finally:
        await self._execute_hooks(event, HookType.CLEANUP, *args, **kwargs)
def add_filter(self, event: str, filter_obj: EventFilter) -> None:
    """Register a filter that applies only to one event.

    The filter is appended to the end of that event's filter list.

    Args:
        event: Event name
        filter_obj: Filter instance
    """
    event_filters = self._filters[event]
    event_filters.append(filter_obj)
def add_global_filter(self, filter_obj: EventFilter) -> None:
    """Register a filter that is consulted for every event.

    The filter is appended to the end of the global filter list.

    Args:
        filter_obj: Filter instance
    """
    global_filters = self._global_filters
    global_filters.append(filter_obj)
def add_circuit_breaker(
    self,
    event: str,
    callback: Callable,
    failure_threshold: int = 5,
    timeout: float = 60.0
) -> None:
    """Attach a circuit breaker filter for a callback on an event.

    A new ``CircuitBreakerFilter`` is stored under the key
    ``"<event>:<callback name>"`` and also registered as an event-level
    filter through ``add_filter``.

    Args:
        event: Event name
        callback: Callback to protect
        failure_threshold: Failures before opening circuit
        timeout: Seconds before reset attempt
    """
    breaker_key = f"{event}:{callback.__name__}"
    protection = CircuitBreakerFilter(failure_threshold, timeout)
    self._circuit_breakers[breaker_key] = protection
    self.add_filter(event, protection)
async def _apply_filters(
    self,
    event: str,
    callback: Callable,
    args: tuple,
    kwargs: dict
) -> FilterResult:
    """Run a callback's arguments through every applicable filter.

    Filters are consulted in the order global -> event -> callback-specific.
    The first STOP or SKIP result is returned immediately. A MODIFY result
    replaces the working args and/or kwargs (only the parts that are not
    None), and the updated values are what the following filters see.

    Args:
        event: Event name
        callback: Callback being executed
        args: Positional arguments
        kwargs: Keyword arguments

    Returns:
        The STOP/SKIP result of the filter that short-circuited, otherwise
        a CONTINUE result carrying the final (possibly modified) arguments.
    """
    # Snapshot the pipeline so it stays fixed while filters are awaited
    pipeline = [
        *self._global_filters,
        *self._filters.get(event, []),
        *self._callback_filters.get(callback, []),
    ]
    working_args, working_kwargs = args, kwargs
    for active_filter in pipeline:
        outcome = await active_filter.filter(
            event, callback, *working_args, **working_kwargs
        )
        if outcome.action in (FilterAction.STOP, FilterAction.SKIP):
            return outcome
        if outcome.action != FilterAction.MODIFY:
            continue
        if outcome.modified_args is not None:
            working_args = outcome.modified_args
        if outcome.modified_kwargs is not None:
            working_kwargs = outcome.modified_kwargs
    return FilterResult(
        FilterAction.CONTINUE,
        modified_args=working_args,
        modified_kwargs=working_kwargs
    )
def add_hook(
    self,
    event: str,
    hook_type: HookType,
    hook: Callable
) -> None:
    """Register a lifecycle hook for an event.

    The hook is appended to the list kept for the given event and hook type.

    Args:
        event: Event name
        hook_type: Type of hook (BEFORE, AFTER, ERROR, CLEANUP)
        hook: Callable to run for that lifecycle stage
    """
    hooks_for_event = self._hooks[event]
    hooks_for_event[hook_type].append(hook)
async def _execute_hooks(
    self,
    event: str,
    hook_type: HookType,
    *args,
    **kwargs
) -> None:
    """Execute all hooks of a specific type for an event.

    Hooks run sequentially in registration order. A hook that returns a
    coroutine is awaited. An exception raised by a hook is logged and
    swallowed so the remaining hooks still run.

    Args:
        event: Event name
        hook_type: Type of hooks to execute
        *args: Arguments to pass to hooks
        **kwargs: Keyword arguments to pass to hooks
    """
    for hook in self._hooks[event][hook_type]:
        try:
            outcome = hook(*args, **kwargs)
            # Decide by the returned value rather than by inspecting the
            # hook: callable objects with an async __call__ and sync
            # wrappers returning a coroutine are not coroutine functions,
            # and their coroutine would otherwise never be awaited.
            if inspect.iscoroutine(outcome):
                await outcome
        except Exception as e:
            # functools.partial objects and callable instances have no
            # __name__; the error handler itself must never raise
            hook_name = getattr(hook, "__name__", repr(hook))
            self.logger.error(f"Hook execution failed: {hook_name} - {e}")
def get_metrics(
    self,
    event: Optional[str] = None,
    callback: Optional[str] = None
) -> Dict[str, Dict[str, Any]]:
    """Get performance metrics for callbacks.

    Metric keys have the form ``"<event>:<callback name>"``. Both filters
    are optional and are combined when both are given.

    Args:
        event: Optional event name filter
        callback: Optional callback name filter

    Returns:
        Dictionary mapping callback keys to metric dictionaries
    """
    result = {}
    for key, metrics in self._metrics.items():
        # Split on the LAST colon: event names may themselves contain ":"
        # (e.g. "agent:start"), while the callback name is always the
        # final segment of the key.
        event_name, _, callback_name = key.rpartition(":")
        if event and event != event_name:
            continue
        if callback and callback != callback_name:
            continue
        result[key] = metrics.to_dict()
    return result
def reset_metrics(self) -> None:
    """Discard every collected performance metric.

    The metrics container is emptied in place, not replaced.
    """
    collected = self._metrics
    collected.clear()
def get_slow_callbacks(self, threshold: float = 1.0) -> List[Dict[str, Any]]:
    """Report callbacks whose average execution time exceeds a threshold.

    Args:
        threshold: Average time in seconds a callback must exceed
            (strictly) to be reported

    Returns:
        One dictionary per slow callback with the keys ``callback``,
        ``avg_time``, ``max_time`` and ``call_count``, slowest first.
    """
    offenders = [
        {
            "callback": metric_key,
            "avg_time": stats.avg_time,
            "max_time": stats.max_time,
            "call_count": stats.call_count
        }
        for metric_key, stats in self._metrics.items()
        if stats.avg_time > threshold
    ]
    offenders.sort(key=lambda row: row["avg_time"], reverse=True)
    return offenders
def enable_event_history(self, enabled: bool = True) -> None:
    """Switch event history recording on or off.

    Only the recording flag is updated; records already stored are left
    untouched.

    Args:
        enabled: True to record event history, False to stop recording
    """
    self._enable_history = enabled
def get_event_history(
    self,
    event: Optional[str] = None,
    since: Optional[datetime] = None
) -> List[Dict[str, Any]]:
    """Return a copy of the recorded event history, optionally filtered.

    Args:
        event: When given, keep only records whose ``"event"`` equals it
        since: When given, keep only records whose ``"timestamp"`` is at
            or after ``since.timestamp()``

    Returns:
        List of matching event records, in recorded order
    """
    records = list(self._event_history)
    if event:
        records = list(filter(lambda rec: rec["event"] == event, records))
    if since:
        cutoff = since.timestamp()
        records = list(filter(lambda rec: rec["timestamp"] >= cutoff, records))
    return records
async def replay_events(self, since: Optional[datetime] = None) -> None:
    """Re-trigger recorded events in the order they were recorded.

    The records come from ``get_event_history(since=since)``; each one is
    replayed through ``trigger`` with its stored args and kwargs.

    Args:
        since: Optional datetime to replay from
    """
    for entry in self.get_event_history(since=since):
        name = entry["event"]
        positional = entry["args"]
        keyword = entry["kwargs"]
        await self.trigger(name, *positional, **keyword)
def save_state(self, filepath: str) -> None:
    """Save framework state to file as JSON.

    The saved document has three keys: ``"callbacks"`` (per-event callback
    metadata), ``"metrics"`` (each metric's ``to_dict()``) and
    ``"history"`` (the recorded event history).

    Note: Does not save actual callback functions, only metadata.

    Args:
        filepath: Path to save state file

    Raises:
        TypeError: If the state contains a value that is not JSON
            serializable. The target file is left untouched in that case.
    """
    state = {
        "callbacks": {
            event: [
                {
                    "name": ci.callback.__name__,
                    "priority": ci.priority,
                    "namespace": ci.namespace,
                    "tags": list(ci.tags),
                    "enabled": ci.enabled
                }
                for ci in callback_list
            ]
            for event, callback_list in self._callbacks.items()
        },
        "metrics": {k: v.to_dict() for k, v in self._metrics.items()},
        "history": list(self._event_history)
    }
    # Serialize fully in memory before opening the file: json.dump() streams
    # into an already-truncated file, so a non-serializable history entry
    # would leave a corrupt, half-written state file behind.
    payload = json.dumps(state, indent=2)
    with open(filepath, 'w', encoding="utf-8") as f:
        f.write(payload)
def list_events(self, namespace: Optional[str] = None) -> List[str]:
    """List the names of all events that have a callback registry entry.

    Args:
        namespace: When given (and non-empty), keep only events with at
            least one callback registered under this namespace

    Returns:
        List of event names
    """
    event_names = list(self._callbacks.keys())
    if not namespace:
        return event_names
    return [
        name
        for name in event_names
        if any(ci.namespace == namespace for ci in self._callbacks[name])
    ]
def list_callbacks(self, event: str) -> List[Dict[str, Any]]:
    """Describe every callback registered for an event.

    Args:
        event: Event name

    Returns:
        One dictionary per callback, in stored order, with the keys
        ``name``, ``priority``, ``enabled``, ``namespace``, ``tags``,
        ``once``, ``max_retries`` and ``timeout``. Empty when the event
        has no callbacks.
    """
    return [
        {
            "name": ci.callback.__name__,
            "priority": ci.priority,
            "enabled": ci.enabled,
            "namespace": ci.namespace,
            "tags": list(ci.tags),
            "once": ci.once,
            "max_retries": ci.max_retries,
            "timeout": ci.timeout
        }
        for ci in self._callbacks.get(event, [])
    ]
def get_statistics(self) -> Dict[str, Any]:
    """Summarize the framework's current registrations and recorded data.

    Returns:
        Dictionary with the keys ``total_events``, ``total_callbacks``,
        ``namespaces`` (distinct callback namespaces), ``total_filters``
        (global plus event-level filters), ``total_chains``,
        ``history_size`` and ``metrics_collected``.
    """
    callback_groups = list(self._callbacks.values())
    callback_total = sum(len(group) for group in callback_groups)
    distinct_namespaces = {
        ci.namespace for group in callback_groups for ci in group
    }
    event_filter_total = sum(
        len(filters) for filters in self._filters.values()
    )
    return {
        "total_events": len(self._callbacks),
        "total_callbacks": callback_total,
        "namespaces": list(distinct_namespaces),
        "total_filters": len(self._global_filters) + event_filter_total,
        "total_chains": len(self._chains),
        "history_size": len(self._event_history),
        "metrics_collected": len(self._metrics)
    }