* Copyright (c) 2026 Huawei Technologies Co., Ltd.
* openFuyao is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
package cache
import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"strings"
"sync"
"github.com/openfuyao/weight-dispatcher/pkg/internal/errutil"
)
// Phase is the lifecycle state of a cached artifact as reported by StateMachine.
type Phase string
const (
// PhaseEmpty is declared alongside the other phases but is not assigned by the
// StateMachine methods next to it. NOTE(review): confirm whether other code produces it.
PhaseEmpty Phase = "EMPTY"
// PhaseLoading marks a target whose staging path is being populated by the
// caller holding the load lease (or that a caller joined as a waiter).
PhaseLoading Phase = "LOADING"
// PhaseReady marks a target whose ready path exists / has been published.
PhaseReady Phase = "READY"
// PhaseInvalid marks a load that failed; the cause is recorded on the entry
// and handed to waiters.
PhaseInvalid Phase = "INVALID"
)
// AcquireResult describes the outcome of AcquireOrJoinTarget.
type AcquireResult struct {
// Phase is PhaseReady when the ready path already exists, otherwise PhaseLoading.
Phase Phase
// Shared is true when the caller joined a load that another caller is
// already performing; such a caller presumably waits via WaitUntilReadyTarget
// rather than writing to staging.
Shared bool
// StagingPath is the freshly reset, empty directory to populate. It is set only
// for the caller granted the load lease (Phase == PhaseLoading, Shared == false).
StagingPath string
// ReadyPath is the cleaned location the artifact is (or will be) published to.
ReadyPath string
}
// entry tracks one in-flight load. Entries stay in StateMachine.entries only
// while loading; they are deleted when the load is published or fails.
type entry struct {
// phase and err are written while holding StateMachine.mu, and waiters read
// them only after waitCh has been closed.
phase Phase
err error
// waitCh is closed when the load finishes (successfully or not) to wake all
// waiters; closing guarantees it is closed at most once.
waitCh chan struct{}
closing sync.Once
}
// cachePaths holds the resolved locations for one artifact target.
type cachePaths struct {
// key identifies the target in StateMachine.entries: the sanitized artifact
// key, a NUL byte, and the cleaned ready path.
key string
// ready is the cleaned final location of the published artifact.
ready string
// staging is the scratch location filled by the lease holder before publish;
// it is "<dir of ready>/.staging/<base of ready>".
staging string
}
const (
// internalDirPerm is the permission used for directories this package creates
// itself (staging roots, staging/ready parents, clone parents); the process
// umask still applies.
internalDirPerm = 0o750
)
// StateMachine coordinates loads of cached artifacts so that, per target, one
// caller populates a staging directory while concurrent callers for the same
// target join and wait for the result. Its methods access entries through mu.
type StateMachine struct {
logger *slog.Logger
// mu guards entries and the phase/err fields of the entries it holds.
mu sync.Mutex
// entries maps cachePaths.key to the in-flight load for that target.
entries map[string]*entry
}
// NewStateMachine returns a StateMachine with no in-flight loads. A nil logger
// is replaced by slog.Default().
func NewStateMachine(logger *slog.Logger) *StateMachine {
	sm := &StateMachine{
		logger:  logger,
		entries: make(map[string]*entry),
	}
	if sm.logger == nil {
		sm.logger = slog.Default()
	}
	return sm
}
// AcquireOrJoinTarget resolves the cache paths for readyPath and then either
// reports a ready hit, joins a load already in progress for the same target,
// or grants the caller the load lease with a fresh staging directory.
// Invalid arguments (empty artifactKey or readyPath) are returned as errors.
func (s *StateMachine) AcquireOrJoinTarget(artifactKey, readyPath string) (AcquireResult, error) {
	target, pathErr := targetCachePaths(artifactKey, readyPath)
	if pathErr != nil {
		return AcquireResult{}, pathErr
	}
	return s.acquireOrJoinPaths(target, artifactKey)
}
// acquireOrJoinPaths implements AcquireOrJoinTarget for already-resolved paths.
//
// Outcomes:
//   - something exists at paths.ready: PhaseReady, no lease;
//   - a load for paths.key is in progress: PhaseLoading, Shared=true;
//   - otherwise staging is reset to an empty directory, a loading entry is
//     registered, and the caller gets the lease (Shared=false, StagingPath set).
//
// The ready check is repeated while holding s.mu so that a publish finishing
// between the unlocked probe and taking the lock is still reported as a hit.
// Staging preparation runs under s.mu as well.
func (s *StateMachine) acquireOrJoinPaths(paths cachePaths, artifactKey string) (AcquireResult, error) {
	readyHit := AcquireResult{Phase: PhaseReady, ReadyPath: paths.ready}
	if dirExists(paths.ready) || fileExists(paths.ready) {
		s.logger.Debug("ready cache hit", "artifactKey", artifactKey, "readyPath", paths.ready)
		return readyHit, nil
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	if dirExists(paths.ready) || fileExists(paths.ready) {
		s.logger.Debug("ready cache hit after lease check", "artifactKey", artifactKey, "readyPath", paths.ready)
		return readyHit, nil
	}
	if inflight, found := s.entries[paths.key]; found && inflight.phase == PhaseLoading {
		s.logger.Debug("join existing cache load", "artifactKey", artifactKey, "stagingPath", paths.staging)
		return AcquireResult{Phase: PhaseLoading, Shared: true, ReadyPath: paths.ready}, nil
	}

	// Ensure the staging parent exists, wipe any leftover staging content, and
	// recreate staging empty. Each step logs and wraps its own failure.
	prep := [...]struct {
		logMsg string
		errFmt string
		op     func() error
	}{
		{"create staging parent failed", "create staging parent: %w", func() error { return os.MkdirAll(filepath.Dir(paths.staging), internalDirPerm) }},
		{"reset staging path failed", "reset staging path: %w", func() error { return os.RemoveAll(paths.staging) }},
		{"create staging path failed", "create staging path: %w", func() error { return os.MkdirAll(paths.staging, internalDirPerm) }},
	}
	for _, step := range prep {
		if err := step.op(); err != nil {
			s.logger.Error(step.logMsg, "artifactKey", artifactKey, "stagingPath", paths.staging, "error", err)
			return AcquireResult{}, fmt.Errorf(step.errFmt, err)
		}
	}

	s.entries[paths.key] = &entry{phase: PhaseLoading, waitCh: make(chan struct{})}
	s.logger.Debug("acquired cache load lease", "artifactKey", artifactKey, "stagingPath", paths.staging, "readyPath", paths.ready)
	return AcquireResult{Phase: PhaseLoading, StagingPath: paths.staging, ReadyPath: paths.ready}, nil
}
// WaitUntilReadyTarget blocks until the load for readyPath finishes or ctx is
// done. It returns the ready path on success, or the load's failure, a
// cancellation error, or an error when there is neither a ready artifact nor
// an in-flight load to wait for.
func (s *StateMachine) WaitUntilReadyTarget(ctx context.Context, artifactKey, readyPath string) (string, error) {
	target, pathErr := targetCachePaths(artifactKey, readyPath)
	if pathErr != nil {
		return "", pathErr
	}
	return s.waitUntilReadyPaths(ctx, target, artifactKey)
}
// waitUntilReadyPaths implements WaitUntilReadyTarget for resolved paths.
//
// It returns paths.ready at once if something already exists there. Otherwise
// it waits on the in-flight entry for paths.key until that entry's waitCh is
// closed or ctx is done. After the load finishes it returns paths.ready if the
// load was published (or the ready path now exists), the recorded cause if
// the load failed, and a generic error otherwise. With neither a ready path
// nor an in-flight entry it returns an error instead of blocking.
func (s *StateMachine) waitUntilReadyPaths(ctx context.Context, paths cachePaths, artifactKey string) (string, error) {
	if dirExists(paths.ready) || fileExists(paths.ready) {
		return paths.ready, nil
	}
	s.mu.Lock()
	current, ok := s.entries[paths.key]
	s.mu.Unlock()
	if !ok {
		// A publish renames staging onto paths.ready before it deletes the
		// entry, so it may have completed between the probe above and the
		// lookup. Re-check before reporting that nothing is loading.
		if dirExists(paths.ready) || fileExists(paths.ready) {
			return paths.ready, nil
		}
		s.logger.Error("wait requested without active loading entry", "artifactKey", artifactKey, "readyPath", paths.ready)
		return "", errors.New("no active loading entry found")
	}
	select {
	case <-ctx.Done():
		return "", errutil.Wrap("wait for cache ready canceled", ctx.Err())
	case <-current.waitCh:
		// phase/err were written under s.mu before waitCh was closed, so the
		// close makes them visible here without re-locking.
		if current.phase == PhaseReady || dirExists(paths.ready) || fileExists(paths.ready) {
			return paths.ready, nil
		}
		if current.err != nil {
			return "", current.err
		}
		return "", errors.New("cache loading finished without ready artifact")
	}
}
// PublishReadyTarget moves the staged artifact for readyPath into place
// (staging is consumed) and wakes every waiter. It requires an in-flight load
// for the target and returns the ready path on success.
func (s *StateMachine) PublishReadyTarget(artifactKey, readyPath string) (string, error) {
	const keepStaging = false
	target, pathErr := targetCachePaths(artifactKey, readyPath)
	if pathErr != nil {
		return "", pathErr
	}
	return s.publishReadyPaths(target, artifactKey, keepStaging)
}
// PublishReadyKeepingStagingTarget publishes the staged artifact for
// readyPath like PublishReadyTarget, but leaves the staging tree in place by
// publishing a hard-link (or copy) clone of it instead of moving it.
func (s *StateMachine) PublishReadyKeepingStagingTarget(artifactKey, readyPath string) (string, error) {
	const keepStaging = true
	target, pathErr := targetCachePaths(artifactKey, readyPath)
	if pathErr != nil {
		return "", pathErr
	}
	return s.publishReadyPaths(target, artifactKey, keepStaging)
}
// publishReadyPaths implements PublishReadyTarget (keepStaging=false) and
// PublishReadyKeepingStagingTarget (keepStaging=true).
//
// It requires an in-flight entry for paths.key. With keepStaging=false the
// staging tree is renamed onto paths.ready. With keepStaging=true the staging
// tree is left untouched. It is cloned (hard links, falling back to copies)
// into a private scratch directory created beside staging, and that clone is
// then renamed onto paths.ready. Publishing through a single rename means
// callers that probe paths.ready without the lock never see a half-cloned
// tree. It also means a failed clone leaves nothing at paths.ready that a
// later acquire would mistake for a ready cache.
//
// On success the entry is marked PhaseReady, removed from s.entries, and its
// waiters are woken. Any filesystem failure fails the entry via
// failLoadingEntry, and that error is returned.
func (s *StateMachine) publishReadyPaths(paths cachePaths, artifactKey string, keepStaging bool) (string, error) {
	s.mu.Lock()
	current, ok := s.entries[paths.key]
	s.mu.Unlock()
	if !ok {
		s.logger.Error("publish requested without loading state", "artifactKey", artifactKey, "readyPath", paths.ready)
		return "", errors.New("publish called without loading state")
	}
	fail := func(what string, err error) (string, error) {
		return "", s.failLoadingEntry(paths, artifactKey, current, errutil.Wrap(what, err))
	}

	if err := os.MkdirAll(filepath.Dir(paths.ready), internalDirPerm); err != nil {
		return fail("create ready parent", err)
	}

	source, publishWhat := paths.staging, "publish ready cache"
	if keepStaging {
		publishWhat = "publish ready cache without removing staging"
		// The scratch directory shares staging's parent, so hard links from
		// staging and the final rename stay within the same directory tree the
		// keepStaging=false rename already relies on.
		scratch, mkErr := os.MkdirTemp(filepath.Dir(paths.staging), ".publish-")
		if mkErr != nil {
			return fail("create publish clone directory", mkErr)
		}
		defer func() {
			// Empty after a successful rename; holds the partial clone after a
			// failure. Either way it is no longer needed.
			if rmErr := os.RemoveAll(scratch); rmErr != nil {
				s.logger.Warn("cleanup publish clone directory failed", "artifactKey", artifactKey, "path", scratch, "error", rmErr)
			}
		}()
		source = filepath.Join(scratch, filepath.Base(paths.ready))
		if cloneErr := cloneTreeLinkOrCopy(paths.staging, source); cloneErr != nil {
			return fail(publishWhat, cloneErr)
		}
	}

	if err := os.RemoveAll(paths.ready); err != nil {
		return fail("reset ready path", err)
	}
	if err := os.Rename(source, paths.ready); err != nil {
		return fail(publishWhat, err)
	}

	s.mu.Lock()
	current.phase = PhaseReady
	delete(s.entries, paths.key)
	s.mu.Unlock()
	current.closing.Do(func() { close(current.waitCh) })
	if keepStaging {
		s.logger.Debug("published ready cache while keeping staging", "artifactKey", artifactKey, "readyPath", paths.ready, "stagingPath", paths.staging)
	} else {
		s.logger.Debug("published ready cache", "artifactKey", artifactKey, "readyPath", paths.ready)
	}
	return paths.ready, nil
}
// PublishFailedTarget records cause as the failure of the load for readyPath,
// removes its staging path, and wakes any waiters, which then receive cause.
// The returned error reports only argument or staging-cleanup problems.
func (s *StateMachine) PublishFailedTarget(artifactKey, readyPath string, cause error) error {
	target, pathErr := targetCachePaths(artifactKey, readyPath)
	if pathErr != nil {
		return pathErr
	}
	return s.publishFailedPaths(target, artifactKey, cause)
}
// publishFailedPaths implements PublishFailedTarget for resolved paths.
//
// It removes the staging path and, if a load is in flight for paths.key, marks
// that entry PhaseInvalid with cause, removes it, and wakes its waiters. It
// returns a wrapped error only when staging cleanup fails; cause itself is not
// returned.
func (s *StateMachine) publishFailedPaths(paths cachePaths, artifactKey string, cause error) error {
	s.mu.Lock()
	// Clean staging and retire the entry in one critical section.
	// acquireOrJoinPaths prepares staging under the same lock. If staging were
	// removed after unlocking, a new lease for this target could be granted in
	// between, and its freshly created staging directory would be deleted here.
	var cleanupErr error
	if err := os.RemoveAll(paths.staging); err != nil {
		cleanupErr = errutil.Wrap("cleanup staging path", err)
	}
	current, ok := s.entries[paths.key]
	if ok {
		current.phase = PhaseInvalid
		current.err = cause
		delete(s.entries, paths.key)
	}
	s.mu.Unlock()

	if cleanupErr != nil {
		s.logger.Error("cleanup staging path failed after load failure", "artifactKey", artifactKey, "stagingPath", paths.staging, "cause", cause, "error", cleanupErr)
	}
	if ok {
		current.closing.Do(func() { close(current.waitCh) })
	}
	if cleanupErr != nil {
		return cleanupErr
	}
	s.logger.Debug("cache load failed and staging was cleaned", "artifactKey", artifactKey, "error", cause)
	return nil
}
// CleanupStagingTarget removes the staging path derived from readyPath. It does
// not consult or modify the in-flight entries. NOTE(review): presumably called
// once a staging tree kept by PublishReadyKeepingStagingTarget is no longer
// needed; confirm with callers.
func (s *StateMachine) CleanupStagingTarget(artifactKey, readyPath string) error {
	target, pathErr := targetCachePaths(artifactKey, readyPath)
	if pathErr != nil {
		return pathErr
	}
	staging := target.staging
	if rmErr := os.RemoveAll(staging); rmErr != nil {
		s.logger.Error("cleanup collective staging failed", "artifactKey", artifactKey, "stagingPath", staging, "error", rmErr)
		return fmt.Errorf("cleanup staging path: %w", rmErr)
	}
	s.logger.Debug("collective staging cleaned", "artifactKey", artifactKey, "stagingPath", staging)
	return nil
}
// failLoadingEntry marks current as PhaseInvalid with cause, drops it from
// s.entries, wakes its waiters, logs the failure, and returns cause unchanged
// so callers can return it directly.
func (s *StateMachine) failLoadingEntry(paths cachePaths, artifactKey string, current *entry, cause error) error {
	func() {
		s.mu.Lock()
		defer s.mu.Unlock()
		current.phase, current.err = PhaseInvalid, cause
		delete(s.entries, paths.key)
	}()
	current.closing.Do(func() { close(current.waitCh) })
	s.logger.Error("cache load publishing failed", "artifactKey", artifactKey, "readyPath", paths.ready, "stagingPath", paths.staging, "error", cause)
	return cause
}
// keyReplacer maps '/', '\\', ':' and ' ' to '_'. strings.Replacer is safe for
// concurrent use, so a single shared instance replaces the per-call one.
var keyReplacer = strings.NewReplacer("/", "_", "\\", "_", ":", "_", " ", "_")

// sanitizeKey returns input with every '/', '\\', ':' and ' ' replaced by '_'.
// Inputs that differ only in those characters produce the same result.
func sanitizeKey(input string) string {
	return keyReplacer.Replace(input)
}
// targetCachePaths validates the arguments and resolves the entry key, cleaned
// ready path, and staging path for one artifact target.
//
// It errors when artifactKey is empty, when readyPath is blank, or when the
// cleaned readyPath does not name a concrete file or directory. That covers
// "/", ".", and paths made only of ".." such as "..", "a/..", or "../..".
// Acquire and publish call os.RemoveAll on the ready and staging paths, and
// for such inputs those paths resolve to the filesystem root, the working
// directory, or one of its ancestors. For example, ".." yields staging ".".
func targetCachePaths(artifactKey, readyPath string) (cachePaths, error) {
	if artifactKey == "" {
		return cachePaths{}, errors.New("artifactKey is required")
	}
	if strings.TrimSpace(readyPath) == "" {
		return cachePaths{}, errors.New("target ready path is required")
	}
	cleanReady := filepath.Clean(readyPath)
	switch filepath.Base(cleanReady) {
	case ".", "..", string(filepath.Separator):
		return cachePaths{}, fmt.Errorf("target ready path %q does not name a file or directory", readyPath)
	}
	return cachePaths{
		key:     sanitizeKey(artifactKey) + "\x00" + cleanReady,
		ready:   cleanReady,
		staging: StagingPathForReadyPath(cleanReady),
	}, nil
}
// StagingPathForReadyPath returns the staging location for readyPath. After
// cleaning readyPath, this is the ".staging" directory next to it, holding an
// entry with the same base name. For example, "/cache/model" maps to
// "/cache/.staging/model".
func StagingPathForReadyPath(readyPath string) string {
	parent, name := filepath.Split(filepath.Clean(readyPath))
	return filepath.Join(parent, ".staging", name)
}
// dirExists reports whether os.Stat (which follows symlinks) succeeds on path
// and describes a directory. Any stat error, including permission errors,
// yields false.
func dirExists(path string) bool {
	if info, err := os.Stat(path); err == nil {
		return info.IsDir()
	}
	return false
}
// fileExists reports whether os.Stat (which follows symlinks) succeeds on path
// and describes something other than a directory. Any stat error yields false.
func fileExists(path string) bool {
	info, err := os.Stat(path)
	if err != nil {
		return false
	}
	return !info.IsDir()
}
// cloneTreeLinkOrCopy recreates the tree rooted at srcRoot under dstRoot.
// Directories are created with their source permission bits. Fanout marker
// files (see isFanoutMarkerPath) are skipped. Every other non-directory entry
// is hard-linked into place; if linking fails it is copied with its source
// permission bits instead. The first error aborts the walk and is returned
// wrapped.
func cloneTreeLinkOrCopy(srcRoot, dstRoot string) error {
	visit := func(srcPath string, info os.FileInfo, walkErr error) error {
		if walkErr != nil {
			return errutil.Wrap("walk source tree", walkErr)
		}
		rel, relErr := filepath.Rel(srcRoot, srcPath)
		if relErr != nil {
			return errutil.Wrap("compute relative clone path", relErr)
		}
		target := filepath.Join(dstRoot, rel)
		switch {
		case info.IsDir():
			return errutil.Wrap("create clone directory", os.MkdirAll(target, info.Mode().Perm()))
		case isFanoutMarkerPath(rel):
			return nil
		}
		if mkErr := os.MkdirAll(filepath.Dir(target), internalDirPerm); mkErr != nil {
			return errutil.Wrap("create clone parent directory", mkErr)
		}
		if os.Link(srcPath, target) == nil {
			return nil
		}
		// Linking can fail (e.g. across filesystems); fall back to a byte copy.
		return copyRegularFile(srcPath, target, info.Mode().Perm())
	}
	return errutil.Wrap("clone tree link or copy", filepath.Walk(srcRoot, visit))
}
// isFanoutMarkerPath reports whether path ends in one of the fanout marker
// suffixes ".wdfanout.ready" or ".wdfanout.done"; such files are excluded when
// cloning a staging tree.
func isFanoutMarkerPath(path string) bool {
	for _, suffix := range [...]string{".wdfanout.ready", ".wdfanout.done"} {
		if strings.HasSuffix(path, suffix) {
			return true
		}
	}
	return false
}
// copyRegularFile copies the contents of srcPath into dstPath, creating or
// truncating dstPath (perm applies only when the file is created, subject to
// umask) and fsyncing it before it is closed. Both files are closed through
// withOpenFile, which folds close errors into the result.
func copyRegularFile(srcPath, dstPath string, perm os.FileMode) error {
	const createFlags = os.O_CREATE | os.O_WRONLY | os.O_TRUNC
	copyInto := func(src *os.File) error {
		return withOpenFile(dstPath, createFlags, perm, func(dst *os.File) error {
			if _, copyErr := io.Copy(dst, src); copyErr != nil {
				return errutil.Wrap("copy regular file", copyErr)
			}
			return errutil.Wrap("sync target file", dst.Sync())
		})
	}
	return withOpenFile(srcPath, os.O_RDONLY, 0, copyInto)
}
// withOpenFile opens path with flag/perm, runs fn on the file, and always
// closes it afterwards. An open failure is returned wrapped without calling fn.
// A close failure is merged with fn's result via errutil.MergeClose;
// otherwise fn's result is returned as is.
func withOpenFile(path string, flag int, perm os.FileMode, fn func(*os.File) error) error {
	f, openErr := os.OpenFile(path, flag, perm)
	if openErr != nil {
		return errutil.Wrap("open file", openErr)
	}
	err := fn(f)
	if cerr := f.Close(); cerr != nil {
		return errutil.MergeClose(err, cerr, fmt.Sprintf("close file %s", path))
	}
	return err
}