异步调用实现方案

整体架构

客户端                          Frontend                           Runtime
  |                               |                                  |
  |  POST /invocations            |                                  |
  |  X-Invoke-Type: async        |                                  |
  |  X-Webhook-Url: ...          |                                  |
  |------------------------------>|                                  |
  |                               |  1. 获取 worker slot             |
  |                               |  2. 生成 requestID               |
  |                               |  3. 存入 StorageBackend(pending) |
  |                               |  4. 启动后台 goroutine          |
  |  HTTP 202 {requestId: "xxx"} |                                  |
  |<------------------------------|                                  |
  |                               |  5. goroutine: status -> running |
  |                               |--------------------------------->|
  |                               |          实际函数调用              |
  |                               |<---------------------------------|
  |                               |  6. 存储结果(completed/failed)   |
  |                               |  7. 发送 Webhook 回调            |
  |                               |                                  |
  |  GET /async-results/{id}     |                                  |
  |------------------------------>|                                  |
  |  HTTP 200 {status, result}   |                                  |
  |<------------------------------|                                  |

核心模块

1. 配置模块 (config.go)

  • AsyncConfig: 异步调用主配置
  • WebhookConfig: Webhook 配置(启用、超时、重试)
  • StorageConfig: 存储配置(类型、Redis 连接信息)
  • 默认配置:
    • 最大并发数: 1000
    • 结果保留时间: 60 分钟
    • 清理间隔: 5 分钟

2. 存储模块 (storage.go)

  • StorageBackend 接口: 统一的存储抽象
  • MemoryBackend: 内存存储实现(默认)
  • RedisBackend: Redis 分布式存储实现
  • 自动降级: Redis 连接失败时自动降级到内存存储
  • TTL 支持: 自动管理结果过期

3. 并发控制 (worker.go)

  • WorkerPool: 基于 channel 的信号量实现
  • Acquire/Release: 获取和释放并发槽位
  • 可配置: 最大并发数可配置

4. Webhook 回调 (webhook.go)

  • WebhookPayload: 回调内容结构
  • 指数退避重试: 1s → 2s → 4s
  • 失败处理: 记录日志但不阻塞主流程

5. 监控指标 (metrics.go)

  • async_invocation_total: 异步调用总数(按状态、函数名)
  • async_invocation_duration_seconds: 异步调用耗时直方图
  • async_invocation_concurrent: 当前并发数
  • async_webhook_total: Webhook 发送次数

6. 异步处理器 (api/v1/async_invoke.go)

  • AsyncInvokeHandler:

    • 获取 worker slot 限制并发
    • 记录并发 gauge
    • 生成 requestID,创建 pending 状态
    • 后台 goroutine 执行调用
    • 调用完成后发送 Webhook(如果配置了)
  • GetAsyncResultHandler:

    • 从 StorageBackend 加载结果
    • 返回对应状态的信息

API 使用方式

发起异步调用

curl -X POST /serverless/v1/functions/{urn}/invocations \
  -H "X-Invoke-Type: async" \
  -H "X-Webhook-Url: https://example.com/callback" \
  -d '{"key": "value"}'
# 返回: HTTP 202 {"requestId": "abc-123"}

查询结果

curl -X GET /serverless/v1/functions/async-results/abc-123
# pending:    {"requestId": "abc-123", "status": "pending"}
# running:    {"requestId": "abc-123", "status": "running"}
# completed:  {"requestId": "abc-123", "status": "completed", "statusCode": 200, "respBody": ...}
# not found:  HTTP 404 {"error": "async result not found"}

Webhook 回调格式

{
  "requestId": "550e8400-e29b-41d4-a716-446655440000",
  "status": "completed",
  "statusCode": 200,
  "result": "eyJrZXkiOiJ2YWx1ZSJ9",
  "completedAt": "2026-03-05T12:00:00Z"
}

配置示例

asyncInvocation:
  enabled: true
  maxConcurrent: 1000
  resultRetentionMinutes: 60
  cleanupIntervalMinutes: 5
  webhook:
    enabled: true
    timeoutSecond: 10
    retry:
      maxAttempts: 3
      initialDelayMs: 1000
  storage:
    type: redis
    redis:
      addr: redis:6379
      password: ""
      db: 0

涉及文件

文件 描述
asyncinvocation/store.go AsyncResult 定义、内存存储实现
asyncinvocation/storage.go StorageBackend 接口、Redis 后端
asyncinvocation/config.go 配置结构体
asyncinvocation/worker.go 并发控制
asyncinvocation/webhook.go Webhook 回调
asyncinvocation/metrics.go Prometheus 指标
api/v1/async_invoke.go HTTP 处理器
api/v1/invoke.go 入口分发
api/api.go 路由注册

已完成特性

  • 分布式存储: 支持 Redis,多实例共享结果
  • Webhook 回调: 支持指数退避重试
  • 并发限制: WorkerPool 限制并发数
  • 配置化管理: YAML 配置
  • 监控指标: Prometheus 指标

待完成

  • 主配置集成: 从主配置加载 AsyncConfig
  • 集成测试: 多实例场景验证