"""Unit tests for SQLOutboxStore."""
from datetime import datetime, timezone
import json
from unittest.mock import MagicMock, patch
import pytest
from core.models import OutboxEvent
@pytest.fixture
def mock_sql_outbox_store():
"""Create SQLOutboxStore with mocked psycopg2."""
mock_json = lambda x: x
with patch("commit.sql_outbox_store.psycopg2") as mock_pg, \
patch("commit.sql_outbox_store.Json", mock_json), \
patch("commit.sql_outbox_store._HAS_PSYCOPG2", True):
mock_conn = MagicMock()
mock_cursor = MagicMock()
mock_conn.cursor.return_value.__enter__ = lambda s: mock_cursor
mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
mock_pg.connect.return_value = mock_conn
from commit.sql_outbox_store import SQLOutboxStore
store = SQLOutboxStore.__new__(SQLOutboxStore)
store._connection_string = "host=localhost dbname=test"
store._fs = MagicMock()
store._pool_size = 5
store._pool = []
store._get_connection = lambda: mock_conn
store._return_connection = lambda c: None
yield store, mock_pg, mock_conn, mock_cursor
def test_insert_event_sends_notify(mock_sql_outbox_store):
store, _, conn, cursor = mock_sql_outbox_store
event = OutboxEvent(
event_id="evt-1",
event_type="UPSERT_CONTEXT",
uri="ctx://acme/users/alice/memories/profile",
payload={"records": []},
status="PENDING",
retry_count=0,
created_at="2026-04-17T10:00:00+00:00",
)
store._insert_event(event, "acme")
assert cursor.execute.call_count == 2
notify_sql, notify_params = cursor.execute.call_args_list[1].args
assert "SELECT pg_notify(%s, %s)" in notify_sql
assert notify_params[0] == store.notify_channel
payload = json.loads(notify_params[1])
assert payload == {
"event_id": "evt-1",
"account_id": "acme",
"event_type": "UPSERT_CONTEXT",
}
conn.commit.assert_called_once()
def test_claim_batch_returns_claimed_events(mock_sql_outbox_store):
store, _, conn, cursor = mock_sql_outbox_store
created_at = datetime(2026, 4, 17, 10, 0, tzinfo=timezone.utc)
cursor.fetchall.return_value = [
(
"evt-2",
"UPSERT_CONTEXT",
"ctx://acme/users/alice/memories/profile",
{"records": []},
"PROCESSING",
1,
created_at,
None,
)
]
claimed = store.claim_batch(worker_id="worker-1", limit=50, timeout_seconds=42)
assert len(claimed) == 1
node_uri, event = claimed[0]
assert node_uri == "ctx://acme/users/alice/memories/profile"
assert event.event_id == "evt-2"
assert event.status == "PROCESSING"
sql, params = cursor.execute.call_args.args
assert "FOR UPDATE SKIP LOCKED" in sql
assert "LIMIT %s" in sql
assert params == (42, 50, "worker-1")
conn.commit.assert_called_once()
def test_open_listener_connection_sets_autocommit_and_listens(
mock_sql_outbox_store,
):
store, mock_pg, _, _ = mock_sql_outbox_store
listener_conn = MagicMock()
listener_cursor = MagicMock()
listener_conn.cursor.return_value.__enter__ = lambda s: listener_cursor
listener_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
mock_pg.connect.return_value = listener_conn
conn = store.open_listener_connection()
assert conn is listener_conn
listener_conn.set_session.assert_called_once_with(autocommit=True)
listener_cursor.execute.assert_called_once_with(f"LISTEN {store.notify_channel}")