"""进程内滑动窗口限流(单实例;多副本需 Redis 等外置方案)。"""
from __future__ import annotations
import threading
import time
from collections import defaultdict
class SlidingWindowLimiter:
    """In-process sliding-window rate limiter keyed by an arbitrary string.

    Each key maps to the ``time.monotonic()`` timestamps of its recently
    admitted hits.  All state lives in this object's memory and every access
    to it happens while holding a single ``threading.Lock``.

    Keys that are never seen again would otherwise keep their timestamp
    lists forever, so idle keys are reclaimed opportunistically: whenever
    the number of tracked keys reaches a threshold, every key whose newest
    hit has aged out of the largest window used for that key is dropped, and
    the threshold is reset to twice the number of surviving keys (amortised
    O(1) extra work per call).
    """

    # Smallest tracked-key count at which an idle-key sweep is attempted.
    _MIN_SWEEP_KEYS = 1024

    def __init__(self) -> None:
        # key -> timestamps of admitted hits, oldest first.
        self._hits: dict[str, list[float]] = defaultdict(list)
        # key -> monotonic time at/after which the key holds no live hits.
        self._idle_after: dict[str, float] = {}
        # Tracked-key count that triggers the next idle-key sweep.
        self._sweep_at = self._MIN_SWEEP_KEYS
        self._lock = threading.Lock()

    def allow(self, key: str, limit: int, window_sec: int) -> bool:
        """Record a hit for *key* if it is within its rate limit.

        Args:
            key: Identifier of the caller/resource being limited.
            limit: Maximum number of admitted hits per window.  A value
                ``<= 0`` disables limiting: the call returns ``True`` and
                nothing is recorded.
            window_sec: Window length in seconds; only hits strictly newer
                than ``now - window_sec`` count against the limit.

        Returns:
            ``True`` if the hit was admitted (and recorded), ``False`` if the
            key already has ``limit`` hits inside the window.
        """
        if limit <= 0:
            return True
        window = float(window_sec)
        with self._lock:
            # Read the clock under the lock so timestamps are appended in
            # non-decreasing order even when threads race to get here.
            now = time.monotonic()
            cutoff = now - window
            bucket = self._hits[key]
            bucket[:] = [t for t in bucket if t > cutoff]
            allowed = len(bucket) < limit
            if allowed:
                bucket.append(now)
            # The bucket is never empty here: either we just appended, or it
            # already held at least ``limit`` (>= 1) live hits.
            expires = bucket[-1] + window
            known = self._idle_after.get(key)
            if known is None or expires > known:
                self._idle_after[key] = expires
            if len(self._hits) >= self._sweep_at:
                self._drop_idle_keys(now)
            return allowed

    def _drop_idle_keys(self, now: float) -> None:
        """Forget keys with no live hits; the caller must hold ``_lock``."""
        idle = [k for k, expires in self._idle_after.items() if expires <= now]
        for k in idle:
            del self._idle_after[k]
            self._hits.pop(k, None)
        # Doubling the threshold keeps the sweep cost amortised O(1) per call.
        self._sweep_at = max(self._MIN_SWEEP_KEYS, 2 * len(self._hits))
# Module-level limiter instance, created once at import time.
# NOTE(review): presumably imported and shared by callers in this process — confirm.
limiter = SlidingWindowLimiter()