import itertools
import ssl
import time
from collections import defaultdict
import pytest
from mock import ANY, Mock, create_autospec, patch
from six.moves import queue
from nameko.constants import LOGIN_METHOD_CONFIG_KEY
from nameko.containers import WorkerContext
from nameko.events import (
BROADCAST, SERVICE_POOL, SINGLETON, EventDispatcher, EventHandler,
EventHandlerConfigurationError, event_handler
)
from nameko.messaging import QueueConsumer
from nameko.standalone.events import event_dispatcher as standalone_dispatcher
from nameko.standalone.events import get_event_exchange
from nameko.testing.services import dummy, entrypoint_hook, entrypoint_waiter
from nameko.testing.utils import DummyProvider, unpack_mock_call
# Timeout for event-related waits in these tests -- presumably seconds;
# TODO confirm the unit wherever this constant is consumed.
EVENTS_TIMEOUT = 5
@pytest.fixture
def queue_consumer():
    """ Yield an autospecced stand-in for ``QueueConsumer``.

    While the fixture is active, ``QueueConsumer.bind`` is patched to return
    the stand-in, so tests can inspect the calls made on it.
    """
    fake_consumer = create_autospec(QueueConsumer)
    with patch.object(QueueConsumer, 'bind', return_value=fake_consumer):
        yield fake_consumer
@pytest.mark.parametrize(
    "config,expected_no_declare",
    [
        ({'DECLARE_EVENT_EXCHANGES': True}, False),
        ({'DECLARE_EVENT_EXCHANGES': False}, True),
        ({'DECLARE_EVENT_EXCHANGES': None}, False),
        ({}, False)
    ]
)
def test_auto_declaration(config, expected_no_declare):
    """ The exchange's ``no_declare`` flag follows the
    ``DECLARE_EVENT_EXCHANGES`` config value for each parametrized case.
    """
    no_declare = get_event_exchange("example", config).no_declare
    assert no_declare is expected_no_declare
@pytest.mark.parametrize(
    "config,expected_auto_delete",
    [
        ({'AUTO_DELETE_EVENT_EXCHANGES': True}, True),
        ({'AUTO_DELETE_EVENT_EXCHANGES': False}, False),
        ({'AUTO_DELETE_EVENT_EXCHANGES': None}, False),
        ({}, False)
    ]
)
def test_auto_delete(config, expected_auto_delete):
    """ The exchange's ``auto_delete`` flag follows the
    ``AUTO_DELETE_EVENT_EXCHANGES`` config value for each parametrized case.
    """
    auto_delete = get_event_exchange("example", config).auto_delete
    assert auto_delete is expected_auto_delete
def test_event_dispatcher(mock_container, mock_producer, rabbit_config):
    """ Dispatching an event results in exactly one ``publish`` call whose
    arguments combine the custom retry policy with the dispatcher's and
    publisher class's own attributes.
    """
    mock_container.config = rabbit_config
    mock_container.service_name = "srcservice"

    service = Mock()
    worker_ctx = WorkerContext(
        mock_container, service, DummyProvider("dispatch"))

    retry_policy = {'max_retries': 5}
    dispatcher = EventDispatcher(retry_policy=retry_policy).bind(
        mock_container, attr_name="dispatch")
    dispatcher.setup()

    service.dispatch = dispatcher.get_dependency(worker_ctx)
    service.dispatch('eventtype', 'msg')

    expected_kwargs = {
        'exchange': ANY,
        'routing_key': 'eventtype',
        'headers': dispatcher.get_message_headers(worker_ctx),
        'declare': dispatcher.declare,
        'retry_policy': retry_policy,
        'serializer': dispatcher.serializer,
    }
    # the remaining options are expected to come from the publisher class
    publisher_cls = dispatcher.publisher_cls
    for option in (
        'retry', 'compression', 'mandatory',
        'expiration', 'delivery_mode', 'priority',
    ):
        expected_kwargs[option] = getattr(publisher_cls, option)

    assert mock_producer.publish.call_count == 1
    published_args, published_kwargs = mock_producer.publish.call_args
    assert published_args == ('msg',)
    assert published_kwargs == expected_kwargs
    # ``ANY`` matched the exchange above; pin its name explicitly
    assert published_kwargs['exchange'].name == 'srcservice.events'
def test_event_handler(queue_consumer, mock_container):
    """ Check the queue created by ``EventHandler.setup`` for each handler
    configuration exercised below (default, broadcast with generated and
    custom identifiers, singleton).
    """
    container = mock_container
    container.service_name = "destservice"

    def make_handler(handler_cls=EventHandler, **options):
        # Build a srcservice/eventtype handler, bind it and run setup.
        handler = handler_cls(
            "srcservice", "eventtype", **options
        ).bind(container, "foobar")
        handler.setup()
        return handler

    # basic queue attributes, and registration with the queue consumer
    handler = make_handler()
    assert handler.queue.durable is True
    assert handler.queue.routing_key == "eventtype"
    assert handler.queue.exchange.name == "srcservice.events"
    queue_consumer.register_provider.assert_called_once_with(handler)

    # default handler type
    handler = make_handler()
    assert handler.queue.name == (
        "evt-srcservice-eventtype--destservice.foobar")
    assert handler.queue.exclusive is False

    # broadcast handler with the generated (uuid based) identifier
    with patch('nameko.events.uuid') as mock_uuid:
        mock_uuid.uuid4.return_value.hex = "uuid-value"
        handler = make_handler(
            handler_type=BROADCAST, reliable_delivery=False)
        assert handler.queue.name == (
            "evt-srcservice-eventtype--destservice.foobar-{}".format(
                "uuid-value"))
        assert handler.queue.exclusive is True

    # broadcast handler with a custom identifier
    class BroadcastEventHandler(EventHandler):
        broadcast_identifier = "testbox"

    handler = make_handler(BroadcastEventHandler, handler_type=BROADCAST)
    assert handler.queue.name == (
        "evt-srcservice-eventtype--destservice.foobar-{}".format("testbox"))
    assert handler.queue.exclusive is False

    # singleton handler
    handler = make_handler(handler_type=SINGLETON)
    assert handler.queue.name == "evt-srcservice-eventtype"
    assert handler.queue.exclusive is False

    # default configuration does not auto-delete its queue
    handler = make_handler()
    assert handler.queue.auto_delete is False
class TestReliableDeliveryEventHandlerConfigurationError():
    """ Broadcast handlers combined with ``reliable_delivery=True``. """

    def test_raises_with_default_broadcast_identity(
        self, queue_consumer, mock_container
    ):
        """ Without a custom ``broadcast_identifier``, creating and binding
        the handler raises ``EventHandlerConfigurationError``.
        """
        mock_container.service_name = "destservice"

        with pytest.raises(EventHandlerConfigurationError):
            unbound = EventHandler(
                "srcservice", "eventtype",
                handler_type=BROADCAST, reliable_delivery=True
            )
            unbound.bind(mock_container, "foobar")

    def test_no_raise_with_custom_identity(
        self, queue_consumer, mock_container
    ):
        """ A subclass that sets ``broadcast_identifier`` can be created and
        bound with reliable delivery enabled.
        """
        mock_container.service_name = "destservice"

        class BroadcastEventHandler(EventHandler):
            broadcast_identifier = "testbox"

        unbound = BroadcastEventHandler(
            "srcservice", "eventtype",
            handler_type=BROADCAST, reliable_delivery=True
        )
        bound = unbound.bind(mock_container, "foobar")
        assert bound.reliable_delivery is True
# Registry of ``HandlerService`` instances keyed by service name; every
# instance appends itself here when constructed.
services = defaultdict(list)
# Flat record of every event passed to ``HandlerService.handle``.
events = []
@pytest.fixture
def reset_state():
    """ After the test has run, empty the module-level ``services`` and
    ``events`` records in place.
    """
    yield
    del events[:]
    services.clear()
class CustomEventHandler(EventHandler):
    """ ``EventHandler`` subclass that records every message passed to
    ``handle_result``.
    """
    # Held on the class so it can be read as ``CustomEventHandler._calls``;
    # it is emptied in place each time an instance is created.
    _calls = []

    def __init__(self, *args, **kwargs):
        super(CustomEventHandler, self).__init__(*args, **kwargs)
        del self._calls[:]

    def handle_result(self, message, worker_ctx, result=None, exc_info=None):
        """ Delegate to the parent implementation, record ``message`` and
        return ``(result, exc_info)`` as they were passed in.
        """
        parent = super(CustomEventHandler, self)
        parent.handle_result(message, worker_ctx, result, exc_info)
        self._calls.append(message)
        return (result, exc_info)
# Decorator form of ``CustomEventHandler``, applied to service methods below.
custom_event_handler = CustomEventHandler.decorator
class HandlerService(object):
    """ Base service that keeps track of the events it handles.

    Each instance adds itself to the module-level ``services`` registry
    under its ``name``; handled events are stored both on the instance and
    in the module-level ``events`` list.
    """
    name = "handlerservice"

    def __init__(self):
        services[self.name].append(self)
        self.events = []

    def handle(self, evt):
        for record in (self.events, events):
            record.append(evt)
class ServicePoolHandler(HandlerService):
    """ Handles ``srcservice`` / ``eventtype`` with a ``SERVICE_POOL``
    event handler.
    """

    @event_handler(
        'srcservice', 'eventtype',
        handler_type=SERVICE_POOL
    )
    def handle(self, evt):
        # recording is done by ``HandlerService.handle``
        super(ServicePoolHandler, self).handle(evt)
class DoubleServicePoolHandler(HandlerService):
    """ Declares two ``SERVICE_POOL`` handlers for the same
    ``srcservice`` / ``eventtype`` event; both record via
    ``HandlerService.handle``.
    """

    @event_handler(
        'srcservice', 'eventtype',
        handler_type=SERVICE_POOL
    )
    def handle_1(self, evt):
        super(DoubleServicePoolHandler, self).handle(evt)

    @event_handler(
        'srcservice', 'eventtype',
        handler_type=SERVICE_POOL
    )
    def handle_2(self, evt):
        super(DoubleServicePoolHandler, self).handle(evt)
class SingletonHandler(HandlerService):
    """ Handles ``srcservice`` / ``eventtype`` with a ``SINGLETON`` event
    handler.
    """

    @event_handler(
        'srcservice', 'eventtype',
        handler_type=SINGLETON
    )
    def handle(self, evt):
        # recording is done by ``HandlerService.handle``
        super(SingletonHandler, self).handle(evt)
class BroadcastHandler(HandlerService):
    """ Handles ``srcservice`` / ``eventtype`` with a ``BROADCAST`` event
    handler that has reliable delivery switched off.
    """

    @event_handler('srcservice', 'eventtype',
                   handler_type=BROADCAST, reliable_delivery=False)
    def handle(self, evt):
        # recording is done by ``HandlerService.handle``
        super(BroadcastHandler, self).handle(evt)
class RequeueingHandler(HandlerService):
    """ Records each event and then always raises, using a handler declared
    with ``requeue_on_error=True``.
    """

    @event_handler(
        'srcservice', 'eventtype',
        requeue_on_error=True
    )
    def handle(self, evt):
        # record first, so the event is visible to the test despite the error
        super(RequeueingHandler, self).handle(evt)
        raise Exception("Error")
class UnreliableHandler(HandlerService):
    """ Handles ``srcservice`` / ``eventtype`` with reliable delivery
    switched off.
    """

    @event_handler(
        'srcservice', 'eventtype',
        reliable_delivery=False
    )
    def handle(self, evt):
        # recording is done by ``HandlerService.handle``
        super(UnreliableHandler, self).handle(evt)
class CustomHandler(HandlerService):
    """ Handles ``srcservice`` / ``eventtype`` through the
    ``custom_event_handler`` decorator instead of ``event_handler``.
    """

    @custom_event_handler(
        'srcservice', 'eventtype'
    )
    def handle(self, evt):
        # recording is done by ``HandlerService.handle``
        super(CustomHandler, self).handle(evt)
def service_factory(prefix, base):
    """ Build a new subclass of ``base`` whose ``name`` attribute is
    ``prefix``.

    The class name is the title-cased prefix followed by the base's own
    name, e.g. ``service_factory("foo", ServicePoolHandler)`` produces
    ``FooServicePoolHandler`` with ``FooServicePoolHandler.name == "foo"``.
    """
    class_name = "".join((prefix.title(), base.__name__))
    return type(class_name, (base,), dict(name=prefix))
@pytest.fixture
def start_containers(request, container_factory, rabbit_config, reset_state):
    """ Provide a ``make(base, prefixes)`` helper that starts service
    containers for the test.

    ``reset_state`` is requested so the module-level event records are
    emptied once the test has finished.
    """
    def make(base, prefixes):
        """ Use ``service_factory`` to create a service type inheriting from
        ``base`` for each distinct prefix, and start one container per
        entry in ``prefixes``.

        If a prefix is given multiple times, multiple containers are started
        for the same service type. If ``prefixes`` is empty, nothing is
        started and an empty list is returned.

        Every started container has its ``stop`` registered as a finalizer
        on the requesting test.

        Returns the started containers, in the order of ``prefixes``.
        """
        # named to avoid shadowing the module-level ``services`` registry
        service_types = {}
        containers = []

        for prefix in prefixes:
            key = (prefix, base)
            if key not in service_types:
                service_types[key] = service_factory(prefix, base)

            container = container_factory(service_types[key], rabbit_config)
            containers.append(container)
            container.start()
            request.addfinalizer(container.stop)

        return containers

    return make
def test_service_pooled_events(rabbit_manager, rabbit_config,
                               start_containers):
    """ With ``SERVICE_POOL`` handlers, one published event is handled once
    per service name, regardless of how many containers run that service.
    """
    vhost = rabbit_config['vhost']
    start_containers(ServicePoolHandler, ("foo", "foo", "bar"))

    # the two "foo" containers consume from one shared queue
    foo_queue = rabbit_manager.get_queue(
        vhost, "evt-srcservice-eventtype--foo.handle")
    assert len(foo_queue['consumer_details']) == 2

    # the single "bar" container has a queue of its own
    bar_queue = rabbit_manager.get_queue(
        vhost, "evt-srcservice-eventtype--bar.handle")
    assert len(bar_queue['consumer_details']) == 1

    exchange_name = "srcservice.events"
    rabbit_manager.publish(
        vhost, exchange_name, 'eventtype', '"msg"',
        properties=dict(content_type='application/json')
    )

    # Poll for both deliveries rather than relying on one fixed sleep, which
    # fails spuriously when the broker or workers are slow. The wait is
    # bounded by EVENTS_TIMEOUT so a real failure still surfaces.
    deadline = time.time() + EVENTS_TIMEOUT
    while len(events) < 2 and time.time() < deadline:
        time.sleep(.1)

    assert len(events) == 2

    # exactly one worker instance per service name handled the event
    assert len(services['foo']) == 1
    assert isinstance(services['foo'][0], ServicePoolHandler)
    assert services['foo'][0].events == ["msg"]

    assert len(services['bar']) == 1
    assert isinstance(services['bar'][0], ServicePoolHandler)
    assert services['bar'][0].events == ["msg"]
def test_service_pooled_events_multiple_handlers(
        rabbit_manager, rabbit_config, start_containers):
    """ A service declaring two handlers for the same event gets a separate
    queue per handler, and both handlers receive the event.
    """
    vhost = rabbit_config['vhost']
    (container,) = start_containers(DoubleServicePoolHandler, ("double",))

    # one queue, with one consumer, per handler method
    for method_name in ("handle_1", "handle_2"):
        handler_queue = rabbit_manager.get_queue(
            vhost, "evt-srcservice-eventtype--double." + method_name)
        assert len(handler_queue['consumer_details']) == 1

    exchange_name = "srcservice.events"

    # wait until both entrypoints have fired
    with entrypoint_waiter(container, 'handle_1'), \
            entrypoint_waiter(container, 'handle_2'):
        rabbit_manager.publish(
            vhost, exchange_name, 'eventtype', '"msg"',
            properties=dict(content_type='application/json')
        )

    assert len(events) == 2

    # each handler ran in its own service instance
    double_services = services['double']
    assert len(double_services) == 2
    assert double_services[0].events == ["msg"]
    assert double_services[1].events == ["msg"]
def test_singleton_events(rabbit_manager, rabbit_config, start_containers):
    """ With ``SINGLETON`` handlers, one published event is handled by
    exactly one of the running containers.
    """
    vhost = rabbit_config['vhost']
    start_containers(SingletonHandler, ("foo", "foo", "bar"))

    # all three containers consume from the one shared queue
    singleton_queue = rabbit_manager.get_queue(
        vhost, "evt-srcservice-eventtype")
    assert len(singleton_queue['consumer_details']) == 3

    exchange_name = "srcservice.events"
    rabbit_manager.publish(vhost, exchange_name, 'eventtype', '"msg"',
                           properties=dict(content_type='application/json'))

    # Poll for the delivery rather than relying on one fixed sleep, which
    # fails spuriously when the broker or workers are slow. The wait is
    # bounded by EVENTS_TIMEOUT so a real failure still surfaces.
    deadline = time.time() + EVENTS_TIMEOUT
    while not events and time.time() < deadline:
        time.sleep(.1)
    # short settle period so an erroneous duplicate delivery is still caught
    # by the exact-count assertions below
    time.sleep(.1)

    assert len(events) == 1

    # only one service name produced a worker, and only one worker ran
    assert len(services) == 1
    lucky_service = next(iter(services))
    assert len(services[lucky_service]) == 1
    assert isinstance(services[lucky_service][0], SingletonHandler)
    assert services[lucky_service][0].events == ["msg"]
def test_broadcast_events(rabbit_manager, rabbit_config, start_containers):
    """ With ``BROADCAST`` handlers, every running container has its own
    queue and each one receives the published event.
    """
    vhost = rabbit_config['vhost']
    first, second, third = start_containers(
        BroadcastHandler, ("foo", "foo", "bar"))

    # one queue per container, each with a single consumer
    broadcast_queue_names = [
        info['name'] for info in rabbit_manager.get_queues(vhost)
        if info['name'].startswith("evt-srcservice-eventtype-")
    ]
    assert len(broadcast_queue_names) == 3
    for queue_name in broadcast_queue_names:
        details = rabbit_manager.get_queue(vhost, queue_name)
        assert len(details['consumer_details']) == 1

    exchange_name = "srcservice.events"

    # wait until the handler has fired in all three containers
    with entrypoint_waiter(first, 'handle'), \
            entrypoint_waiter(second, 'handle'), \
            entrypoint_waiter(third, 'handle'):
        rabbit_manager.publish(
            vhost, exchange_name, 'eventtype', '"msg"',
            properties=dict(content_type='application/json')
        )

    assert len(events) == 3

    # two service names were involved: two "foo" workers and one "bar"
    assert len(services) == 2

    foo_services = services['foo']
    assert len(foo_services) == 2
    for instance in foo_services:
        assert isinstance(instance, BroadcastHandler)
    for instance in foo_services:
        assert instance.events == ["msg"]

    bar_services = services['bar']
    assert len(bar_services) == 1
    assert isinstance(bar_services[0], BroadcastHandler)
    assert bar_services[0].events == ["msg"]
def test_requeue_on_error(rabbit_manager, rabbit_config, start_containers):
    """ An event whose handler raises is delivered again when the handler
    is declared with ``requeue_on_error=True``.
    """
    vhost = rabbit_config['vhost']
    (container,) = start_containers(RequeueingHandler, ('requeue',))

    handler_queue = rabbit_manager.get_queue(
        vhost, "evt-srcservice-eventtype--requeue.handle")
    assert len(handler_queue['consumer_details']) == 1

    firings = itertools.count(start=1)

    def entrypoint_fired_twice(worker_ctx, res, exc_info):
        # false for the first firing, true from the second onwards
        return next(firings) >= 2

    waiter = entrypoint_waiter(
        container, 'handle', callback=entrypoint_fired_twice)
    with waiter:
        rabbit_manager.publish(
            vhost, "srcservice.events", 'eventtype', '"msg"',
            properties=dict(content_type='application/json')
        )

    # stop the container before inspecting what was recorded
    container.stop()

    # the event was handled more than once, by more than one worker instance
    assert len(events) > 1
    requeue_services = services['requeue']
    assert len(requeue_services) > 1
    for instance in requeue_services:
        assert instance.events == ["msg"]
def test_reliable_delivery(
    rabbit_manager, rabbit_config, start_containers, container_factory
):
    """ Events sent to queues declared by ``reliable_delivery`` handlers
    should be received even if no service was listening when they were
    dispatched.
    """
    vhost = rabbit_config['vhost']
    exchange_name = "srcservice.events"
    queue_name = "evt-srcservice-eventtype--service-pool.handle"
    json_properties = dict(content_type='application/json')

    (container,) = start_containers(ServicePoolHandler, ('service-pool',))

    queue_info = rabbit_manager.get_queue(vhost, queue_name)
    assert len(queue_info['consumer_details']) == 1

    # first event is handled while the container is running
    with entrypoint_waiter(container, 'handle'):
        rabbit_manager.publish(
            vhost, exchange_name, 'eventtype', '"msg_1"',
            properties=json_properties
        )
    assert events == ["msg_1"]

    container.stop()

    # the queue is still there, now without a consumer
    remaining_names = [q['name'] for q in rabbit_manager.get_queues(vhost)]
    assert queue_name in remaining_names
    queue_info = rabbit_manager.get_queue(vhost, queue_name)
    assert len(queue_info['consumer_details']) == 0

    # second event is published while nothing is consuming
    rabbit_manager.publish(vhost, exchange_name, 'eventtype', '"msg_2"',
                           properties=json_properties)

    # look at the waiting message, asking for it to be requeued
    waiting = rabbit_manager.get_messages(vhost, queue_name, requeue=True)
    assert ['"msg_2"'] == [msg['payload'] for msg in waiting]

    # a new container for a service with the same name picks it up
    class ServicePool(ServicePoolHandler):
        name = "service-pool"

    container = container_factory(ServicePool, rabbit_config)
    with entrypoint_waiter(container, 'handle'):
        container.start()

    assert len(events) == 2
    assert events == ["msg_1", "msg_2"]
def test_unreliable_delivery(rabbit_manager, rabbit_config, start_containers):
    """ Events sent to queues declared by non- ``reliable_delivery`` handlers
    should be lost if no service was listening when they were dispatched.
    """
    vhost = rabbit_config['vhost']
    exchange_name = "srcservice.events"
    queue_name = "evt-srcservice-eventtype--unreliable.handle"
    json_properties = dict(content_type='application/json')

    (unreliable,) = start_containers(UnreliableHandler, ('unreliable',))
    # second service on the same exchange, named for its purpose here
    (keep_alive,) = start_containers(
        ServicePoolHandler, ('keep-exchange-alive',))

    queue_info = rabbit_manager.get_queue(vhost, queue_name)
    assert len(queue_info['consumer_details']) == 1

    # first event reaches both services
    with entrypoint_waiter(unreliable, 'handle'), \
            entrypoint_waiter(keep_alive, 'handle'):
        rabbit_manager.publish(
            vhost, exchange_name, 'eventtype', '"msg_1"',
            properties=json_properties
        )

    assert events == ["msg_1", "msg_1"]
    assert len(services['unreliable']) == 1
    assert services['unreliable'][0].events == ["msg_1"]

    unreliable.stop()

    # the unreliable handler's queue is gone once its container has stopped
    remaining_names = [q['name'] for q in rabbit_manager.get_queues(vhost)]
    assert queue_name not in remaining_names

    # second event is published while the unreliable service is down
    rabbit_manager.publish(vhost, exchange_name, 'eventtype', '"msg_2"',
                           properties=json_properties)

    # bring the unreliable service back
    (restarted,) = start_containers(UnreliableHandler, ('unreliable',))
    queue_info = rabbit_manager.get_queue(vhost, queue_name)
    assert len(queue_info['consumer_details']) == 1

    with entrypoint_waiter(restarted, 'handle'):
        rabbit_manager.publish(
            vhost, exchange_name, 'eventtype', '"msg_3"',
            properties=json_properties
        )

    # "msg_2" never reached the unreliable service
    unreliable_services = services['unreliable']
    assert len(unreliable_services) == 2
    assert unreliable_services[0].events == ["msg_1"]
    assert unreliable_services[1].events == ["msg_3"]
def test_custom_event_handler(rabbit_manager, rabbit_config, start_containers):
    """Uses a custom handler subclass for the event_handler entrypoint"""
    (container,) = start_containers(CustomHandler, ('custom-events',))

    payload = {'custom': 'data'}
    dispatch = standalone_dispatcher(rabbit_config)

    with entrypoint_waiter(container, 'handle'):
        dispatch('srcservice', "eventtype", payload)

    # the subclass recorded the message it was given
    first_message = CustomEventHandler._calls[0]
    assert first_message.payload == payload
def test_dispatch_to_rabbit(rabbit_manager, rabbit_config, mock_container):
    """ Starting a dispatcher makes the service's event exchange available,
    and a dispatched event can be read from a queue bound to it.
    """
    vhost = rabbit_config['vhost']

    mock_container.shared_extensions = {}
    mock_container.service_name = "srcservice"
    mock_container.config = rabbit_config

    service = Mock()
    worker_ctx = WorkerContext(mock_container, service, DummyProvider())

    dispatcher = EventDispatcher().bind(mock_container, 'dispatch')
    dispatcher.setup()
    dispatcher.start()

    # the exchange exists, and no queues do
    exchange_names = [
        exchange['name'] for exchange in rabbit_manager.get_exchanges(vhost)
    ]
    existing_queues = rabbit_manager.get_queues(vhost)
    assert "srcservice.events" in exchange_names
    assert existing_queues == []

    # bind a queue by hand to catch the event
    rabbit_manager.create_queue(vhost, "event-sink", auto_delete=True)
    rabbit_manager.create_queue_binding(
        vhost, "srcservice.events", "event-sink", routing_key="eventtype")

    service.dispatch = dispatcher.get_dependency(worker_ctx)
    service.dispatch("eventtype", "msg")

    received = rabbit_manager.get_messages(vhost, "event-sink")
    assert ['"msg"'] == [msg['payload'] for msg in received]
class TestBackwardsCompatClassAttrs(object):
    """ Options given as class attributes on an ``EventDispatcher``
    subclass.
    """

    @pytest.mark.parametrize("parameter,value", [
        ('retry', False),
        ('retry_policy', {'max_retries': 999}),
        ('use_confirms', False),
    ])
    def test_attrs_are_applied_as_defaults(
        self, parameter, value, mock_container
    ):
        """ Verify that you can specify some fields by subclassing the
        EventDispatcher DependencyProvider.
        """
        mock_container.config = {'AMQP_URI': 'memory://'}
        mock_container.service_name = "service"

        legacy_attrs = {parameter: value}
        dispatcher_cls = type(
            "LegacyEventDispatcher", (EventDispatcher,), legacy_attrs
        )

        with patch('nameko.messaging.warnings') as warnings:
            dispatcher = dispatcher_cls().bind(mock_container, "dispatch")

        # a warning naming the parameter was issued
        assert warnings.warn.called
        warning_call = unpack_mock_call(warnings.warn.call_args)
        assert parameter in warning_call.positional[0]

        # the value is carried over to the publisher
        dispatcher.setup()
        assert getattr(dispatcher.publisher, parameter) == value
class TestConfigurability(object):
    """
    Test and demonstrate configuration options for the EventDispatcher
    """

    @pytest.fixture
    def get_producer(self):
        """ Patch ``nameko.amqp.publish.get_producer`` and yield the mock.
        """
        with patch('nameko.amqp.publish.get_producer') as get_producer:
            yield get_producer

    @pytest.fixture
    def producer(self, get_producer):
        """ Return the mock producer obtained by entering the patched
        ``get_producer()`` context manager.
        """
        producer = get_producer().__enter__.return_value
        # make sure there are no returned messages
        producer.channel.returned_messages.get_nowait.side_effect = queue.Empty
        return producer

    @pytest.mark.parametrize("parameter", [
        # delivery options
        'delivery_mode', 'mandatory', 'priority', 'expiration',
        # message options
        'serializer', 'compression',
        # retry policy
        'retry', 'retry_policy',
        # other arbitrary publish kwargs
        'correlation_id', 'user_id', 'bogus_param'
    ])
    def test_regular_parameters(
        self, parameter, mock_container, producer
    ):
        """ Verify that most parameters can be specified at instantiation time.
        """
        mock_container.config = {'AMQP_URI': 'memory://localhost'}
        mock_container.service_name = "service"

        worker_ctx = Mock()
        worker_ctx.context_data = {}

        value = Mock()

        dispatcher = EventDispatcher(
            **{parameter: value}
        ).bind(mock_container, "dispatch")
        dispatcher.setup()

        dispatch = dispatcher.get_dependency(worker_ctx)

        dispatch("event-type", "event-data")
        assert producer.publish.call_args[1][parameter] == value

    @pytest.mark.usefixtures('predictable_call_ids')
    def test_headers(self, mock_container, producer):
        """ Headers can be provided at instantiation time, and are merged with
        Nameko headers.
        """
        mock_container.config = {
            'AMQP_URI': 'memory://localhost'
        }
        mock_container.service_name = "service"

        service = Mock()
        entrypoint = Mock(method_name="method")
        worker_ctx = WorkerContext(
            mock_container, service, entrypoint, data={'context': 'data'}
        )

        nameko_headers = {
            'nameko.context': 'data',
            'nameko.call_id_stack': ['service.method.0'],
        }

        value = {'foo': Mock()}

        dispatcher = EventDispatcher(
            headers=value
        ).bind(mock_container, "dispatch")
        dispatcher.setup()

        dispatch = dispatcher.get_dependency(worker_ctx)

        # expected result: Nameko's own headers plus the custom ones, built
        # on a copy so ``nameko_headers`` itself is left untouched
        expected_headers = dict(nameko_headers)
        expected_headers.update(value)

        dispatch("event-type", "event-data")
        assert producer.publish.call_args[1]['headers'] == expected_headers

    def test_restricted_parameters(
        self, mock_container, producer
    ):
        """ Verify that providing routing parameters at instantiation
        time has no effect.
        """
        mock_container.config = {'AMQP_URI': 'memory://localhost'}
        mock_container.service_name = "service"

        worker_ctx = Mock()
        worker_ctx.context_data = {}

        exchange = Mock()
        routing_key = Mock()

        dispatcher = EventDispatcher(
            exchange=exchange,
            routing_key=routing_key,
        ).bind(mock_container, "dispatch")
        dispatcher.setup()

        dispatch = dispatcher.get_dependency(worker_ctx)

        event_exchange = get_event_exchange("service", config={})
        event_type = "event-type"

        dispatch(event_type, "event-data")

        # the publish call used the event exchange and the event type, not
        # the mocks passed to the constructor
        publish_kwargs = producer.publish.call_args[1]
        assert publish_kwargs['exchange'] == event_exchange
        assert publish_kwargs['routing_key'] == event_type
class TestSSL(object):
    """ Event handling and dispatching with an SSL broker configuration,
    for each login method, with and without a client certificate.
    """

    @pytest.fixture(params=["PLAIN", "AMQPLAIN", "EXTERNAL"])
    def login_method(self, request):
        """ Login method under test. """
        return request.param

    @pytest.fixture(params=[True, False], ids=["use client cert", "no client cert"])
    def use_client_cert(self, request):
        """ Whether the connection is configured with a client certificate.
        """
        return request.param

    @pytest.fixture
    def rabbit_ssl_config(self, rabbit_ssl_config, use_client_cert, login_method):
        """ Adjust the incoming ``rabbit_ssl_config`` for the current
        parameter combination and return it.

        ``EXTERNAL`` login without a client certificate is skipped.
        """
        if not use_client_cert:
            # replace the SSL options with ones that only disable
            # certificate requirements
            rabbit_ssl_config['AMQP_SSL'] = dict(cert_reqs=ssl.CERT_NONE)

        rabbit_ssl_config[LOGIN_METHOD_CONFIG_KEY] = login_method

        external_without_cert = (
            login_method == "EXTERNAL" and not use_client_cert
        )
        if external_without_cert:
            pytest.skip("EXTERNAL login method requires cert verification")

        return rabbit_ssl_config

    def test_event_handler_over_ssl(
        self, container_factory, rabbit_ssl_config, rabbit_config
    ):
        """ A handler whose container uses the SSL config receives an event
        dispatched with the regular config.
        """
        class Service(object):
            name = "service"

            @event_handler("service", "event")
            def echo(self, event_data):
                return event_data

        container = container_factory(Service, rabbit_ssl_config)
        container.start()

        send_event = standalone_dispatcher(rabbit_config)

        with entrypoint_waiter(container, 'echo') as result:
            send_event("service", "event", "payload")
        assert result.get() == "payload"

    def test_event_dispatcher_over_ssl(
        self, container_factory, rabbit_ssl_config, rabbit_config
    ):
        """ An event dispatched from a container using the SSL config is
        received by a handler whose container uses the regular config.
        """
        class Dispatcher(object):
            name = "dispatch"

            dispatch = EventDispatcher()

            @dummy
            def method(self, payload):
                return self.dispatch("event-type", payload)

        class Handler(object):
            name = "handler"

            @event_handler("dispatch", "event-type")
            def echo(self, payload):
                return payload

        dispatcher_container = container_factory(Dispatcher, rabbit_ssl_config)
        dispatcher_container.start()

        handler_container = container_factory(Handler, rabbit_config)
        handler_container.start()

        with entrypoint_waiter(handler_container, 'echo') as result, \
                entrypoint_hook(dispatcher_container, 'method') as fire:
            fire("payload")
        assert result.get() == "payload"