"""Event sinks — where ``SpanEvent`` records get persisted."""
from __future__ import annotations
import json
import logging
import os
import threading
from dataclasses import asdict, is_dataclass
from pathlib import Path
from typing import Any, Protocol
# Module logger for this sinks module; HttpSink reports failed POSTs through it
# instead of raising.
logger = logging.getLogger("ogmem.perf.sinks")
class Sink(Protocol):
    """Structural interface shared by every event sink.

    Any object exposing ``emit`` and ``close`` with these signatures counts
    as a ``Sink``. Implementations must be thread-safe.
    """

    def emit(self, event: Any) -> None:
        """Record a single event."""

    def close(self) -> None:
        """Release whatever resources the sink holds."""
def _to_jsonable(event: Any) -> dict:
if is_dataclass(event):
return asdict(event)
if isinstance(event, dict):
return dict(event)
raise TypeError(f"unsupported event type for sink: {type(event)!r}")
class MemorySink:
    """Sink that keeps every event in an in-memory list -- meant for tests.

    ``events`` stores the dict form of each emitted event in emission
    order; ``get_events`` hands out a snapshot taken under the lock.
    """

    def __init__(self) -> None:
        self.events: list[dict] = []
        self._lock = threading.Lock()

    def emit(self, event: Any) -> None:
        """Convert *event* to a dict and append it to ``events``."""
        record = _to_jsonable(event)
        with self._lock:
            self.events.append(record)

    def close(self) -> None:
        """Nothing to release; present to satisfy the sink interface."""
        return None

    def get_events(self) -> list[dict]:
        """Return a shallow copy of the stored events (thread-safe)."""
        with self._lock:
            snapshot = self.events[:]
        return snapshot
class JsonlSink:
    """Append-only JSON-lines sink.

    Each event is written as one JSON object per line. Writes are
    synchronous and flushed per event so a crashed process still leaves a
    usable trace file. A single ``threading.Lock`` guards the file handle so
    emits from multiple threads stay well-formed.

    The sink is also a context manager: leaving the ``with`` block closes
    the file.
    """

    def __init__(self, path: str | os.PathLike[str]):
        self._path = Path(path)
        # Create missing parent directories so a fresh output dir just works.
        self._path.parent.mkdir(parents=True, exist_ok=True)
        # buffering=1 selects line-buffered text I/O.
        self._fh = self._path.open("a", encoding="utf-8", buffering=1)
        self._lock = threading.Lock()

    @property
    def path(self) -> Path:
        """Filesystem path of the trace file."""
        return self._path

    @property
    def closed(self) -> bool:
        """True once ``close`` has been called."""
        return self._fh.closed

    def emit(self, event: Any) -> None:
        """Append *event* as a single JSON line and flush it.

        Serialization happens before the lock is taken so slow encoding does
        not block other writers; only the write + flush is serialized.

        Raises:
            TypeError: if *event* is not a supported type.
            ValueError: if the sink has already been closed.
        """
        line = json.dumps(_to_jsonable(event), default=str, ensure_ascii=False)
        with self._lock:
            if self._fh.closed:
                # Same exception type as writing to a closed file, but with
                # a message that says which sink was misused.
                raise ValueError(f"JsonlSink for {self._path} is closed")
            self._fh.write(line + "\n")
            self._fh.flush()

    def close(self) -> None:
        """Flush and close the file; calling it more than once is harmless."""
        with self._lock:
            if not self._fh.closed:
                self._fh.flush()
                self._fh.close()

    def __enter__(self) -> "JsonlSink":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()
class CompositeSink:
    """Fan-out to multiple sinks.

    Every child sink receives every event and every child sink is closed,
    even when an earlier one raises. After all children have been tried,
    the first exception encountered is re-raised; any further exceptions
    are logged so they are not lost silently.
    """

    def __init__(self, sinks: list[Sink]):
        self._sinks = list(sinks)

    def emit(self, event: Any) -> None:
        """Forward *event* to every child sink."""
        self._fan_out("emit", event)

    def close(self) -> None:
        """Close every child sink, so one failure cannot leak the others."""
        self._fan_out("close")

    def _fan_out(self, method: str, *args: Any) -> None:
        # Call `method` on every sink; remember the first failure instead of
        # aborting the loop, then surface it once all sinks were visited.
        first_error = None
        for sink in self._sinks:
            try:
                getattr(sink, method)(*args)
            except Exception as exc:
                if first_error is None:
                    first_error = exc
                else:
                    logger.warning(
                        "CompositeSink: %s.%s failed: %s",
                        type(sink).__name__,
                        method,
                        exc,
                    )
        if first_error is not None:
            raise first_error
class HttpSink:
    """HTTP sink -- POSTs each event as a JSON document to a remote endpoint.

    ``emit`` is synchronous: each call performs one POST and blocks until
    the server answers or ``timeout`` seconds elapse. Requests are spread
    round-robin (under a lock) over ``max(1, max_pool)`` independently built
    ``OpenerDirector`` objects. urllib opens a new connection per request,
    so this is not a keep-alive connection pool.

    Transport failures, non-2xx responses and malformed URLs are logged and
    swallowed so they cannot crash the instrumented code path. Events of an
    unsupported type still raise ``TypeError``, like the other sinks.
    """

    def __init__(
        self,
        url: str,
        *,
        timeout: float = 10.0,
        max_pool: int = 4,
        headers: dict[str, str] | None = None,
    ) -> None:
        self._url = url
        self._timeout = timeout
        # Always keep at least one opener: an empty pool would make
        # _acquire_opener divide by zero on every emit.
        self._pool = [self._build_opener() for _ in range(max(1, max_pool))]
        self._lock = threading.Lock()
        self._pool_index = 0
        # Caller-supplied headers override the JSON defaults.
        self._headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            **(headers or {}),
        }

    @staticmethod
    def _build_opener():
        """Return an opener for http/https that raises HTTPError on non-2xx."""
        import urllib.request

        opener = urllib.request.OpenerDirector()
        opener.add_handler(urllib.request.HTTPHandler())
        # HTTPSHandler only exists when Python was built with ssl support.
        https_handler = getattr(urllib.request, "HTTPSHandler", None)
        if https_handler is not None:
            opener.add_handler(https_handler())
        # A bare OpenerDirector returns 4xx/5xx responses as if they had
        # succeeded; these two turn them into HTTPError so they get logged.
        opener.add_handler(urllib.request.HTTPDefaultErrorHandler())
        opener.add_handler(urllib.request.HTTPErrorProcessor())
        return opener

    def _acquire_opener(self):
        """Pick the next opener in round-robin order (thread-safe)."""
        with self._lock:
            opener = self._pool[self._pool_index % len(self._pool)]
            self._pool_index += 1
        return opener

    def emit(self, event: Any) -> None:
        """POST *event* as JSON; log (never raise) any request failure."""
        import urllib.error
        import urllib.request

        data = _to_jsonable(event)
        payload = json.dumps(data, default=str, ensure_ascii=False).encode("utf-8")
        opener = self._acquire_opener()
        try:
            # Request() raises ValueError for a malformed URL, so it lives
            # inside the try to honour the "never re-raised" contract.
            req = urllib.request.Request(
                self._url,
                data=payload,
                headers=self._headers,
                method="POST",
            )
            # Close the response so its socket is released right away.
            with opener.open(req, timeout=self._timeout):
                pass
        except Exception as exc:
            if isinstance(exc, urllib.error.HTTPError):
                # HTTPError wraps the still-open error response; release it.
                exc.close()
            logger.warning("HttpSink POST failed to %s: %s", self._url, exc)

    def close(self) -> None:
        """Nothing to release: each response is closed inside ``emit``."""
        pass