import queue
import threading
from dataclasses import dataclass
from motor.common.resources import Instance, ReadOnlyInstance, InsEventMsg, EventType
from motor.common.logger import get_logger
from motor.config.controller import ControllerConfig
from motor.controller.api_client.coordinator_api_client import CoordinatorApiClient
from motor.controller.core import Observer, ObserverEvent
logger = get_logger(__name__)
@dataclass
class Event:
event_type: EventType
instance: Instance | None
class EventPusher(Observer):
def __init__(self, config: ControllerConfig | None = None) -> None:
super().__init__()
if config is None:
self.config = ControllerConfig()
else:
self.config = config
self.is_coordinator_reset = False
self.is_first_heartbeat_success = False
self.event_queue = queue.Queue()
self.instances: dict[str, Instance] = {}
self.lock = threading.Lock()
self.config_lock = threading.RLock()
self.stop_event = threading.Event()
self.work_condition = threading.Condition()
with self.config_lock:
self.coordinator_heartbeat_interval = config.event_config.coordinator_heartbeat_interval
self.event_consumer_thread = None
self.heartbeat_detector_thread = None
logger.info("EventPusher initialized.")
def start(self) -> None:
"""Start the event pusher threads"""
if self.stop_event.is_set():
self.stop_event.clear()
self.event_consumer_thread = threading.Thread(target=self._event_consumer, daemon=True, name="EventConsumer")
self.heartbeat_detector_thread = threading.Thread(
target=self._coordinator_heartbeat_detector, daemon=True, name="HeartbeatDetector"
)
self.event_consumer_thread.start()
self.heartbeat_detector_thread.start()
logger.info("EventPusher started.")
def stop(self) -> None:
self.stop_event.set()
with self.work_condition:
self.work_condition.notify_all()
if (
hasattr(self, 'event_consumer_thread')
and self.event_consumer_thread is not None
and self.event_consumer_thread.is_alive()
):
self.event_consumer_thread.join()
if (
hasattr(self, 'heartbeat_detector_thread')
and self.heartbeat_detector_thread is not None
and self.heartbeat_detector_thread.is_alive()
):
self.heartbeat_detector_thread.join()
if hasattr(self, 'heart_client'):
self.heart_client.close()
logger.info("EventPusher stopped.")
def is_alive(self) -> bool:
"""Check if the event_pusher threads are alive"""
return (
self.event_consumer_thread is not None
and self.event_consumer_thread.is_alive()
and self.heartbeat_detector_thread is not None
and self.heartbeat_detector_thread.is_alive()
)
def update_config(self, config: ControllerConfig) -> None:
"""Update configuration for the event pusher"""
with self.config_lock:
self.coordinator_heartbeat_interval = config.event_config.coordinator_heartbeat_interval
def update(self, instance: ReadOnlyInstance, event: ObserverEvent) -> None:
if event == ObserverEvent.INSTANCE_READY:
with self.lock:
self.instances[instance.job_name] = instance
event = Event(EventType.ADD, instance.to_instance())
logger.info("Instance ready: %s", instance.job_name)
elif event == ObserverEvent.INSTANCE_SEPERATED:
with self.lock:
if instance.job_name in self.instances:
del self.instances[instance.job_name]
else:
return
event = Event(EventType.DEL, instance.to_instance())
logger.info("Instance removed: %s", instance.job_name)
elif event == ObserverEvent.INSTANCE_PAUSED:
with self.lock:
if instance.job_name in self.instances:
del self.instances[instance.job_name]
else:
return
event = Event(EventType.PAUSE, instance.to_instance())
logger.info("Instance paused: %s", instance.job_name)
elif event == ObserverEvent.INSTANCE_RESUMED:
with self.lock:
self.instances[instance.job_name] = instance
event = Event(EventType.RESUME, instance.to_instance())
logger.info("Instance resumed: %s", instance.job_name)
elif event == ObserverEvent.INSTANCE_REMOVED:
event = Event(EventType.DEL, instance.to_instance())
logger.info("Instance removed: %s", instance.job_name)
else:
return
self.event_queue.put(event)
def push_event(self, event_type: EventType) -> None:
event = Event(event_type, None)
self.event_queue.put(event)
logger.info("Pushed event: %s", event_type)
def _event_consumer(self) -> None:
while not self.stop_event.is_set():
try:
event = self.event_queue.get(timeout=1.0)
except queue.Empty:
continue
if event is not None:
event_type = event.event_type
if event_type == EventType.ADD:
event_msg = InsEventMsg(event=event_type, instances=[event.instance])
elif event_type == EventType.DEL:
event_msg = InsEventMsg(event=event_type, instances=[event.instance])
elif event_type == EventType.PAUSE:
event_msg = InsEventMsg(event=event_type, instances=[event.instance])
elif event_type == EventType.RESUME:
event_msg = InsEventMsg(event=event_type, instances=[event.instance])
elif event_type == EventType.SET:
with self.lock:
instances = list(self.instances.values())
has_prefill = any(inst.role == "prefill" for inst in instances)
if has_prefill:
event_msg = InsEventMsg(
event=event_type, instances=[instance.to_instance() for instance in instances]
)
else:
logger.debug(
"SET event skipped: requires at least one prefill "
"instance, current instances: prefill=%s",
has_prefill,
)
event_msg = None
else:
logger.error("Unknown event type: %s", event_type)
continue
if event_msg is not None:
try:
CoordinatorApiClient.send_instance_refresh(event_msg)
except Exception as e:
logger.error("Failed to send instance refresh event, error: %s", e)
def _coordinator_heartbeat_detector(self) -> None:
"""
Detect Coordinator heartbeat, when Coordinator need Controller sent all
instances resource, this function will produce a SET event.
"""
hb_loss_cnt = 0
log_counter = 0
log_interval = 12
not_ready_log_counter = 0
while not self.stop_event.is_set():
try:
params = {"status": "normal"}
response = CoordinatorApiClient.query_status(params)
if not self.is_first_heartbeat_success:
self.is_first_heartbeat_success = True
logger.info("Coordinator heartbeat established successfully.")
log_counter = 0
not_ready_log_counter = 0
if response is None or response.get("ready") is None or not response.get("ready"):
not_ready_log_counter += 1
if not_ready_log_counter >= log_interval:
logger.info("Coordinator is alive but is not ready.")
not_ready_log_counter = 0
self.is_coordinator_reset = True
if self.is_coordinator_reset:
event = Event(EventType.SET, None)
self.event_queue.put(event)
self.is_coordinator_reset = False
hb_loss_cnt = 0
logger.debug("Controller will reset coordinator instance info.")
except Exception as e:
if self.is_first_heartbeat_success:
hb_loss_cnt += 1
if hb_loss_cnt >= 2:
self.is_coordinator_reset = True
logger.warning("Coordinator heartbeat lost. Possible restart detected.")
hb_loss_cnt = 0
log_counter += 1
if log_counter >= log_interval:
logger.warning("Send Coordinator heartbeat failed, Exception occurred %s", e)
log_counter = 0
else:
log_counter += 1
if log_counter >= log_interval:
logger.info("Coordinator not yet available, waiting for first successful heartbeat.")
log_counter = 0
with self.config_lock:
heartbeat_interval = self.coordinator_heartbeat_interval
with self.work_condition:
self.work_condition.wait(timeout=heartbeat_interval)