* Copyright (c) 2026 Huawei Technologies Co., Ltd.
* openFuyao is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
package rdma
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"github.com/openfuyao/weight-dispatcher/pkg/internal/errutil"
sharedtypes "github.com/openfuyao/weight-dispatcher/pkg/types"
)
// resolveFiles determines which artifact files a transfer covers.
//
// Resolution order:
//  1. If spec.LogicalManifest.Files is non-empty it is returned as-is (the
//     same slice, not a copy).
//  2. Otherwise the first source segment's endpoint is inspected. When
//     IsHuggingFaceEndpoint reports a Hugging Face endpoint, the file list is
//     taken from ResolveHuggingFaceManifest, using spec.ChunkSizeBytes or a
//     64 MiB default when that value is not positive.
//  3. For any other endpoint, ep.Path is stat'ed through LocalChunkClient and
//     described as a single required, chunkable safetensors file named after
//     the base name of ep.Path.
//
// It returns an error when ctx is nil, when no source segments are configured,
// or when the manifest lookup or the stat fails.
func resolveFiles(ctx context.Context, spec sharedtypes.TransferSpec) ([]sharedtypes.ArtifactFile, error) {
	// defaultChunkSizeBytes applies when the spec does not request a positive chunk size.
	const defaultChunkSizeBytes = 64 * 1024 * 1024

	if ctx == nil {
		return nil, fmt.Errorf("resolve files requires non-nil context")
	}
	if files := spec.LogicalManifest.Files; len(files) != 0 {
		return files, nil
	}
	if len(spec.SourceSegments) == 0 {
		return nil, fmt.Errorf("no source segments configured")
	}
	// Only the first segment's endpoint is consulted to discover the file list.
	ep := spec.SourceSegments[0].SourceEndpoint
	if IsHuggingFaceEndpoint(ep.SourceType, ep.Endpoint) {
		chunkSize := spec.ChunkSizeBytes
		if chunkSize <= 0 {
			chunkSize = defaultChunkSizeBytes
		}
		// NOTE(review): nil is passed as the second argument, presumably to let
		// ResolveHuggingFaceManifest pick its default client — confirm there.
		manifest, err := ResolveHuggingFaceManifest(ctx, nil, HuggingFaceManifestRequest{
			Endpoint:       ep.Endpoint,
			ModelID:        ep.Path,
			Token:          ep.AccessToken,
			Revision:       ExtractHFRevision(ep.Endpoint),
			ChunkSizeBytes: chunkSize,
		})
		if err != nil {
			return nil, fmt.Errorf("resolve huggingface manifest: %w", err)
		}
		return manifest.Files, nil
	}
	size, err := LocalChunkClient{}.Stat(ctx, "", ep.Path, "")
	if err != nil {
		// Wrap like the Hugging Face branch so callers can tell which lookup failed;
		// %w keeps errors.Is/As working on the underlying cause.
		return nil, fmt.Errorf("stat local source %q: %w", ep.Path, err)
	}
	return []sharedtypes.ArtifactFile{{
		RelativePath: filepath.Base(ep.Path),
		SizeBytes:    size,
		Kind:         sharedtypes.ArtifactFileKindSafeTensors,
		Chunkable:    true,
		Required:     true,
	}}, nil
}
// prepareTargetFiles resets targetTempPath and creates one file per manifest
// entry beneath it, each sized to the entry's SizeBytes.
//
// Everything previously under targetTempPath is removed first. The returned
// map is keyed by ArtifactFile.RelativePath and holds read-write handles that
// the caller owns and must close (for example with closeOpenFiles). On error,
// every handle opened so far is closed and a nil map is returned.
func prepareTargetFiles(targetTempPath string, files []sharedtypes.ArtifactFile) (map[string]*os.File, error) {
	if err := os.RemoveAll(targetTempPath); err != nil {
		return nil, fmt.Errorf("reset temp path: %w", err)
	}
	if err := os.MkdirAll(targetTempPath, rdmaDirPerm); err != nil {
		return nil, fmt.Errorf("create temp root: %w", err)
	}
	return createSizedTargetFiles(targetTempPath, files, os.O_CREATE|os.O_RDWR|os.O_TRUNC)
}

// prepareTargetFilesPreserve behaves like prepareTargetFiles but keeps any
// existing content under targetTempPath. Files are opened without O_TRUNC and
// only resized to SizeBytes, so bytes already present within that size are
// retained. (Presumably this supports resuming a partial transfer — confirm
// with callers.) On error, every handle opened so far is closed.
func prepareTargetFilesPreserve(targetTempPath string, files []sharedtypes.ArtifactFile) (map[string]*os.File, error) {
	if err := os.MkdirAll(targetTempPath, rdmaDirPerm); err != nil {
		return nil, fmt.Errorf("create temp root: %w", err)
	}
	return createSizedTargetFiles(targetTempPath, files, os.O_CREATE|os.O_RDWR)
}

// openPreparedFiles reopens, read-write, files that must already exist under
// targetTempPath. It neither creates nor resizes them. The returned map is
// keyed by ArtifactFile.RelativePath. On error, every handle opened so far is
// closed and a nil map is returned.
func openPreparedFiles(targetTempPath string, files []sharedtypes.ArtifactFile) (map[string]*os.File, error) {
	openFiles := make(map[string]*os.File, len(files))
	for _, file := range files {
		targetPath, err := resolveTargetFilePath(targetTempPath, file.RelativePath)
		if err != nil {
			closeOpenFiles(openFiles)
			return nil, err
		}
		handle, err := os.OpenFile(targetPath, os.O_RDWR, rdmaFilePerm)
		if err != nil {
			// Do not leak the handles already opened for earlier entries.
			closeOpenFiles(openFiles)
			return nil, errutil.Wrap(fmt.Sprintf("open prepared target file %s", targetPath), err)
		}
		storeTargetHandle(openFiles, file.RelativePath, handle)
	}
	return openFiles, nil
}

// createSizedTargetFiles opens every entry of files under targetTempPath with
// the given open flags and sizes each one to its SizeBytes. If any entry
// fails, the handles opened for earlier entries are closed before returning.
func createSizedTargetFiles(targetTempPath string, files []sharedtypes.ArtifactFile, flag int) (map[string]*os.File, error) {
	openFiles := make(map[string]*os.File, len(files))
	for _, file := range files {
		handle, err := createSizedTargetFile(targetTempPath, file, flag)
		if err != nil {
			// Do not leak the handles already opened for earlier entries.
			closeOpenFiles(openFiles)
			return nil, err
		}
		storeTargetHandle(openFiles, file.RelativePath, handle)
	}
	return openFiles, nil
}

// createSizedTargetFile creates the parent directories for one entry, opens
// the file with flag, and sets its size to file.SizeBytes via Truncate. If
// Truncate fails, the handle is closed and any close error is merged into the
// returned error, so the truncate context is not lost.
func createSizedTargetFile(targetTempPath string, file sharedtypes.ArtifactFile, flag int) (*os.File, error) {
	targetPath, err := resolveTargetFilePath(targetTempPath, file.RelativePath)
	if err != nil {
		return nil, err
	}
	if err := os.MkdirAll(filepath.Dir(targetPath), rdmaDirPerm); err != nil {
		return nil, errutil.Wrap(fmt.Sprintf("create target directory for %s", targetPath), err)
	}
	handle, err := os.OpenFile(targetPath, flag, rdmaFilePerm)
	if err != nil {
		return nil, errutil.Wrap(fmt.Sprintf("open target file %s", targetPath), err)
	}
	if err := handle.Truncate(file.SizeBytes); err != nil {
		truncateErr := errutil.Wrap(fmt.Sprintf("truncate target file %s to %d bytes", targetPath, file.SizeBytes), err)
		return nil, mergeCloseError(truncateErr, handle.Close(), fmt.Sprintf("close target file %s", targetPath))
	}
	return handle, nil
}

// storeTargetHandle records handle under relativePath. If a handle is already
// stored for that path (a duplicate manifest entry), the earlier handle is
// closed first so it is not leaked; the later entry wins, as before.
func storeTargetHandle(openFiles map[string]*os.File, relativePath string, handle *os.File) {
	if previous, ok := openFiles[relativePath]; ok {
		// Best-effort: the replaced handle is no longer reachable by callers.
		_ = previous.Close()
	}
	openFiles[relativePath] = handle
}

// resolveTargetFilePath converts a slash-separated manifest path into a path
// under targetTempPath. Paths that are empty, absolute, or would escape the
// root through ".." are rejected, because manifest entries may come from a
// remote source.
func resolveTargetFilePath(targetTempPath, relativePath string) (string, error) {
	localPath := filepath.FromSlash(relativePath)
	if !filepath.IsLocal(localPath) {
		return "", fmt.Errorf("invalid target file path %q: must be a non-empty relative path inside %s", relativePath, targetTempPath)
	}
	return filepath.Join(targetTempPath, localPath), nil
}
// closeOpenFiles closes every handle stored in openFiles. A nil or empty map is
// a no-op. Close errors are intentionally discarded; this is best-effort cleanup.
func closeOpenFiles(openFiles map[string]*os.File) {
	for name := range openFiles {
		// Best-effort: a failed close is deliberately ignored here.
		_ = openFiles[name].Close()
	}
}
// resolveSourceFilePath returns rootPath unchanged when relativePath is empty.
// Otherwise it converts the slash-separated relativePath to the OS separator
// and joins it onto rootPath (filepath.Join also cleans the result).
func resolveSourceFilePath(rootPath, relativePath string) string {
	if relativePath != "" {
		return filepath.Join(rootPath, filepath.FromSlash(relativePath))
	}
	return rootPath
}
// readLocalChunkData reads exactly length bytes, starting at byte offset, from
// the file at resolveSourceFilePath(rootPath, relativePath).
//
// A read that yields fewer than length bytes (for example, running into EOF)
// is reported as an error. A negative offset or length is rejected up front.
// Any error from closing the file is merged into the returned error.
func readLocalChunkData(rootPath, relativePath string, offset, length int64) (_ []byte, err error) {
	path := resolveSourceFilePath(rootPath, relativePath)
	if offset < 0 || length < 0 {
		// make([]byte, length) panics on a negative length. A negative offset with
		// length 0 would otherwise slip past the short-read check below.
		return nil, fmt.Errorf("read local chunk data file %s: invalid offset %d or length %d", path, offset, length)
	}
	file, err := os.Open(path)
	if err != nil {
		return nil, errutil.Wrap(fmt.Sprintf("open local chunk data file %s", path), err)
	}
	defer func() {
		err = mergeCloseError(err, file.Close(), fmt.Sprintf("close local chunk data file %s", path))
	}()
	buffer := make([]byte, length)
	n, err := file.ReadAt(buffer, offset)
	// ReadAt may return io.EOF alongside a full read that ends exactly at EOF;
	// only a short read counts as failure.
	if err != nil && int64(n) != length {
		return nil, errutil.Wrap(fmt.Sprintf("read local chunk data file %s at offset %d", path, offset), err)
	}
	return buffer[:n], nil
}
// mergeCloseError combines the primary error of an operation with the error
// returned by closing its resource.
//
// A nil closeErr leaves baseErr untouched. Otherwise closeErr is wrapped with
// action via errutil.Wrap. That wrapped error is returned alone when baseErr is
// nil, or joined after baseErr with errors.Join when both are present.
func mergeCloseError(baseErr, closeErr error, action string) error {
	switch {
	case closeErr == nil:
		return baseErr
	case baseErr == nil:
		return errutil.Wrap(action, closeErr)
	default:
		return errors.Join(baseErr, errutil.Wrap(action, closeErr))
	}
}