/*
 * Copyright (c) 2026 Huawei Technologies Co., Ltd.
 * openFuyao is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package cache

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"path/filepath"
	"strings"
	"sync"

	"github.com/openfuyao/weight-dispatcher/pkg/internal/errutil"
)

// Phase describes the lifecycle state of one cache entry.
type Phase string

const (
	// PhaseEmpty indicates no cache entry has been created yet.
	PhaseEmpty Phase = "EMPTY"
	// PhaseLoading indicates one loader currently owns the staging path.
	PhaseLoading Phase = "LOADING"
	// PhaseReady indicates the artifact is published at the ready path.
	PhaseReady Phase = "READY"
	// PhaseInvalid indicates the load failed and the staging path was cleaned.
	PhaseInvalid Phase = "INVALID"
)

// AcquireResult reports whether the caller owns or joined one cache load.
type AcquireResult struct {
	Phase       Phase
	Shared      bool
	StagingPath string
	ReadyPath   string
}

type entry struct {
	phase   Phase
	err     error
	waitCh  chan struct{}
	closing sync.Once
}

type cachePaths struct {
	key     string
	ready   string
	staging string
}

const (
	internalDirPerm = 0o750
)

// StateMachine coordinates staging and ready paths for one cache root.
type StateMachine struct {
	logger *slog.Logger

	mu      sync.Mutex
	entries map[string]*entry
}

// NewStateMachine creates a cache state machine that coordinates explicit target paths.
func NewStateMachine(logger *slog.Logger) *StateMachine {
	if logger == nil {
		logger = slog.Default()
	}
	return &StateMachine{
		logger:  logger,
		entries: map[string]*entry{},
	}
}

// AcquireOrJoinTarget acquires or joins a load that publishes to an explicit target path.
func (s *StateMachine) AcquireOrJoinTarget(artifactKey, readyPath string) (AcquireResult, error) {
	paths, err := targetCachePaths(artifactKey, readyPath)
	if err != nil {
		return AcquireResult{}, err
	}
	return s.acquireOrJoinPaths(paths, artifactKey)
}

func (s *StateMachine) acquireOrJoinPaths(paths cachePaths, artifactKey string) (AcquireResult, error) {
	if dirExists(paths.ready) || fileExists(paths.ready) {
		s.logger.Debug("ready cache hit", "artifactKey", artifactKey, "readyPath", paths.ready)
		return AcquireResult{Phase: PhaseReady, ReadyPath: paths.ready}, nil
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	if dirExists(paths.ready) || fileExists(paths.ready) {
		s.logger.Debug("ready cache hit after lease check", "artifactKey", artifactKey, "readyPath", paths.ready)
		return AcquireResult{Phase: PhaseReady, ReadyPath: paths.ready}, nil
	}

	if current, ok := s.entries[paths.key]; ok && current.phase == PhaseLoading {
		s.logger.Debug("join existing cache load", "artifactKey", artifactKey, "stagingPath", paths.staging)
		return AcquireResult{
			Phase:     PhaseLoading,
			Shared:    true,
			ReadyPath: paths.ready,
		}, nil
	}

	if err := os.MkdirAll(filepath.Dir(paths.staging), internalDirPerm); err != nil {
		s.logger.Error("create staging parent failed", "artifactKey", artifactKey, "stagingPath", paths.staging, "error", err)
		return AcquireResult{}, fmt.Errorf("create staging parent: %w", err)
	}
	if err := os.RemoveAll(paths.staging); err != nil {
		s.logger.Error("reset staging path failed", "artifactKey", artifactKey, "stagingPath", paths.staging, "error", err)
		return AcquireResult{}, fmt.Errorf("reset staging path: %w", err)
	}
	if err := os.MkdirAll(paths.staging, internalDirPerm); err != nil {
		s.logger.Error("create staging path failed", "artifactKey", artifactKey, "stagingPath", paths.staging, "error", err)
		return AcquireResult{}, fmt.Errorf("create staging path: %w", err)
	}

	s.entries[paths.key] = &entry{
		phase:  PhaseLoading,
		waitCh: make(chan struct{}),
	}

	s.logger.Debug("acquired cache load lease", "artifactKey", artifactKey, "stagingPath", paths.staging, "readyPath", paths.ready)
	return AcquireResult{
		Phase:       PhaseLoading,
		Shared:      false,
		StagingPath: paths.staging,
		ReadyPath:   paths.ready,
	}, nil
}

// WaitUntilReadyTarget waits for a target-authoritative cache load to become ready.
func (s *StateMachine) WaitUntilReadyTarget(ctx context.Context, artifactKey, readyPath string) (string, error) {
	paths, err := targetCachePaths(artifactKey, readyPath)
	if err != nil {
		return "", err
	}
	return s.waitUntilReadyPaths(ctx, paths, artifactKey)
}

func (s *StateMachine) waitUntilReadyPaths(ctx context.Context, paths cachePaths, _ string) (string, error) {
	if dirExists(paths.ready) || fileExists(paths.ready) {
		return paths.ready, nil
	}

	s.mu.Lock()
	current, ok := s.entries[paths.key]
	s.mu.Unlock()
	if !ok {
		s.logger.Error("wait requested without active loading entry", "artifactKey", paths.key, "readyPath", paths.ready)
		return "", errors.New("no active loading entry found")
	}

	select {
	case <-ctx.Done():
		return "", errutil.Wrap("wait for cache ready canceled", ctx.Err())
	case <-current.waitCh:
		if current.phase == PhaseReady || dirExists(paths.ready) || fileExists(paths.ready) {
			return paths.ready, nil
		}
		if current.err != nil {
			return "", current.err
		}
		return "", errors.New("cache loading finished without ready artifact")
	}
}

// PublishReadyTarget promotes the staging path to an explicit ready path.
func (s *StateMachine) PublishReadyTarget(artifactKey, readyPath string) (string, error) {
	paths, err := targetCachePaths(artifactKey, readyPath)
	if err != nil {
		return "", err
	}
	return s.publishReadyPaths(paths, artifactKey, false)
}

// PublishReadyKeepingStagingTarget publishes to an explicit ready path and retains staging.
func (s *StateMachine) PublishReadyKeepingStagingTarget(artifactKey, readyPath string) (string, error) {
	paths, err := targetCachePaths(artifactKey, readyPath)
	if err != nil {
		return "", err
	}
	return s.publishReadyPaths(paths, artifactKey, true)
}

func (s *StateMachine) publishReadyPaths(paths cachePaths, artifactKey string, keepStaging bool) (string, error) {
	s.mu.Lock()
	current, ok := s.entries[paths.key]
	s.mu.Unlock()
	if !ok {
		s.logger.Error("publish requested without loading state", "artifactKey", artifactKey, "readyPath", paths.ready)
		return "", errors.New("publish called without loading state")
	}

	if err := os.MkdirAll(filepath.Dir(paths.ready), internalDirPerm); err != nil {
		return "", s.failLoadingEntry(paths, artifactKey, current, errutil.Wrap("create ready parent", err))
	}
	if err := os.RemoveAll(paths.ready); err != nil {
		return "", s.failLoadingEntry(paths, artifactKey, current, errutil.Wrap("reset ready path", err))
	}
	if keepStaging {
		if err := cloneTreeLinkOrCopy(paths.staging, paths.ready); err != nil {
			return "", s.failLoadingEntry(paths, artifactKey, current, errutil.Wrap("publish ready cache without removing staging", err))
		}
	} else if err := os.Rename(paths.staging, paths.ready); err != nil {
		return "", s.failLoadingEntry(paths, artifactKey, current, errutil.Wrap("publish ready cache", err))
	}

	s.mu.Lock()
	current.phase = PhaseReady
	delete(s.entries, paths.key)
	s.mu.Unlock()
	current.closing.Do(func() { close(current.waitCh) })

	if keepStaging {
		s.logger.Debug("published ready cache while keeping staging", "artifactKey", artifactKey, "readyPath", paths.ready, "stagingPath", paths.staging)
	} else {
		s.logger.Debug("published ready cache", "artifactKey", artifactKey, "readyPath", paths.ready)
	}
	return paths.ready, nil
}

// PublishFailedTarget marks a target-authoritative artifact load as failed.
func (s *StateMachine) PublishFailedTarget(artifactKey, readyPath string, cause error) error {
	paths, err := targetCachePaths(artifactKey, readyPath)
	if err != nil {
		return err
	}
	return s.publishFailedPaths(paths, artifactKey, cause)
}

func (s *StateMachine) publishFailedPaths(paths cachePaths, artifactKey string, cause error) error {
	s.mu.Lock()
	current, ok := s.entries[paths.key]
	if ok {
		current.phase = PhaseInvalid
		current.err = cause
		delete(s.entries, paths.key)
	}
	s.mu.Unlock()

	var cleanupErr error
	if err := os.RemoveAll(paths.staging); err != nil {
		cleanupErr = errutil.Wrap("cleanup staging path", err)
		s.logger.Error("cleanup staging path failed after load failure", "artifactKey", artifactKey, "stagingPath", paths.staging, "cause", cause, "error", cleanupErr)
	}
	if ok {
		current.closing.Do(func() { close(current.waitCh) })
	}
	if cleanupErr != nil {
		return cleanupErr
	}

	s.logger.Debug("cache load failed and staging was cleaned", "artifactKey", artifactKey, "error", cause)
	return nil
}

// CleanupStagingTarget removes the explicit target staging path after publish.
func (s *StateMachine) CleanupStagingTarget(artifactKey, readyPath string) error {
	paths, err := targetCachePaths(artifactKey, readyPath)
	if err != nil {
		return err
	}
	if err := os.RemoveAll(paths.staging); err != nil {
		s.logger.Error("cleanup collective staging failed", "artifactKey", artifactKey, "stagingPath", paths.staging, "error", err)
		return fmt.Errorf("cleanup staging path: %w", err)
	}
	s.logger.Debug("collective staging cleaned", "artifactKey", artifactKey, "stagingPath", paths.staging)
	return nil
}

func (s *StateMachine) failLoadingEntry(paths cachePaths, artifactKey string, current *entry, cause error) error {
	s.mu.Lock()
	current.phase = PhaseInvalid
	current.err = cause
	delete(s.entries, paths.key)
	s.mu.Unlock()
	current.closing.Do(func() { close(current.waitCh) })
	s.logger.Error("cache load publishing failed", "artifactKey", artifactKey, "readyPath", paths.ready, "stagingPath", paths.staging, "error", cause)
	return cause
}

func sanitizeKey(input string) string {
	replacer := strings.NewReplacer("/", "_", "\\", "_", ":", "_", " ", "_")
	return replacer.Replace(input)
}

func targetCachePaths(artifactKey, readyPath string) (cachePaths, error) {
	if artifactKey == "" {
		return cachePaths{}, errors.New("artifactKey is required")
	}
	if strings.TrimSpace(readyPath) == "" {
		return cachePaths{}, errors.New("target ready path is required")
	}
	cleanReady := filepath.Clean(readyPath)
	return cachePaths{
		key:     sanitizeKey(artifactKey) + "\x00" + cleanReady,
		ready:   cleanReady,
		staging: StagingPathForReadyPath(cleanReady),
	}, nil
}

// StagingPathForReadyPath returns the sibling .staging path for one ready path.
func StagingPathForReadyPath(readyPath string) string {
	cleanReady := filepath.Clean(readyPath)
	return filepath.Join(filepath.Dir(cleanReady), ".staging", filepath.Base(cleanReady))
}

func dirExists(path string) bool {
	info, err := os.Stat(path)
	return err == nil && info.IsDir()
}

func fileExists(path string) bool {
	info, err := os.Stat(path)
	return err == nil && !info.IsDir()
}

func cloneTreeLinkOrCopy(srcRoot, dstRoot string) error {
	//nolint:gosec // The cache state machine only walks its own staging tree; switching to os.Root is a larger refactor.
	return errutil.Wrap("clone tree link or copy", filepath.Walk(srcRoot, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return errutil.Wrap("walk source tree", err)
		}
		rel, err := filepath.Rel(srcRoot, path)
		if err != nil {
			return errutil.Wrap("compute relative clone path", err)
		}
		dstPath := filepath.Join(dstRoot, rel)
		if info.IsDir() {
			return errutil.Wrap("create clone directory", os.MkdirAll(dstPath, info.Mode().Perm()))
		}
		if isFanoutMarkerPath(rel) {
			return nil
		}
		if err := os.MkdirAll(filepath.Dir(dstPath), internalDirPerm); err != nil {
			return errutil.Wrap("create clone parent directory", err)
		}
		if err := os.Link(path, dstPath); err == nil {
			return nil
		}
		return copyRegularFile(path, dstPath, info.Mode().Perm())
	}))
}

func isFanoutMarkerPath(path string) bool {
	return strings.HasSuffix(path, ".wdfanout.ready") || strings.HasSuffix(path, ".wdfanout.done")
}

func copyRegularFile(srcPath, dstPath string, perm os.FileMode) error {
	return withOpenFile(srcPath, os.O_RDONLY, 0, func(src *os.File) error {
		return withOpenFile(dstPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, perm, func(dst *os.File) error {
			if _, err := io.Copy(dst, src); err != nil {
				return errutil.Wrap("copy regular file", err)
			}
			return errutil.Wrap("sync target file", dst.Sync())
		})
	})
}

func withOpenFile(path string, flag int, perm os.FileMode, fn func(*os.File) error) error {
	file, err := os.OpenFile(path, flag, perm)
	if err != nil {
		return errutil.Wrap("open file", err)
	}

	resultErr := fn(file)

	closeErr := file.Close()
	if closeErr != nil {
		resultErr = errutil.MergeClose(resultErr, closeErr, fmt.Sprintf("close file %s", path))
	}
	return resultErr
}