package runtime
import (
"context"
"fmt"
"io"
"log"
"strconv"
"strings"
"time"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/image"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/client"
"github.com/docker/go-connections/nat"
)
// DockerRuntime is a container runtime implementation backed by a Docker
// Engine API client.
type DockerRuntime struct {
	// client is the Docker API client used for every image and container call.
	client *client.Client
}
// NewDockerRuntime builds a DockerRuntime from the given configuration map.
//
// The client is configured from the environment with API version negotiation
// enabled. When cfg contains a non-empty "docker_host" entry, it is added as an
// explicit host option after the environment-derived options (presumably so it
// takes precedence over the environment; confirm against the Docker client docs).
//
// It returns an error when the underlying Docker client cannot be constructed.
func NewDockerRuntime(cfg map[string]string) (*DockerRuntime, error) {
	clientOpts := []client.Opt{
		client.FromEnv,
		client.WithAPIVersionNegotiation(),
	}
	// Indexing a missing key yields "", which is treated the same as an empty value.
	if hostAddr := cfg["docker_host"]; hostAddr != "" {
		clientOpts = append(clientOpts, client.WithHost(hostAddr))
	}

	dockerClient, err := client.NewClientWithOpts(clientOpts...)
	if err != nil {
		return nil, fmt.Errorf("创建 Docker 客户端失败: %w", err)
	}

	runtime := &DockerRuntime{client: dockerClient}
	return runtime, nil
}
// Name reports the identifier of this runtime backend, which is always "docker".
func (d *DockerRuntime) Name() string {
	const runtimeName = "docker"
	return runtimeName
}
// Create creates and starts a container described by cfg and returns its
// Docker container ID.
//
// Image selection: cfg.Rootfs.ImageURL is used when the rootfs type is
// RootfsSrcImage and the URL is non-empty; otherwise cfg.Sandbox is used. An
// error is returned when neither yields an image name. The image is pulled
// when inspecting it locally fails.
//
// Command: the elements of cfg.Command are joined with single spaces and run
// through "/bin/sh -c". When the joined command is empty, no command is set on
// the container config.
//
// Networking: cfg.Network is used as the network mode, defaulting to "host"
// when empty. Port mappings are always validated, but they are only applied
// when the network mode is not "host".
//
// If starting the container fails, the freshly created container is force
// removed before the start error is returned.
func (d *DockerRuntime) Create(ctx context.Context, cfg *CreateConfig) (string, error) {
	imageName := cfg.Sandbox
	if cfg.Rootfs.Type == RootfsSrcImage && cfg.Rootfs.ImageURL != "" {
		imageName = cfg.Rootfs.ImageURL
	}
	if imageName == "" {
		return "", fmt.Errorf("未指定容器镜像(sandbox 和 imageUrl 均为空)")
	}
	if err := d.pullImageIfMissing(ctx, imageName); err != nil {
		return "", err
	}

	// NOTE(review): joining argv with spaces and handing it to a shell drops
	// any quoting of individual arguments; kept because callers may rely on
	// shell interpretation of the command.
	var cmd []string
	if shellCmd := strings.Join(cfg.Command, " "); shellCmd != "" {
		cmd = []string{"/bin/sh", "-c", shellCmd}
	}

	envList := make([]string, 0, len(cfg.Envs))
	for k, v := range cfg.Envs {
		envList = append(envList, k+"="+v)
	}

	containerCfg := &container.Config{
		Image: imageName,
		Cmd:   cmd,
		Env:   envList,
	}

	networkMode := container.NetworkMode(cfg.Network)
	if networkMode == "" {
		networkMode = "host"
	}

	hostCfg := &container.HostConfig{
		Resources: container.Resources{
			// Widen to int64 before scaling so the multiplication cannot
			// overflow a narrower integer field type.
			// NOTE(review): assumes CPUMillicore and MemoryMB are integer-typed
			// - confirm against the CreateConfig definition.
			NanoCPUs: int64(cfg.CPUMillicore) * 1e6,
			Memory:   int64(cfg.MemoryMB) * 1024 * 1024,
		},
		Mounts:      convertMounts(cfg.Mounts),
		NetworkMode: networkMode,
	}

	if len(cfg.Ports) > 0 {
		// Parse first so malformed mappings are rejected even in host mode.
		exposedPorts, portBindings, err := parsePortMappings(cfg.Ports)
		if err != nil {
			return "", err
		}
		if networkMode == "host" {
			log.Printf("[docker] network=host 时忽略端口映射: %v", cfg.Ports)
		} else {
			containerCfg.ExposedPorts = exposedPorts
			hostCfg.PortBindings = portBindings
		}
	}

	// The numeric suffix is the current UnixNano time modulo 100000.
	containerName := fmt.Sprintf("rl-%s-%d", cfg.ID, time.Now().UnixNano()%100000)
	resp, err := d.client.ContainerCreate(ctx, containerCfg, hostCfg, nil, nil, containerName)
	if err != nil {
		return "", fmt.Errorf("创建容器失败: %w", err)
	}

	// Abbreviate the ID for logging without slicing past its end.
	shortID := resp.ID
	if len(shortID) > 12 {
		shortID = shortID[:12]
	}

	if err := d.client.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil {
		// ctx may already be cancelled or expired (which can be the very reason
		// the start failed), so clean up under a detached, bounded context to
		// avoid leaking the created container.
		cleanupCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		rmErr := d.client.ContainerRemove(cleanupCtx, resp.ID, container.RemoveOptions{Force: true})
		if rmErr != nil {
			log.Printf("[docker] 清理启动失败的容器 %s 失败: %v", shortID, rmErr)
		}
		return "", fmt.Errorf("启动容器失败: %w", err)
	}

	log.Printf("[docker] 容器已启动: id=%s, image=%s, network=%s", shortID, imageName, networkMode)
	return resp.ID, nil
}

// pullImageIfMissing pulls imageName when inspecting it locally returns an
// error, and fails if the pull cannot be started or its output stream cannot
// be read to the end.
func (d *DockerRuntime) pullImageIfMissing(ctx context.Context, imageName string) error {
	if _, _, err := d.client.ImageInspectWithRaw(ctx, imageName); err == nil {
		log.Printf("[docker] 使用本地镜像 %s", imageName)
		return nil
	}

	log.Printf("[docker] 本地镜像 %s 不存在,开始拉取", imageName)
	pullOut, err := d.client.ImagePull(ctx, imageName, image.PullOptions{})
	if err != nil {
		return fmt.Errorf("拉取镜像 %s 失败: %w", imageName, err)
	}
	defer pullOut.Close()

	// Drain the pull output. A read error means the stream ended early, so the
	// pull must not be reported as complete.
	if _, err := io.Copy(io.Discard, pullOut); err != nil {
		return fmt.Errorf("拉取镜像 %s 失败: %w", imageName, err)
	}
	log.Printf("[docker] 镜像 %s 拉取完成", imageName)
	return nil
}
// parsePortMappings converts port mapping strings of the form
// "protocol:hostPort:containerPort" into the exposed-port set and the port
// binding map used when creating a container.
//
// Each of the three fields is trimmed of surrounding whitespace and the
// protocol is lower-cased; only tcp, udp and sctp are accepted. Both ports must
// pass validatePort. Several mappings for the same container port and protocol
// accumulate as multiple bindings under one key.
//
// On the first invalid mapping it returns nil results and an error naming the
// offending entry.
func parsePortMappings(mappings []string) (nat.PortSet, nat.PortMap, error) {
	exposed := make(nat.PortSet)
	bindings := make(nat.PortMap)

	for _, spec := range mappings {
		fields := strings.Split(spec, ":")
		if len(fields) != 3 {
			return nil, nil, fmt.Errorf("无效端口映射 %q,期望格式 protocol:hostPort:containerPort", spec)
		}

		proto := strings.ToLower(strings.TrimSpace(fields[0]))
		hostPort := strings.TrimSpace(fields[1])
		containerPort := strings.TrimSpace(fields[2])

		switch proto {
		case "tcp", "udp", "sctp":
			// supported protocol
		default:
			return nil, nil, fmt.Errorf("无效端口映射 %q,protocol 仅支持 tcp/udp/sctp", spec)
		}

		if hostErr := validatePort(hostPort); hostErr != nil {
			return nil, nil, fmt.Errorf("无效端口映射 %q,hostPort %v", spec, hostErr)
		}
		if containerErr := validatePort(containerPort); containerErr != nil {
			return nil, nil, fmt.Errorf("无效端口映射 %q,containerPort %v", spec, containerErr)
		}

		// The key has the "<containerPort>/<protocol>" shape.
		key := nat.Port(containerPort + "/" + proto)
		exposed[key] = struct{}{}
		bindings[key] = append(bindings[key], nat.PortBinding{HostPort: hostPort})
	}

	return exposed, bindings, nil
}
// validatePort checks that port is a decimal integer in the range 1-65535.
//
// It returns an error when port is not an integer, carries an explicit "+"
// sign, or lies outside the valid range; otherwise it returns nil.
func validatePort(port string) error {
	p, err := strconv.Atoi(port)
	// strconv.Atoi tolerates a leading "+", but the original string (not the
	// parsed number) is what callers go on to use as the port, so "+80" must be
	// rejected here rather than slipping through as a valid port.
	if err != nil || strings.HasPrefix(port, "+") {
		return fmt.Errorf("不是合法整数端口: %s", port)
	}
	if p < 1 || p > 65535 {
		return fmt.Errorf("超出范围(1-65535): %d", p)
	}
	return nil
}
// Wait blocks until the container identified by containerID is no longer
// running, the wait call reports an error, or ctx is done, whichever happens
// first.
//
// On a normal exit it returns a ContainerStatus whose StatusCode and ExitCode
// both carry the status code reported by Docker, and whose Message holds the
// wait error message when Docker supplied one. A wait failure is returned as a
// wrapped error; a finished ctx yields ctx.Err().
func (d *DockerRuntime) Wait(ctx context.Context, containerID string) (*ContainerStatus, error) {
	waitCh, waitErrCh := d.client.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)

	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case waitErr := <-waitErrCh:
		return nil, fmt.Errorf("等待容器退出失败: %w", waitErr)
	case res := <-waitCh:
		code := int32(res.StatusCode)
		status := &ContainerStatus{
			StatusCode: code,
			ExitCode:   code,
		}
		if res.Error != nil {
			status.Message = res.Error.Message
		}
		return status, nil
	}
}
// Delete stops and removes the container identified by containerID.
//
// When timeoutSeconds is positive, a stop is attempted first with that timeout;
// a failure of the stop is only logged, and removal proceeds regardless. The
// container is then force removed. It returns a wrapped error when the removal
// fails and nil otherwise.
func (d *DockerRuntime) Delete(ctx context.Context, containerID string, timeoutSeconds int64) error {
	// Messages use an abbreviated ID. containerID comes from the caller and may
	// be shorter than 12 bytes, so guard the slice instead of panicking.
	shortID := containerID
	if len(shortID) > 12 {
		shortID = shortID[:12]
	}

	if timeoutSeconds > 0 {
		timeout := int(timeoutSeconds)
		stopOpts := container.StopOptions{Timeout: &timeout}
		if err := d.client.ContainerStop(ctx, containerID, stopOpts); err != nil {
			log.Printf("[docker] 优雅停止容器 %s 失败: %v,尝试强制删除", shortID, err)
		}
	}

	if err := d.client.ContainerRemove(ctx, containerID, container.RemoveOptions{Force: true}); err != nil {
		return fmt.Errorf("删除容器 %s 失败: %w", shortID, err)
	}
	log.Printf("[docker] 容器已删除: %s", shortID)
	return nil
}
// Close closes the underlying Docker client and returns whatever error that
// close reports.
func (d *DockerRuntime) Close() error {
	if err := d.client.Close(); err != nil {
		return err
	}
	return nil
}
// convertMounts translates runtime mount descriptions into Docker mount specs.
//
// Source and Target are copied through unchanged. The mount type is matched
// case-insensitively:
//   - "volume" and "tmpfs" map to the corresponding Docker mount types;
//   - "erofs" becomes a read-only bind mount;
//   - "bind" and any unrecognized type become a bind mount.
//
// The result always has one entry per input mount, in the same order.
func convertMounts(mounts []MountConfig) []mount.Mount {
	result := make([]mount.Mount, 0, len(mounts))
	for _, m := range mounts {
		dm := mount.Mount{
			Source: m.Source,
			Target: m.Target,
		}
		switch strings.ToLower(m.Type) {
		case "volume":
			dm.Type = mount.TypeVolume
		case "tmpfs":
			dm.Type = mount.TypeTmpfs
		case "erofs":
			// Handled inside the case-insensitive switch so that "EROFS" or
			// "Erofs" is also mounted read-only, like every other type name.
			dm.Type = mount.TypeBind
			dm.ReadOnly = true
		default:
			// Covers "bind" as well as unknown types.
			dm.Type = mount.TypeBind
		}
		result = append(result, dm)
	}
	return result
}