import asyncio
import uuid
from abc import ABCMeta
from contextlib import asynccontextmanager
from typing import (
AsyncIterator,
Self,
)
from openjiuwen.core.common.constants.constant import INTERACTION
from openjiuwen.core.common.exception.codes import StatusCode
from openjiuwen.core.common.exception.errors import (
BaseError,
build_error,
)
from openjiuwen.core.common.logging import (
LogEventType,
workflow_logger as logger,
)
from openjiuwen.core.common.utils.schema_utils import SchemaUtils
from openjiuwen.core.context_engine import ModelContext
from openjiuwen.core.graph.base import (
CONFIG_KEY,
INPUTS_KEY,
Router,
)
from openjiuwen.core.graph.executable import (
Executable,
Input,
Output,
)
from openjiuwen.core.graph.graph import PregelGraph
from openjiuwen.core.graph.stream_actor.manager import ActorManager
from openjiuwen.core.session import (
InteractiveInput,
NodeSession,
SubWorkflowSession,
Transformer,
WORKFLOW_EXECUTE_TIMEOUT,
WORKFLOW_STREAM_FIRST_FRAME_TIMEOUT,
WORKFLOW_STREAM_FRAME_TIMEOUT,
WorkflowSession,
)
from openjiuwen.core.session.stream import (
BaseStreamMode,
OutputSchema,
StreamEmitter,
StreamMode,
StreamWriterManager,
)
from openjiuwen.core.session.tracer import (
Tracer,
TracerWorkflowUtils,
)
from openjiuwen.core.session.workflow import Session
from openjiuwen.core.workflow._workflow import BaseWorkflow
from openjiuwen.core.workflow.base import (
WorkflowCard,
WorkflowChunk,
WorkflowExecutionState,
WorkflowOutput,
)
from openjiuwen.core.workflow.components.base import ComponentAbility
from openjiuwen.core.workflow.components.component import ComponentComposable
from openjiuwen.core.workflow.workflow_config import WorkflowConfig, ExceptionConfig
from openjiuwen.core.common.background_tasks import BackgroundTask, create_background_task
from openjiuwen.core.runner.callback import trigger
from openjiuwen.core.runner.callback.events import WorkflowEvents
class _WorkflowMeta(ABCMeta):
    """Metaclass that wires runner callbacks around ``invoke`` and ``stream``.

    Every time a class using this metaclass is instantiated, the bound
    ``invoke`` and ``stream`` methods of the new instance are replaced by
    versions decorated with the runner callback framework
    (``emit_before`` -> ``transform_io`` -> ``emit_after``).
    """

    def __call__(cls, *args, **kwargs):
        """Build the instance, then wrap its ``invoke``/``stream`` with callbacks."""
        workflow = super().__call__(*args, **kwargs)
        # Imported here rather than at module level, as in the original code.
        from openjiuwen.core.runner import Runner
        framework = Runner.callback_framework

        # (method name, input event, output event, extra emit_after options)
        wrap_plan = (
            ("invoke", WorkflowEvents.WORKFLOW_INVOKE_INPUT, WorkflowEvents.WORKFLOW_INVOKE_OUTPUT, {}),
            ("stream", WorkflowEvents.WORKFLOW_STREAM_INPUT, WorkflowEvents.WORKFLOW_STREAM_OUTPUT,
             {"item_key": "result"}),
        )
        for method_name, input_event, output_event, after_options in wrap_plan:
            wrapped = getattr(workflow, method_name)
            wrapped = framework.emit_before(input_event)(wrapped)
            wrapped = framework.transform_io(
                input_event=input_event,
                output_event=output_event,
            )(wrapped)
            wrapped = framework.emit_after(output_event, **after_options)(wrapped)
            # Shadow the class-level method on this instance only.
            setattr(workflow, method_name, wrapped)
        return workflow
class Workflow(metaclass=_WorkflowMeta):
    """
    A workflow represents a directed graph of components that process data.

    The workflow orchestrates the execution of connected components, managing
    data flow, error handling, and streaming between components.

    The graph is assembled with the builder methods (``set_start_comp``,
    ``add_workflow_comp``, ``set_end_comp``, ``add_connection``,
    ``add_stream_connection``, ``add_conditional_connection``), each of which
    returns the workflow itself so calls can be chained. It is executed with
    ``invoke`` (collects the result) or ``stream`` (yields chunks).

    Instances are created through ``_WorkflowMeta``, which wraps ``invoke``
    and ``stream`` with the runner callback framework.
    """
def __init__(self, card: WorkflowCard = None, **kwargs):
    """
    Create an empty workflow.

    Args:
        card: Metadata describing the workflow (name, description, etc.).
            When omitted (or falsy) a card with a random hex id is generated.
        kwargs: Extra workflow configuration forwarded to ``WorkflowConfig``.
    """
    self._card = card or WorkflowCard(id=uuid.uuid4().hex)
    workflow_config = WorkflowConfig(card=self._card, **kwargs)
    self._internal = BaseWorkflow(workflow_config, PregelGraph())
    # Id of the end component; filled in by set_end_comp().
    self._end_comp_id: str = ""
    # Becomes True once an end component is registered in "streaming" mode.
    self._is_streaming = False
@asynccontextmanager
async def _task_group_scope(self):
    """Provide a task-manager scope for direct workflow execution.

    If a task group is already active it is reused as-is; otherwise a new
    task group is opened for the duration of the ``async with`` body.
    """
    from openjiuwen.core.common.task_manager.context import get_task_group
    from openjiuwen.core.common.task_manager.manager import get_task_manager

    needs_new_group = get_task_group() is None
    if needs_new_group:
        async with get_task_manager().task_group():
            yield
    else:
        # Already running inside a task group: nothing to set up.
        yield
@property
def card(self):
    """Return the metadata card this workflow was created with (or generated)."""
    workflow_card = self._card
    return workflow_card
def set_start_comp(
        self,
        start_comp_id: str,
        component: ComponentComposable,
        inputs_schema: dict | Transformer = None,
        outputs_schema: dict | Transformer = None,
) -> Self:
    """
    Register the entry component of the workflow.

    The component is added to the graph (never waiting for all
    predecessors) and then marked as the start component.

    Args:
        start_comp_id: Unique identifier for the start component
        component: The component instance to use as start
        inputs_schema: Schema (or transformer) for the component inputs
        outputs_schema: Schema (or transformer) for the component outputs

    Returns:
        Self for method chaining
    """
    graph = self._internal
    graph.add_workflow_comp(
        start_comp_id,
        component,
        wait_for_all=False,
        inputs_schema=inputs_schema,
        outputs_schema=outputs_schema,
    )
    graph.start_comp(start_comp_id)
    return self
def add_workflow_comp(
        self,
        comp_id: str,
        workflow_comp: ComponentComposable | Executable,
        *,
        wait_for_all: bool = None,
        inputs_schema: dict | Transformer = None,
        outputs_schema: dict | Transformer = None,
        stream_inputs_schema: dict | Transformer = None,
        stream_outputs_schema: dict | Transformer = None,
        comp_ability: list[ComponentAbility] = None,
        max_retries: int = 0,
        timeout: float = -1.0,
        exception_config: ExceptionConfig = None,
) -> Self:
    """
    Register a component in the workflow graph.

    All options are forwarded unchanged to the internal workflow.

    Args:
        comp_id: Unique identifier for the component
        workflow_comp: The component instance to add
        wait_for_all: If True, wait for all predecessor outputs before executing
        inputs_schema: Schema defining expected input structure
        outputs_schema: Schema defining output structure
        stream_inputs_schema: Schema for streaming inputs
        stream_outputs_schema: Schema for streaming outputs
        comp_ability: List of component capabilities (streaming, batching, etc.)
        max_retries: Maximum number of retries on failure (default: 0, no retry)
        timeout: Per-node execution timeout in seconds (<=0 means no timeout)
        exception_config: Exception handling configuration for error recovery

    Returns:
        Self for method chaining
    """
    component_options = {
        "wait_for_all": wait_for_all,
        "inputs_schema": inputs_schema,
        "outputs_schema": outputs_schema,
        "stream_inputs_schema": stream_inputs_schema,
        "stream_outputs_schema": stream_outputs_schema,
        "comp_ability": comp_ability,
        "max_retries": max_retries,
        "timeout": timeout,
        "exception_config": exception_config,
    }
    self._internal.add_workflow_comp(comp_id, workflow_comp, **component_options)
    return self
def set_end_comp(
        self,
        end_comp_id: str,
        component: ComponentComposable,
        inputs_schema: dict | Transformer = None,
        outputs_schema: dict | Transformer = None,
        stream_inputs_schema: dict | Transformer = None,
        stream_outputs_schema: dict | Transformer = None,
        response_mode: str = None
) -> Self:
    """
    Set the ending component of the workflow.

    The end component produces the final output of the workflow. Its
    abilities are derived from ``response_mode`` and the supplied schemas:

    * ``response_mode == "streaming"``: ``STREAM`` when ``inputs_schema`` is
      given, ``TRANSFORM`` when ``stream_inputs_schema`` is given, and
      ``STREAM`` alone when neither is given. The workflow is also flagged
      as streaming.
    * any other value (including ``None``): ``INVOKE``, plus ``COLLECT``
      when ``stream_inputs_schema`` is given.

    The component waits for all predecessors only when it has the
    ``COLLECT`` or ``TRANSFORM`` ability.

    Args:
        end_comp_id: Unique identifier for the end component
        component: The component instance to use as end
        inputs_schema: Schema defining expected input structure
        outputs_schema: Schema defining output structure
        stream_inputs_schema: Schema for streaming inputs
        stream_outputs_schema: Schema for streaming outputs
        response_mode: ``"streaming"`` selects streaming output; every other
            value selects the regular (invoke) behavior

    Returns:
        Self for method chaining
    """
    if response_mode == "streaming":
        # NOTE(review): the flag is only ever set to True here; it is not
        # cleared if an end component is registered again in another mode.
        self._is_streaming = True
        comp_ability = []
        if inputs_schema is not None:
            comp_ability.append(ComponentAbility.STREAM)
        if stream_inputs_schema is not None:
            comp_ability.append(ComponentAbility.TRANSFORM)
        if not comp_ability:
            # No schema at all: default to plain streaming.
            comp_ability = [ComponentAbility.STREAM]
    else:
        comp_ability = [ComponentAbility.INVOKE]
        if stream_inputs_schema is not None:
            comp_ability.append(ComponentAbility.COLLECT)

    wait_for_all = (ComponentAbility.COLLECT in comp_ability
                    or ComponentAbility.TRANSFORM in comp_ability)
    self._internal.add_workflow_comp(
        end_comp_id,
        component,
        wait_for_all=wait_for_all,
        comp_ability=comp_ability,
        inputs_schema=inputs_schema,
        outputs_schema=outputs_schema,
        stream_inputs_schema=stream_inputs_schema,
        stream_outputs_schema=stream_outputs_schema,
    )
    self._internal.end_comp(end_comp_id)
    self._end_comp_id = end_comp_id
    return self
def add_connection(self, src_comp_id: str | list[str], target_comp_id: str) -> Self:
"""
Add a data connection between components.
Creates a directed edge for regular (non-streaming) data flow.
Args:
src_comp_id: Source component ID or set of IDs
target_comp_id: Target component ID
Returns:
Self for method chaining
"""
self._internal.add_connection(src_comp_id, target_comp_id)
return self
def add_stream_connection(self, src_comp_id: str, target_comp_id: str) -> Self:
"""
Add a streaming connection between components.
Creates a directed edge for streaming data flow.
Args:
src_comp_id: Source component ID
target_comp_id: Target component ID
Returns:
Self for method chaining
"""
self._internal.add_stream_connection(src_comp_id, target_comp_id)
return self
def add_conditional_connection(self, src_comp_id: str, router: Router) -> Self:
    """
    Attach a router to a source component.

    The router is handed to the internal workflow together with the source
    component ID, so the target of the connection is chosen dynamically.

    Args:
        src_comp_id: Source component ID
        router: Router instance that decides the target based on data

    Returns:
        Self for method chaining
    """
    graph = self._internal
    graph.add_conditional_connection(src_comp_id, router)
    return self
async def invoke(
        self,
        inputs: Input,
        session: Session,
        context: ModelContext = None,
        **kwargs
) -> WorkflowOutput:
    """
    Run the workflow to completion and return its output.

    Internally the workflow is streamed (OUTPUT mode only) and all chunks
    are collected. If any collected chunk is an interaction request, the
    chunks are returned with state ``INPUT_REQUIRED``. Otherwise the result
    is either the collected chunks (streaming end component) or the stored
    outputs of the end component, with state ``COMPLETED``.

    Args:
        inputs: Input data for the workflow
        session: Workflow session for state management
        context: context engine
        **kwargs: Additional execution parameters
            - is_sub: Whether this is a sub-workflow execution
            - skip_inputs_validate: Whether to skip input validation

    Returns:
        WorkflowOutput containing results and metadata
    """
    if kwargs.get("is_sub"):
        return await self._sub_invoke(inputs, session, context, **kwargs)

    self._validate_session(session)
    self._validate_inputs(inputs, **kwargs)
    self._install_asyncio_exception_handler()
    logger.info(
        "Begin to run workflow invoke",
        event_type=LogEventType.WORKFLOW_EXECUTE_START,
        workflow_id=self._card.id,
        workflow_name=self._card.name,
        inputs=inputs,
    )
    workflow_session = self._create_workflow_session(session, stream_modes=[BaseStreamMode.OUTPUT], is_sub=False)

    async def _collect_output():
        # Drain the whole stream before deciding what to return.
        collected = [item async for item in self._stream(inputs, workflow_session, context=context)]
        needs_input = any(
            isinstance(item, OutputSchema) and item.type == INTERACTION
            for item in collected
        )
        if needs_input:
            return WorkflowOutput(result=list(collected),
                                  state=WorkflowExecutionState.INPUT_REQUIRED)
        if self._is_streaming:
            final_result = collected
        else:
            final_result = workflow_session.state().get_outputs(self._end_comp_id)
        return WorkflowOutput(result=final_result, state=WorkflowExecutionState.COMPLETED)

    invoke_timeout = workflow_session.config().get_env(WORKFLOW_EXECUTE_TIMEOUT)
    try:
        workflow_output = await self._execute_with_timeout(_collect_output, invoke_timeout)
    except Exception as e:
        logger.error(
            "Failed to run workflow invoke",
            event_type=LogEventType.WORKFLOW_EXECUTE_ERROR,
            workflow_id=self._card.id,
            workflow_name=self._card.name,
            exception=e,
            session_id=workflow_session.session_id(),
        )
        raise
    else:
        logger.info(
            "Succeed to run workflow invoke",
            event_type=LogEventType.WORKFLOW_EXECUTE_END,
            workflow_id=self._card.id,
            workflow_name=self._card.name,
            session_id=workflow_session.session_id(),
            outputs=workflow_output
        )
        return workflow_output
async def stream(
        self,
        inputs: Input,
        session: Session,
        context: ModelContext = None,
        stream_modes: list[StreamMode] = None,
        **kwargs
) -> AsyncIterator[WorkflowChunk]:
    """
    Run the workflow and yield its chunks as they are produced.

    Args:
        inputs: Input data for the workflow
        session: Workflow session for state management
        context: context engine
        stream_modes: Type(s) of WorkflowChunk
        **kwargs: Additional execution parameters
            - is_sub: Whether this is a sub-workflow execution
            - skip_inputs_validate: Whether to skip input validation

    Yields:
        WorkflowChunk: Stream chunks containing partial results, logs, or events
    """
    if kwargs.get("is_sub"):
        # Sub-workflow execution has its own streaming path.
        async for sub_chunk in self._sub_stream(inputs, session, context, **kwargs):
            yield sub_chunk
        return

    self._validate_session(session)
    self._validate_inputs(inputs, **kwargs)
    self._install_asyncio_exception_handler()
    logger.info(
        "Begin to run workflow stream",
        event_type=LogEventType.WORKFLOW_EXECUTE_START,
        workflow_id=self._card.id,
        workflow_name=self._card.name,
        inputs=inputs,
    )
    workflow_session = self._create_workflow_session(session, stream_modes=stream_modes, is_sub=False)

    emitted = 0  # number of chunks yielded so far; also the index of the next chunk
    async for item in self._stream(inputs, workflow_session, context):
        logger.debug(
            f"Output workflow chunk[{emitted}]",
            event_type=LogEventType.WORKFLOW_OUTPUT_CHUNK,
            workflow_id=self._card.id,
            workflow_name=self._card.name,
            chunk=item,
            chunk_idx=emitted,
        )
        yield item
        emitted += 1

    logger.info(
        "Succeed to run workflow stream",
        event_type=LogEventType.WORKFLOW_EXECUTE_END,
        workflow_id=self._card.id,
        workflow_name=self._card.name,
        metadata={"total_chunks": emitted}
    )
def draw(
self,
title: str = "",
output_format: str = "mermaid",
expand_subgraph: int | bool = False,
enable_animation: bool = False,
**kwargs
) -> str | bytes:
"""
Generate a Mermaid diagram of the workflow.
Visualizes the workflow structure as a flowchart.
Args:
title: Diagram title
output_format: Output format ("mermaid", "png", or "svg")
expand_subgraph: Level of subgraph expansion (False/True or integer depth)
enable_animation: Enable animation in Mermaid diagram (Mermaid format only)
**kwargs: Additional rendering options
Returns:
str: Mermaid syntax when output_format="mermaid"
bytes: Image binary data when output_format="png" or "svg"
"""
if output_format == "png":
return self._internal.to_mermaid_png(title, expand_subgraph)
if output_format == "svg":
return self._internal.to_mermaid_svg(title, expand_subgraph)
return self._internal.to_mermaid(title, expand_subgraph, enable_animation)
async def _stream(self, inputs: Input,
                  session: WorkflowSession,
                  context: ModelContext = None,
                  ) -> AsyncIterator[WorkflowChunk]:
    """
    Run the compiled graph in a background task and yield its output chunks.

    Steps:
        1. Trace the start and trigger ``WORKFLOW_STARTED``.
        2. Cap the frame / first-frame timeouts by the overall execution
           timeout when that one is positive and not larger.
        3. Spawn the graph execution as a background task; when it ends
           (normally or not) the trace is finished and the stream emitter
           is closed.
        4. Forward every chunk from the stream writer manager to the caller.
        5. Wait for the background task, trigger ``WORKFLOW_FINISHED`` and,
           if the end component produced outputs, yield a final
           ``workflow_final`` chunk.

    On cancellation ``WORKFLOW_CANCELLED`` is triggered and the background
    task is cancelled; on ``BaseError`` ``WORKFLOW_ERROR`` is triggered.
    In every case the session is closed and the internal workflow reset.

    Args:
        inputs: Input data for the workflow
        session: Workflow session created for this run
        context: context engine

    Yields:
        WorkflowChunk: chunks produced by the stream writer manager, then an
        optional final ``OutputSchema(type="workflow_final", ...)``
    """
    await TracerWorkflowUtils.trace_workflow_start(session, inputs)
    await trigger(
        WorkflowEvents.WORKFLOW_STARTED,
        workflow_id=self._card.id,
        workflow_name=self._card.name,
        inputs=inputs)

    timeout = session.config().get_env(WORKFLOW_EXECUTE_TIMEOUT)
    # A frame can never be allowed to take longer than the whole execution.
    frame_timeout = session.config().get_env(WORKFLOW_STREAM_FRAME_TIMEOUT)
    if timeout is not None and 0 < timeout <= frame_timeout:
        frame_timeout = timeout
        session.config().set_envs({WORKFLOW_STREAM_FRAME_TIMEOUT: frame_timeout})
    first_frame_timeout = session.config().get_env(WORKFLOW_STREAM_FIRST_FRAME_TIMEOUT)
    if timeout is not None and 0 < timeout <= first_frame_timeout:
        first_frame_timeout = timeout
        session.config().set_envs({WORKFLOW_STREAM_FIRST_FRAME_TIMEOUT: first_frame_timeout})

    async def stream_process():
        compiled_graph = self._internal.compile(session, context)
        try:
            await compiled_graph.invoke({INPUTS_KEY: inputs, CONFIG_KEY: None}, session)
        finally:
            # Shielded so that the trace is completed and the emitter closed
            # even when this task is being cancelled.
            outputs = session.state().get_outputs(self._end_comp_id)
            await asyncio.shield(TracerWorkflowUtils.trace_workflow_done(session, outputs))
            await asyncio.shield(session.stream_writer_manager().stream_emitter().close())

    task = await self._spawn_background_task(
        self._execute_with_timeout(stream_process, timeout),
        name=f"workflow_stream_process:{self._card.id}",
    )
    try:
        # Chunks are forwarded as they arrive and deliberately not retained:
        # keeping them would grow memory for the whole lifetime of the stream.
        async for chunk in session.stream_writer_manager().stream_output(first_frame_timeout=first_frame_timeout,
                                                                         timeout=frame_timeout,
                                                                         need_close=True):
            yield chunk
        try:
            # Surface any error raised by the graph execution itself.
            await self._wait_background_task(task)
            results = session.state().get_outputs(self._end_comp_id)
            await trigger(
                WorkflowEvents.WORKFLOW_FINISHED,
                workflow_id=self._card.id,
                workflow_name=self._card.name,
                outputs=results)
            if results:
                yield OutputSchema(type="workflow_final", index=0, payload=results)
        except asyncio.CancelledError:
            logger.warning(
                "Workflow stream output be cancelled",
                event_type=LogEventType.WORKFLOW_EXECUTE_ERROR,
                workflow_id=self._card.id,
                workflow_name=self._card.name
            )
            raise
    except asyncio.CancelledError:
        await trigger(
            WorkflowEvents.WORKFLOW_CANCELLED,
            workflow_id=self._card.id,
            workflow_name=self._card.name)
        logger.warning(
            "Canecel stream output task",
            event_type=LogEventType.WORKFLOW_EXECUTE_ERROR,
            workflow_id=self._card.id,
            workflow_name=self._card.name
        )
        if not self._is_background_task_done(task):
            await task.cancel(reason="workflow_cancelled")
        raise
    except BaseError as e:
        await trigger(
            WorkflowEvents.WORKFLOW_ERROR,
            workflow_id=self._card.id,
            workflow_name=self._card.name,
            error=e)
        raise
    finally:
        # Nested so that each cleanup step runs even if an earlier one fails.
        try:
            if not self._is_background_task_done(task):
                await task.cancel(reason="workflow_cancelled")
        finally:
            try:
                await asyncio.shield(session.close())
            finally:
                await asyncio.shield(self._internal.reset())
async def _sub_invoke(self, inputs: Input, session: Session,
                      context: ModelContext = None, **kwargs) -> Output:
    """
    Execute this workflow as a sub-workflow and return its output.

    The graph is compiled against a sub-workflow session and invoked. For a
    streaming end component, frames are read from the sub-workflow stream
    until one END_FRAME has been received per streaming ability
    (STREAM / TRANSFORM) of the end component; if any frames were received
    they are returned as ``{"stream": [...]}``. Otherwise the stored outputs
    of the end component are returned.

    Args:
        inputs: Input data for the sub-workflow
        session: Session of the caller; the sub-workflow session is derived from it
        context: context engine
        **kwargs: May carry ``CONFIG_KEY``, which is forwarded to the graph

    Returns:
        ``{"stream": frames}`` for streamed output, else the end component outputs
    """
    sub_workflow_session = self._create_workflow_session(session, is_sub=True)
    try:
        compiled_graph = self._internal.compile(sub_workflow_session, context)
        await compiled_graph.invoke({INPUTS_KEY: inputs, CONFIG_KEY: kwargs.get(CONFIG_KEY)},
                                    sub_workflow_session)
        if self._is_streaming:
            messages = []
            # Loop-invariant: looked up once, as _sub_stream does.
            stream_timeout = session.get_env(WORKFLOW_EXECUTE_TIMEOUT)
            sub_end_ability = self._internal.config().spec.comp_configs.get(self._end_comp_id).abilities
            required_abilities = [ComponentAbility.STREAM, ComponentAbility.TRANSFORM]
            # One END_FRAME is expected per streaming ability of the end component.
            stream_ability_count = sum(ability in sub_end_ability for ability in required_abilities)
            while stream_ability_count > 0:
                frame = await sub_workflow_session.actor_manager().sub_workflow_stream().receive(stream_timeout)
                if frame is None:
                    continue
                if frame == StreamEmitter.END_FRAME:
                    stream_ability_count -= 1
                    continue
                messages.append(frame)
            if messages:
                return {"stream": messages}
        node_session = NodeSession(sub_workflow_session, self._end_comp_id)
        return node_session.state().get_outputs(self._end_comp_id)
    finally:
        # reset() must run even when closing the session fails.
        try:
            await asyncio.shield(sub_workflow_session.close())
        finally:
            await asyncio.shield(self._internal.reset())
async def _sub_stream(self, inputs: Input, session: Session, context: ModelContext = None, **kwargs) -> \
        AsyncIterator[Output]:
    """
    Execute this workflow as a sub-workflow and yield its streamed frames.

    The graph is compiled against a sub-workflow session and invoked. Only
    when the end component is streaming are frames read from the
    sub-workflow stream and yielded, until one END_FRAME has been received
    per streaming ability (STREAM / TRANSFORM) of the end component.

    Args:
        inputs: Input data for the sub-workflow
        session: Session of the caller; the sub-workflow session is derived from it
        context: context engine
        **kwargs: May carry ``CONFIG_KEY``, which is forwarded to the graph

    Yields:
        Frames received from the sub-workflow stream (END_FRAME and ``None`` excluded)
    """
    sub_workflow_session = self._create_workflow_session(session, is_sub=True)
    try:
        compiled_graph = self._internal.compile(sub_workflow_session, context=context)
        await compiled_graph.invoke({INPUTS_KEY: inputs, CONFIG_KEY: kwargs.get(CONFIG_KEY)}, sub_workflow_session)
        if self._is_streaming:
            stream_timeout = session.get_env(WORKFLOW_EXECUTE_TIMEOUT)
            sub_end_ability = self._internal.config().spec.comp_configs.get(self._end_comp_id).abilities
            required_abilities = [ComponentAbility.STREAM, ComponentAbility.TRANSFORM]
            # One END_FRAME is expected per streaming ability of the end component.
            stream_ability_count = sum(ability in sub_end_ability for ability in required_abilities)
            while stream_ability_count > 0:
                frame = await sub_workflow_session.actor_manager().sub_workflow_stream().receive(stream_timeout)
                if frame is None:
                    continue
                if frame == StreamEmitter.END_FRAME:
                    stream_ability_count -= 1
                    continue
                yield frame
    finally:
        # reset() must run even when closing the session fails.
        try:
            await asyncio.shield(sub_workflow_session.close())
        finally:
            await asyncio.shield(self._internal.reset())
async def _execute_with_timeout(self, func, timeout):
task = await self._spawn_background_task(
func(),
name=f"workflow_exec_timeout:{self._card.id}",
)
try:
return await asyncio.wait_for(
self._wait_background_task(task),
timeout=timeout if (timeout and timeout > 0) else None,
)
except asyncio.CancelledError:
logger.error(
"Workflow execution cancelled",
event_type=LogEventType.WORKFLOW_EXECUTE_ERROR,
workflow_id=self._card.id,
workflow_name=self._card.name
)
raise
except (asyncio.TimeoutError, TimeoutError) as e:
logger.error(
f"Workflow execution timeout {timeout} s",
event_type=LogEventType.WORKFLOW_EXECUTE_ERROR,
workflow_id=self._card.id,
workflow_name=self._card.name,
exception=e
)
raise build_error(StatusCode.WORKFLOW_EXECUTION_TIMEOUT, cause=e, timeout=timeout, card=self._card)
except RecursionError as e:
logger.error(
"Workflow execution recursion limit reached",
event_type=LogEventType.WORKFLOW_EXECUTE_ERROR,
workflow_id=self._card.id,
workflow_name=self._card.name,
exception=e
)
raise build_error(StatusCode.WORKFLOW_EXECUTION_ERROR, cause=e, reason=e,
workflow=self._card.to_str()) from e
except BaseError as e:
raise e
finally:
if not self._is_background_task_done(task):
await task.cancel(reason="workflow_cancelled")
async def _spawn_background_task(self, coro, name: str) -> BackgroundTask:
    """Start ``coro`` as a background task named ``name`` in the "workflow" group."""
    background_task = await create_background_task(coro, name=name, group="workflow")
    return background_task
@staticmethod
async def _wait_background_task(task: BackgroundTask):
    """Await ``task.wait()`` and return its result."""
    outcome = await task.wait()
    return outcome
@staticmethod
def _is_background_task_done(task: BackgroundTask) -> bool:
    """Report whether the background task has finished, as given by ``task.done()``."""
    finished = task.done()
    return finished
def _create_workflow_session(self, session, stream_modes=None, is_sub: bool = False):
    """
    Build the session object used for one execution of this workflow.

    Args:
        session: Session supplied by the caller
        stream_modes: Stream modes for the stream writer manager (top-level only)
        is_sub: True to build a ``SubWorkflowSession`` on top of the caller's
            inner session, False to build a fresh ``WorkflowSession``

    Returns:
        A ``SubWorkflowSession`` when ``is_sub`` is true, else a ``WorkflowSession``
    """
    if is_sub:
        parent_inner = getattr(session, "_inner")
        self._internal.auto_complete_abilities()
        sub_actor_manager = ActorManager(self._internal.config().spec, self._internal.stream_actor(),
                                         sub_graph=True, session=parent_inner)
        sub_session = SubWorkflowSession(
            parent_inner,
            workflow_id=self._card.id,
            actor_manager=sub_actor_manager
        )
        sub_session.config().add_workflow_config(workflow_id=self._card.id,
                                                 workflow_config=self._internal.config())
        return sub_session

    # Top-level execution: a brand-new workflow session.
    session.set_workflow_card(self._card)
    top_session = WorkflowSession(workflow_id=self._card.id,
                                  parent=session.get_parent(),
                                  session_id=session.get_session_id())
    top_session.config().set_envs(session.get_envs())
    self._internal.auto_complete_abilities()
    top_session.config().add_workflow_config(workflow_id=self._card.id,
                                             workflow_config=self._internal.config())
    actor_manager = ActorManager(self._internal.config().spec, self._internal.stream_actor(),
                                 sub_graph=False, session=top_session)
    top_session.set_actor_manager(actor_manager)
    top_session.set_stream_writer_manager(
        StreamWriterManager(stream_emitter=StreamEmitter(), modes=stream_modes))
    # A tracer is attached only if none exists yet and tracing was not
    # excluded by an explicit list of stream modes.
    if top_session.tracer() is None:
        if stream_modes is None or BaseStreamMode.TRACE in stream_modes:
            new_tracer = Tracer()
            new_tracer.init(top_session.stream_writer_manager())
            top_session.set_tracer(new_tracer)
    return top_session
def _validate_inputs(self, inputs, **kwargs):
if self._card.input_params is not None and not isinstance(inputs, InteractiveInput):
try:
inputs = SchemaUtils.format_with_schema(inputs, self._card.input_params,
skip_validate=kwargs.get("skip_inputs_validate"))
except Exception as e:
raise build_error(StatusCode.WORKFLOW_EXECUTE_INPUT_INVALID, cause=e, inputs=inputs,
reason=f"input validation failed against schema: {str(e) if e else 'Unknown error'}",
workflow=self._card.to_str())
def _validate_session(self, session):
if not session:
raise build_error(StatusCode.WORKFLOW_EXECUTE_SESSION_INVALID,
reason="session is required for workflow execution",
workflow=self._card.to_str())
@staticmethod
def _install_asyncio_exception_handler():
"""Install a global exception handler for asyncio tasks to handle unhandled exception."""
def loop_exception_handler(_, context):
"""Handle unhandled exceptions in asyncio tasks."""
exception = context.get("exception")
if exception:
import traceback
traceback_info = ''.join(traceback.format_exception(type(exception), exception,
exception.__traceback__))
logger.error(
"Unhandled exception in asyncio",
event_type=LogEventType.SYSTEM_ERROR,
error_message=str(exception),
metadata={"traceback": traceback_info}
)
loop = asyncio.get_event_loop()
loop.set_exception_handler(loop_exception_handler)