* Copyright (c) 2026 Huawei Technologies Co., Ltd.
* openFuyao is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
package main
import (
"flag"
"fmt"
"log/slog"
"os"
"strconv"
"strings"
"time"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
warmupv1alpha1 "github.com/openfuyao/weight-dispatcher/api/v1alpha1"
warmupjob "github.com/openfuyao/weight-dispatcher/pkg/controller/warmupjob"
"github.com/openfuyao/weight-dispatcher/pkg/logging"
"github.com/openfuyao/weight-dispatcher/pkg/node"
"github.com/openfuyao/weight-dispatcher/pkg/planning/sourceplacement"
"github.com/openfuyao/weight-dispatcher/pkg/planning/topology"
"github.com/openfuyao/weight-dispatcher/pkg/planning/transferplanner"
)
var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
)
// controllerManagerConfig holds the start-up settings that
// parseControllerManagerConfig gathers from command-line flags and
// environment variables.
type controllerManagerConfig struct {
// metricsAddr is the metrics endpoint bind address (--metrics-bind-address);
// the value "0" disables the metrics service.
metricsAddr string
// enableLeaderElection turns on manager leader election (--leader-elect).
enableLeaderElection bool
// probeAddr is the health/readiness probe bind address (--health-probe-bind-address).
probeAddr string
// logLevel comes from LOG_LEVEL (default "info") and is parsed by
// logging.ParseLevel when main configures the default slog logger.
logLevel string
// agentPort comes from AGENT_PORT (default 18080) and is passed to the
// warmup HTTP agent client.
agentPort int
}
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(warmupv1alpha1.AddToScheme(scheme))
}
// main is the controller-manager entry point. It parses configuration,
// installs the process-wide slog logger, builds the manager, and registers the
// ModelWarmupJob controller plus the healthz/readyz checks. It then runs the
// manager until the context from ctrl.SetupSignalHandler is cancelled.
// Any failure is logged through setupLog and the process exits with status 1.
func main() {
	config, err := parseControllerManagerConfig(flag.CommandLine, os.Args[1:])
	if err != nil {
		setupLog.Error(err, "parse controller manager flags")
		os.Exit(1)
	}
	// setupWarmupController hands slog.Default() to the reconciler, planners and
	// agent client, so the default logger must be configured before it runs.
	slog.SetDefault(logging.NewLogger(os.Stdout, logging.ParseLevel(config.logLevel)))

	mgr, err := newControllerManager(config)
	if err != nil {
		setupLog.Error(err, "unable to start manager")
		os.Exit(1)
	}
	if err := setupWarmupController(mgr, config); err != nil {
		setupLog.Error(err, "unable to create controller", "controller", "ModelWarmupJob")
		os.Exit(1)
	}
	// addHealthChecks registers both the healthz and the readyz check, so the
	// message must not single out the ready check.
	if err := addHealthChecks(mgr); err != nil {
		setupLog.Error(err, "unable to set up health checks")
		os.Exit(1)
	}

	setupLog.Info("starting manager")
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		setupLog.Error(err, "problem running manager")
		os.Exit(1)
	}
}
// parseControllerManagerConfig does the following, in order:
//   - registers the manager's flags and the zap logging flags on flags;
//   - parses args;
//   - installs the controller-runtime logger;
//   - reads the LOG_LEVEL and AGENT_PORT environment variables.
//
// The controller-runtime logger is installed before any error can be returned.
// Messages logged through ctrl.Log (including setupLog) before ctrl.SetLogger
// is called are discarded. Returning early would therefore make the caller's
// error report invisible: for example, a bad AGENT_PORT would exit silently.
// If flag parsing fails, the logger is built from whatever zap options were
// parsed up to that point (or their defaults).
//
// flag.CommandLine uses flag.ExitOnError, so for that FlagSet Parse exits the
// process itself on bad input. The parse-error branch only matters for
// ContinueOnError sets.
//
// On error the returned config is the zero value.
func parseControllerManagerConfig(flags *flag.FlagSet, args []string) (controllerManagerConfig, error) {
	var config controllerManagerConfig
	flags.StringVar(&config.metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+
		"Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.")
	flags.StringVar(&config.probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
	flags.BoolVar(&config.enableLeaderElection, "leader-elect", false,
		"Enable leader election for controller manager. "+
			"Enabling this will ensure there is only one active controller manager.")
	opts := zap.Options{Development: true}
	opts.BindFlags(flags)

	parseErr := flags.Parse(args)
	// Install the logger unconditionally so that any error returned below is
	// actually emitted by the caller's setupLog.Error.
	ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
	if parseErr != nil {
		return controllerManagerConfig{}, fmt.Errorf("parse flags: %w", parseErr)
	}

	config.logLevel = envOrDefault("LOG_LEVEL", "info")
	agentPort, err := envPortOrDefault("AGENT_PORT", 18080)
	if err != nil {
		return controllerManagerConfig{}, err
	}
	config.agentPort = agentPort
	return config, nil
}
// newControllerManager creates the controller-runtime manager described by
// config. It uses the package-level scheme and binds the metrics and
// health-probe endpoints to the configured addresses. When requested, it
// enables leader election under a fixed election ID.
//
// The REST config is loaded with ctrl.GetConfig rather than
// ctrl.GetConfigOrDie. A missing or invalid kubeconfig is then reported through
// the returned error, and so through main's structured logger, instead of
// terminating the process from inside this function.
func newControllerManager(config controllerManagerConfig) (ctrl.Manager, error) {
	restConfig, err := ctrl.GetConfig()
	if err != nil {
		return nil, fmt.Errorf("load kubernetes rest config: %w", err)
	}
	mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
		Scheme:                 scheme,
		Metrics:                metricsserver.Options{BindAddress: config.metricsAddr},
		HealthProbeBindAddress: config.probeAddr,
		LeaderElection:         config.enableLeaderElection,
		LeaderElectionID:       "76e5a98f.openfuyao.com",
	})
	if err != nil {
		return nil, fmt.Errorf("create controller manager: %w", err)
	}
	return mgr, nil
}
// setupWarmupController wires the ModelWarmupJob reconciler into mgr. The
// transfer-planning dispatcher is composed first: the default cache path
// builder, a no-op topology advisor and the heuristic source-placement planner.
// It is then combined with the manager's client/scheme, a node resolver and an
// HTTP agent client targeting config.agentPort. All components share
// slog.Default() as their logger.
func setupWarmupController(mgr ctrl.Manager, config controllerManagerConfig) error {
	const (
		// Duration argument given to NewHTTPAgentClient (presumably a
		// per-request timeout — confirm against the warmupjob package).
		agentClientTimeout = 10 * time.Second
		// Value assigned to Reconciler.RequeueAfter.
		requeueInterval = 5 * time.Second
	)
	logger := slog.Default()

	planner := transferplanner.NewDispatcher(
		transferplanner.NewDefaultCachePathBuilder(),
		topology.NewNoopAdvisor(logger),
		sourceplacement.NewHeuristicPlanner(logger),
		logger,
	)

	reconciler := &warmupjob.Reconciler{
		Client:       mgr.GetClient(),
		Scheme:       mgr.GetScheme(),
		Resolver:     node.NewResolver(mgr.GetClient()),
		Dispatcher:   planner,
		Agent:        warmupjob.NewHTTPAgentClient(config.agentPort, agentClientTimeout, logger),
		RequeueAfter: requeueInterval,
		Logger:       logger,
	}

	if setupErr := reconciler.SetupWithManager(mgr); setupErr != nil {
		return fmt.Errorf("setup warmup controller with manager: %w", setupErr)
	}
	return nil
}
// addHealthChecks registers a "healthz" check and then a "readyz" check on
// mgr. Both use healthz.Ping, which always reports success. The first
// registration error is wrapped and returned.
func addHealthChecks(mgr ctrl.Manager) error {
	healthzErr := mgr.AddHealthzCheck("healthz", healthz.Ping)
	if healthzErr != nil {
		return fmt.Errorf("add healthz check: %w", healthzErr)
	}

	readyzErr := mgr.AddReadyzCheck("readyz", healthz.Ping)
	if readyzErr != nil {
		return fmt.Errorf("add readyz check: %w", readyzErr)
	}

	return nil
}
// envOrDefault returns the value of the environment variable key. It returns
// fallback when the variable is unset or set to the empty string. A non-empty
// value is returned verbatim, with no whitespace trimming.
func envOrDefault(key, fallback string) string {
	current := os.Getenv(key)
	if current == "" {
		return fallback
	}
	return current
}
// envPortOrDefault reads the environment variable key as a TCP port number.
// Surrounding whitespace is trimmed first. When the trimmed value is empty
// (including an unset variable), fallback is returned unvalidated. Otherwise
// the value must be a decimal integer in [1, 65535], or an error naming key
// and quoting the trimmed value is returned together with 0.
func envPortOrDefault(key string, fallback int) (int, error) {
	trimmed := strings.TrimSpace(os.Getenv(key))
	if len(trimmed) == 0 {
		return fallback, nil
	}

	parsed, convErr := strconv.Atoi(trimmed)
	inRange := parsed >= 1 && parsed <= 65535
	if convErr != nil || !inRange {
		return 0, fmt.Errorf("%s must be a valid TCP port, got %q", key, trimmed)
	}
	return parsed, nil
}