"""进程内滑动窗口限流(单实例;多副本需 Redis 等外置方案)。"""

from __future__ import annotations

import threading
import time
from collections import defaultdict


class SlidingWindowLimiter:
    """Thread-safe, in-process sliding-window rate limiter keyed by string.

    For every key the limiter remembers the ``time.monotonic()`` timestamps
    of the hits it allowed. A new hit is allowed while fewer than ``limit``
    remembered hits fall inside the last ``window_sec`` seconds.

    Keys that have gone quiet are evicted by a periodic sweep so that the
    internal table does not grow without bound when the key space is large
    (for example one key per client). A key is evicted once its newest hit
    is older than the largest window ever requested for that key.
    """

    # Lower bound on the number of recorded calls between two sweeps. The
    # actual interval is max(this, number of tracked keys), which keeps the
    # sweep cost amortised to O(1) per call.
    _MIN_SWEEP_INTERVAL = 256

    def __init__(self) -> None:
        # key -> timestamps of allowed hits, oldest first.
        self._hits: dict[str, list[float]] = defaultdict(list)
        # key -> largest window (seconds) requested so far for that key.
        self._windows: dict[str, float] = {}
        # Countdown of recorded calls until the next stale-key sweep.
        self._calls_until_sweep = self._MIN_SWEEP_INTERVAL
        self._lock = threading.Lock()

    def allow(self, key: str, limit: int, window_sec: int) -> bool:
        """Record a hit for *key* if it is within the rate limit.

        Args:
            key: Identifier of the bucket to count against.
            limit: Maximum number of hits allowed per window. A value
                ``<= 0`` disables limiting: the call returns ``True``
                immediately and nothing is recorded.
            window_sec: Length of the sliding window in seconds.

        Returns:
            ``True`` if the hit was allowed (and recorded), ``False`` if the
            key already has ``limit`` hits inside the window.

        Note:
            Hits are retained only as long as the largest window previously
            requested for the same key; callers are expected to use a
            consistent ``window_sec`` per key.
        """
        if limit <= 0:
            return True
        window = float(window_sec)
        with self._lock:
            # Read the clock under the lock so timestamps are appended in
            # non-decreasing order and sweeps never outrun a pending call.
            now = time.monotonic()
            cutoff = now - window
            bucket = self._hits[key]
            bucket[:] = [t for t in bucket if t > cutoff]
            allowed = len(bucket) < limit
            if allowed:
                bucket.append(now)
            self._windows[key] = max(window, self._windows.get(key, window))

            self._calls_until_sweep -= 1
            if self._calls_until_sweep <= 0:
                self._evict_stale(now)
            return allowed

    def _evict_stale(self, now: float) -> None:
        """Drop every key whose newest hit has aged out; caller holds the lock."""
        stale = [
            key
            for key, bucket in self._hits.items()
            # bucket[-1] is the newest hit because timestamps are appended
            # in non-decreasing order under the lock.
            if not bucket or bucket[-1] <= now - self._windows.get(key, 0.0)
        ]
        for key in stale:
            del self._hits[key]
            self._windows.pop(key, None)
        self._calls_until_sweep = max(self._MIN_SWEEP_INTERVAL, len(self._hits))


# Module-level limiter instance, constructed once when this module is imported.
limiter = SlidingWindowLimiter()