面向智能路由的高实时性指标采集与上报
面向对时效性要求高的模块(如智能路由),通过NATS实现秒级指标推送,确保推理过程中的关键性能指标(如等待执行的推理请求数、NPU/GPU KV Cache利用率等)能够被及时感知并影响决策。
目前文档只支持vLLM部署,未来会扩展到支持其他推理引擎和服务。
前置条件
- Kubernetes集群已部署并可正常访问
- 已安装
kubectl命令行工具并配置好集群访问权限 - 节点上已安装Ascend驱动和相关依赖
- NATS已部署并正常运行
部署步骤
选用NATS发布/订阅模型作为指标传输方式。CustomStatLogger是在vLLM中通过继承StatLoggerBase抽象类实现的自定义统计数据记录器。作为发布者,CustomStatLogger会在每次解码批次结束后,主动生成结构化的运行指标消息,并将其发布到指定的NATS主题;Router作为订阅者,只需在该主题上注册监听,即可在消息发布的瞬间接收并触发后续处理逻辑。
为避免首批消息丢失,订阅端(Router)需先于发布端(CustomStatLogger)完成连接与订阅。
Router (订阅端)
Router作为订阅者,主要负责接收发布的运行指标消息并触发相应的处理逻辑。参考以下代码示例来实现Router,并将其部署到Kubernetes集群中
import logging
import json
from nats.aio.client import Client as NATS_Client
logger = logging.getLogger()
class RouterSubscriber:
def __init__(self, server_address: str, topic: str):
"""
初始化RouterSubscriber实例
:param server_address: NATS服务器地址
:param topic: 订阅的主题
"""
self.server_address = server_address
self.topic = topic
self.client = NATS_Client()
async def connect(self):
"""
建立与NATS服务器的连接
"""
try:
# 建立NATS长连接
await self.client.connect(self.server_address)
logger.info("Connected to NATS server at %s", self.server_address)
except Exception as e:
logger.error("Failed to connect to NATS server: %s", e)
raise e
async def subscribe(self, topic: str, message_handler):
"""
订阅指定主题,并将接收到的消息交给业务逻辑处理
:param message_handler: 业务逻辑处理函数,用于处理接收到的消息
"""
try:
async def on_message(msg):
"""
消息回调函数,处理订阅到的消息
:param msg: NATS消息
"""
logger.info("Received message on topic %s", topic)
await message_handler(msg)
# 确保NATS连接
if not self.client.is_connected:
await self.connect()
# 注册订阅主题,并绑定消息回调函数
await self.client.subscribe(topic, cb=on_message)
logger.info("Successfully subscribed to NATS topic %s", topic)
except Exception as e:
logger.error("Failed to subscribe to NATS topic %s: %s", topic, e)
raise e
async def message_handler(self, msg) -> None:
"""
NATS原始回调,负责:
1. 从msg中解码数据;
2. 调用parse_message做JSON反序列化;
3. 将解析后的结构化数据交给后续业务逻辑处理。
"""
try:
data = msg.data.decode("utf-8")
except Exception as exc:
logger.error("Failed to decode NATS message: %s", exc)
return
data_dict = self.parse_message(data)
if not data_dict:
# 解析失败直接返回,避免后续逻辑异常
return
# TODO: 在这里处理具体业务逻辑,例如根据指标做路由决策
...
def parse_message(self, data: str) -> Dict[str, Any]:
"""
将NATS消息体从JSON字符串反序列化为字典。
解析失败时记录日志并返回空字典。
"""
try:
return json.loads(data)
except Exception as exc:
logger.error("Failed to parse message: %s, raw: %r", exc, data)
return {}
Customstatlogger (发布端)
CustomStatLogger是发布者,它负责生成并发布实时的运行指标数据。每次解码批次结束后,CustomStatLogger将结构化的指标消息发布到指定的NATS主题(该主题应与订阅方的主题匹配,当前主题eagle_eye.routing_metrics),供Router订阅。
1. 打包源代码
在项目根目录下执行以下命令,将源代码打包为压缩文件:
tar -czf source_code.tar.gz src/ requirements.txt
2. 创建ConfigMap
将压缩包作为二进制文件导入到ConfigMap中:
# 检查是否存在namespace,如果不存在则创建它
kubectl get namespace ai-inference || kubectl create namespace ai-inference
# 如果已存在同名ConfigMap,先删除
kubectl delete configmap vllm-source-code -n ai-inference --ignore-not-found
# 创建新的ConfigMap
kubectl create configmap vllm-source-code \
--from-file=source_code.tar.gz=source_code.tar.gz \
-n ai-inference
3. 部署服务
应用部署YAML文件:
kubectl apply -f ./docs/routing-metrics/vllm_eagle_eye.yaml
4. 检查部署状态
# 查看Pod运行状态
kubectl get pods -n ai-inference -l app=vllm-eagle-eye
# 查看Pod详细信息
kubectl describe pod -n ai-inference -l app=vllm-eagle-eye
# 查看Pod日志
kubectl logs -n ai-inference -l app=vllm-eagle-eye -f
验证步骤
方式一:使用临时Pod验证
1. 获取服务Pod IP
kubectl get pod -n ai-inference -l app=vllm-eagle-eye -o wide
记录输出中的IP地址(例如:10.244.1.5)。
2. 启动临时测试 Pod
kubectl run curl-test --image=curlimages/curl -n ai-inference -it --rm --restart=Never -- sh
3. 在临时Pod中发送测试请求
在临时Pod的shell中执行(将<POD_IP>替换为实际IP):
curl -X POST http://<POD_IP>:8000/generate \
-H "Content-Type: application/json" \
-d '{"prompt": "Hello AI"}'
方式二:使用port-forward验证
# 端口转发
kubectl port-forward -n ai-inference deployment/vllm-eagle-eye 8000:8000
# 在另一个终端发送请求
curl -X POST http://localhost:8000/generate \
-H "Content-Type: application/json" \
-d '{"prompt": "Hello AI"}'
清理资源
# 删除Deployment
kubectl delete -f ./docs/routing-metrics/vllm_eagle_eye.yaml
# 删除ConfigMap
kubectl delete configmap vllm-source-code -n ai-inference
# 删除临时Pod
kubectl delete pod -n ai-inference curl-test
常见问题
Pod启动失败
# 查看Pod事件
kubectl describe pod -n ai-inference -l app=vllm-eagle-eye
# 查看init容器日志
kubectl logs -n ai-inference <pod-name> -c code-extractor
依赖安装失败
如果集群无法联网,需要提前将依赖包打入压缩包:
# 下载依赖到本地
pip download -r requirements.txt -d packages/
# 打包时包含依赖
tar -czf source_code.tar.gz src/ requirements.txt packages/