* Copyright (c) 2026 Huawei Technologies Co., Ltd.
* openFuyao is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
package main
import (
	"log/slog"
	"net/http"
	"os"
	"strconv"
	"time"

	"github.com/openfuyao/weight-dispatcher/pkg/agent/httpapi"
	"github.com/openfuyao/weight-dispatcher/pkg/agent/service"
	"github.com/openfuyao/weight-dispatcher/pkg/cache"
	"github.com/openfuyao/weight-dispatcher/pkg/dataplane"
	"github.com/openfuyao/weight-dispatcher/pkg/dataplane/rdma"
	"github.com/openfuyao/weight-dispatcher/pkg/dataplane/rdmaffi"
	"github.com/openfuyao/weight-dispatcher/pkg/logging"
)
// nodeAgentConfig carries the process-level settings of the node-agent.
// Each field is populated by loadNodeAgentConfig from the environment
// variable named in its comment.
type nodeAgentConfig struct {
	// listenAddr is used as the HTTP server's Addr (LISTEN_ADDR).
	listenAddr string
	// nodeName is handed to the warmup service (NODE_NAME).
	nodeName string
	// collectiveMode selects the collective service in newCollectiveService
	// (COLLECTIVE_MODE).
	collectiveMode string
	// logLevel is the textual level given to logging.ParseLevel (LOG_LEVEL).
	logLevel string

	// rdmaEnabled picks the RDMA-aware adapter options (RDMA_ENABLED).
	rdmaEnabled bool
	// tcpFallbackEnabled controls the adapter options used when RDMA is
	// disabled (TCP_FALLBACK_ENABLED).
	tcpFallbackEnabled bool
}
// main assembles the node-agent and serves its HTTP API.
//
// It loads the configuration from the environment, installs the resulting
// logger as the slog default, builds the cache state machine, task tracker
// and data-plane adapter, logs the data-plane configuration, and then wires
// the warmup, source and collective services into the HTTP handler. Whatever
// error ListenAndServe returns is logged and the process exits with status 1.
func main() {
	cfg := loadNodeAgentConfig()

	logger := logging.NewLogger(os.Stdout, logging.ParseLevel(cfg.logLevel))
	slog.SetDefault(logger)

	// Core dependencies. The adapter is built before the data-plane
	// configuration is logged; that order is kept deliberately.
	states := cache.NewStateMachine(logger)
	tasks := service.NewMemoryTaskTracker()
	plane := buildDataPlaneAdapter(logger, cfg.rdmaEnabled, cfg.tcpFallbackEnabled)
	logDataPlaneConfiguration(logger, cfg)

	// Services exposed through the HTTP API.
	verifier := service.NewCRC32CVerifier()
	cacheLoader := service.NewCacheLoadService(states, plane, verifier, tasks, logger)
	warmup := service.NewNodeAgentWarmupService(cfg.nodeName, tasks, cacheLoader, logger)
	source := service.NewNodeAgentSourceService(logger)
	collective := newCollectiveService(cfg.collectiveMode, logger)
	api := httpapi.NewServer(warmup, source, collective, logger)

	logger.Info(
		"starting node-agent",
		"listenAddr", cfg.listenAddr,
		"rdmaEnabled", cfg.rdmaEnabled,
		"tcpFallbackEnabled", cfg.tcpFallbackEnabled,
	)

	srv := &http.Server{
		Addr:              cfg.listenAddr,
		Handler:           api,
		ReadHeaderTimeout: 5 * time.Second,
		IdleTimeout:       30 * time.Second,
	}

	err := srv.ListenAndServe()
	if err == nil {
		return
	}
	logger.Error("start node-agent failed", "error", err)
	os.Exit(1)
}
// loadNodeAgentConfig builds the node-agent configuration from environment
// variables, substituting a default for every variable that is unset or
// empty:
//
//	LISTEN_ADDR           ":18080"
//	NODE_NAME             ""
//	COLLECTIVE_MODE       "memory"
//	LOG_LEVEL             "info"
//	RDMA_ENABLED          false
//	TCP_FALLBACK_ENABLED  true
//
// The two boolean flags accept every spelling understood by
// strconv.ParseBool ("true", "TRUE", "True", "1", "t", ...), not only the
// exact lowercase string "true".
func loadNodeAgentConfig() nodeAgentConfig {
	return nodeAgentConfig{
		listenAddr:         envOrDefault("LISTEN_ADDR", ":18080"),
		nodeName:           envOrDefault("NODE_NAME", ""),
		collectiveMode:     envOrDefault("COLLECTIVE_MODE", "memory"),
		logLevel:           envOrDefault("LOG_LEVEL", "info"),
		rdmaEnabled:        envBoolOrDefault("RDMA_ENABLED", false),
		tcpFallbackEnabled: envBoolOrDefault("TCP_FALLBACK_ENABLED", true),
	}
}

// envBoolOrDefault reads the environment variable key as a boolean.
//
// It returns fallback when the variable is unset or empty, and the parsed
// value when strconv.ParseBool accepts it. Any other non-empty value yields
// false rather than fallback: that is how such values were treated when the
// flag was compared against the literal "true", so inputs like "off" or "no"
// keep disabling the feature.
func envBoolOrDefault(key string, fallback bool) bool {
	raw := os.Getenv(key)
	if raw == "" {
		return fallback
	}
	enabled, err := strconv.ParseBool(raw)
	if err != nil {
		return false
	}
	return enabled
}
// logDataPlaneConfiguration emits one Info record describing the data-plane
// setup, including the values returned by several rdmaffi calls.
//
// With RDMA enabled the record carries the results of Available, Init,
// SelfTest, PrepareDirectPull, PrepareRing(2, 0), LastError and Version.
// With RDMA disabled it carries Available, Init, PrepareDirectPull, LastError
// and Version, plus the rdmaEnabled flag.
//
// NOTE(review): the rdmaffi calls are made in the listed order because
// LastError is read after the earlier calls and presumably reflects them;
// confirm before reordering.
func logDataPlaneConfiguration(logger *slog.Logger, config nodeAgentConfig) {
	// Both variants start with the same two calls.
	available := rdmaffi.Available()
	initCode := rdmaffi.Init()

	if !config.rdmaEnabled {
		directPrepareCode := rdmaffi.PrepareDirectPull()
		lastError := rdmaffi.LastError()
		version := rdmaffi.Version()

		logger.Info(
			"node-agent configured with rust tcp fallback data plane",
			"rdmaEnabled", config.rdmaEnabled,
			"tcpFallbackEnabled", config.tcpFallbackEnabled,
			"rdmaAvailable", available,
			"rdmaInitCode", initCode,
			"rdmaDirectPrepareCode", directPrepareCode,
			"rdmaLastError", lastError,
			"rdmaVersion", version,
		)
		return
	}

	selfTestCode := rdmaffi.SelfTest()
	directPrepareCode := rdmaffi.PrepareDirectPull()
	ringPrepareCode := rdmaffi.PrepareRing(2, 0)
	lastError := rdmaffi.LastError()
	version := rdmaffi.Version()

	logger.Info(
		"node-agent configured with RDMA-aware data plane",
		"tcpFallbackEnabled", config.tcpFallbackEnabled,
		"rdmaAvailable", available,
		"rdmaInitCode", initCode,
		"rdmaSelfTestCode", selfTestCode,
		"rdmaDirectPrepareCode", directPrepareCode,
		"rdmaRingPrepareCode", ringPrepareCode,
		"rdmaLastError", lastError,
		"rdmaVersion", version,
	)
}
// newCollectiveService returns the collective service for the given mode.
//
// Every mode currently resolves to the in-memory implementation. A mode other
// than "memory" or the empty string additionally logs a warning before the
// usual "collective service configured" record, which reports the mode
// exactly as it was passed in.
func newCollectiveService(mode string, logger *slog.Logger) service.NodeAgentCollectiveService {
	recognised := mode == "memory" || mode == ""
	if !recognised {
		logger.Warn("unknown COLLECTIVE_MODE, falling back to memory", "mode", mode)
	}

	logger.Info("collective service configured", "mode", mode)
	return service.NewMemoryCollectiveService(logger)
}
// buildDataPlaneAdapter constructs the rdma adapter over an HTTP chunk client,
// deriving the adapter options from the two feature flags:
//
//	rdmaEnabled  tcpFallbackEnabled  EnableFFI  ForceTCPFallback
//	true         (any)               true       false
//	false        true                true       true
//	false        false               false      true
func buildDataPlaneAdapter(logger *slog.Logger, rdmaEnabled, tcpFallbackEnabled bool) dataplane.Adapter {
	opts := rdma.AdapterOptions{
		// FFI is switched off only when both flags are off.
		EnableFFI: rdmaEnabled || tcpFallbackEnabled,
		// TCP fallback is forced whenever RDMA is not enabled.
		ForceTCPFallback: !rdmaEnabled,
	}
	return rdma.NewAdapterWithOptions(rdma.NewHTTPChunkClient(nil), logger, opts)
}
// envOrDefault returns the value of the environment variable key, or fallback
// when the variable is unset or set to the empty string.
func envOrDefault(key, fallback string) string {
	value, found := os.LookupEnv(key)
	if !found || value == "" {
		return fallback
	}
	return value
}