/*
 * Copyright (c) 2026 Huawei Technologies Co., Ltd.
 * openFuyao is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

// Package main starts the node-agent HTTP service.
package main

import (
	"log/slog"
	"net/http"
	"os"
	"strconv"
	"time"

	"github.com/openfuyao/weight-dispatcher/pkg/agent/httpapi"
	"github.com/openfuyao/weight-dispatcher/pkg/agent/service"
	"github.com/openfuyao/weight-dispatcher/pkg/cache"
	"github.com/openfuyao/weight-dispatcher/pkg/dataplane"
	"github.com/openfuyao/weight-dispatcher/pkg/dataplane/rdma"
	"github.com/openfuyao/weight-dispatcher/pkg/dataplane/rdmaffi"
	"github.com/openfuyao/weight-dispatcher/pkg/logging"
)

// nodeAgentConfig holds the node-agent process settings that are resolved
// from environment variables at startup.
type nodeAgentConfig struct {
	// Address the HTTP server listens on, and the name this agent reports.
	listenAddr, nodeName string
	// Collective service implementation selector and log verbosity name.
	collectiveMode, logLevel string
	// Data-plane transport switches used to pick the adapter options.
	rdmaEnabled, tcpFallbackEnabled bool
}

// main reads the environment configuration, installs the process-wide
// structured logger, builds the node-agent services and blocks serving HTTP
// on the configured listen address. If the server returns an error, it is
// logged and the process exits with status 1.
func main() {
	cfg := loadNodeAgentConfig()

	logger := logging.NewLogger(os.Stdout, logging.ParseLevel(cfg.logLevel))
	slog.SetDefault(logger)

	// The data-plane adapter is built before its configuration is logged;
	// keep this order so the rdmaffi probe calls happen after construction.
	states := cache.NewStateMachine(logger)
	tasks := service.NewMemoryTaskTracker()
	dataPlane := buildDataPlaneAdapter(logger, cfg.rdmaEnabled, cfg.tcpFallbackEnabled)
	logDataPlaneConfiguration(logger, cfg)
	cacheLoader := service.NewCacheLoadService(states, dataPlane, service.NewCRC32CVerifier(), tasks, logger)

	// Arguments are evaluated left to right, so the services are constructed
	// in the same order as before: warmup, source, then collective.
	handler := httpapi.NewServer(
		service.NewNodeAgentWarmupService(cfg.nodeName, tasks, cacheLoader, logger),
		service.NewNodeAgentSourceService(logger),
		newCollectiveService(cfg.collectiveMode, logger),
		logger,
	)

	logger.Info("starting node-agent", "listenAddr", cfg.listenAddr, "rdmaEnabled", cfg.rdmaEnabled, "tcpFallbackEnabled", cfg.tcpFallbackEnabled)
	srv := &http.Server{
		Addr:              cfg.listenAddr,
		Handler:           handler,
		ReadHeaderTimeout: 5 * time.Second,
		IdleTimeout:       30 * time.Second,
	}
	err := srv.ListenAndServe()
	if err == nil {
		return
	}
	logger.Error("start node-agent failed", "error", err)
	os.Exit(1)
}

// loadNodeAgentConfig loads one node-agent process config from environment
// variables. An unset or empty variable takes its default:
//
//   - LISTEN_ADDR: ":18080"
//   - NODE_NAME: ""
//   - COLLECTIVE_MODE: "memory"
//   - LOG_LEVEL: "info"
//   - RDMA_ENABLED: false
//   - TCP_FALLBACK_ENABLED: true
//
// Boolean variables accept the spellings understood by strconv.ParseBool
// ("1", "t", "T", "true", "TRUE", "True", "0", "f", ...). Any other non-empty
// value means disabled, as it did before.
func loadNodeAgentConfig() nodeAgentConfig {
	return nodeAgentConfig{
		listenAddr:         envOrDefault("LISTEN_ADDR", ":18080"),
		nodeName:           envOrDefault("NODE_NAME", ""),
		collectiveMode:     envOrDefault("COLLECTIVE_MODE", "memory"),
		logLevel:           envOrDefault("LOG_LEVEL", "info"),
		rdmaEnabled:        envBoolOrDefault("RDMA_ENABLED", false),
		tcpFallbackEnabled: envBoolOrDefault("TCP_FALLBACK_ENABLED", true),
	}
}

// envBoolOrDefault reads one boolean environment variable, returning fallback
// when it is unset or empty.
func envBoolOrDefault(key string, fallback bool) bool {
	raw := os.Getenv(key)
	if raw == "" {
		return fallback
	}
	enabled, err := strconv.ParseBool(raw)
	// Unparseable values keep the historical meaning: anything that is not a
	// recognised "true" spelling disables the feature.
	return err == nil && enabled
}

// logDataPlaneConfiguration reports the selected transport mode together with
// the codes returned by the rdmaffi probe calls made here.
//
// The message matches the adapter that buildDataPlaneAdapter builds for the
// same flags:
//   - RDMA enabled: RDMA-aware data plane.
//   - RDMA disabled, TCP fallback enabled: Rust direct-pull engine over TCP.
//   - Both disabled: FFI is switched off and the Go readat fallback is used.
//
// The rdmaffi calls are evaluated left to right as log arguments, so
// rdmaLastError is read after the preceding probes.
// NOTE(review): presumably Init/SelfTest/Prepare* have side effects; they are
// kept in their original order and for every mode.
func logDataPlaneConfiguration(logger *slog.Logger, config nodeAgentConfig) {
	if config.rdmaEnabled {
		// PrepareRing self-test: current Rust implementation supports world_size==2 only.
		logger.Info(
			"node-agent configured with RDMA-aware data plane",
			"tcpFallbackEnabled", config.tcpFallbackEnabled,
			"rdmaAvailable", rdmaffi.Available(),
			"rdmaInitCode", rdmaffi.Init(),
			"rdmaSelfTestCode", rdmaffi.SelfTest(),
			"rdmaDirectPrepareCode", rdmaffi.PrepareDirectPull(),
			"rdmaRingPrepareCode", rdmaffi.PrepareRing(2, 0),
			"rdmaLastError", rdmaffi.LastError(),
			"rdmaVersion", rdmaffi.Version(),
		)
		return
	}

	message := "node-agent configured with rust tcp fallback data plane"
	if !config.tcpFallbackEnabled {
		// With TCP fallback disabled the adapter is built with EnableFFI=false,
		// so claiming a Rust data plane here would be misleading.
		message = "node-agent configured with go readat fallback data plane"
	}
	logger.Info(
		message,
		"rdmaEnabled", config.rdmaEnabled,
		"tcpFallbackEnabled", config.tcpFallbackEnabled,
		"rdmaAvailable", rdmaffi.Available(),
		"rdmaInitCode", rdmaffi.Init(),
		"rdmaDirectPrepareCode", rdmaffi.PrepareDirectPull(),
		"rdmaLastError", rdmaffi.LastError(),
		"rdmaVersion", rdmaffi.Version(),
	)
}

// newCollectiveService creates the configured collective service implementation.
//
// Only the in-memory implementation exists. "memory" and the empty string
// select it directly; any other mode logs a warning and falls back to it.
// The "collective service configured" line always reports the mode actually
// in effect, never the unsupported or empty requested value.
func newCollectiveService(mode string, logger *slog.Logger) service.NodeAgentCollectiveService {
	const memoryMode = "memory"
	if mode != memoryMode && mode != "" {
		logger.Warn("unknown COLLECTIVE_MODE, falling back to memory", "mode", mode)
	}
	logger.Info("collective service configured", "mode", memoryMode)
	return service.NewMemoryCollectiveService(logger)
}

// buildDataPlaneAdapter returns the node-agent data-plane adapter for the
// given transport flags. Every mode uses an HTTP chunk client created with a
// nil argument; only the adapter options differ:
//
//	rdmaEnabled              -> EnableFFI=true,  ForceTCPFallback=false
//	tcpFallbackEnabled only  -> EnableFFI=true,  ForceTCPFallback=true
//	neither                  -> EnableFFI=false, ForceTCPFallback=true
func buildDataPlaneAdapter(logger *slog.Logger, rdmaEnabled, tcpFallbackEnabled bool) dataplane.Adapter {
	var opts rdma.AdapterOptions
	switch {
	case rdmaEnabled:
		opts = rdma.AdapterOptions{EnableFFI: true, ForceTCPFallback: false}
	case tcpFallbackEnabled:
		// Keep the Rust direct-pull engine enabled even when RDMA is disabled so
		// TCP transport can reuse the validated bulk data path instead of the
		// much slower Go JSON readat fallback.
		opts = rdma.AdapterOptions{EnableFFI: true, ForceTCPFallback: true}
	default:
		opts = rdma.AdapterOptions{EnableFFI: false, ForceTCPFallback: true}
	}
	return rdma.NewAdapterWithOptions(rdma.NewHTTPChunkClient(nil), logger, opts)
}

// envOrDefault looks up the environment variable named key and returns its
// value. It returns fallback when the variable is unset or set to the empty
// string. Whitespace-only values are returned as-is.
func envOrDefault(key, fallback string) string {
	value := os.Getenv(key)
	if value == "" {
		return fallback
	}
	return value
}