* Copyright (c) 2026 Huawei Technologies Co., Ltd.
* openFuyao is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
package rdma
import (
"context"
"fmt"
"hash/crc32"
"math"
"os"
"path/filepath"
"sort"
"strings"
"time"
"github.com/openfuyao/weight-dispatcher/pkg/internal/errutil"
sharedtypes "github.com/openfuyao/weight-dispatcher/pkg/types"
)
// relayFetchRun is the outcome of one relay peer fetch sub-spec. Each
// executeRelayPeerFetchSubSpec call sends exactly one of these on the relay
// fetch result channel.
type relayFetchRun struct {
	// iteration is the collective iteration the sub-spec was built for.
	iteration int32
	// result is the transfer result returned by executeTransferSubSpec.
	result sharedtypes.TransferResult
	// transportPath is the path reported by the sub-spec, defaulted to RDMA when empty.
	transportPath sharedtypes.TransportPath
	// err is the error returned by executeTransferSubSpec, if any.
	err error
}
// relayDispatchResult reports what one dispatchRelayPeerFetchIteration call did.
type relayDispatchResult struct {
	// payloadCount is the number of positive-size byte ranges handed to the
	// iteration's sub-spec fetch; 0 when the iteration yielded nothing usable.
	payloadCount int
	// nextIteration is the collective iteration to request next.
	nextIteration int32
}
// relayFetchSubSpecResult pairs the range-fetch sub-spec built from one
// iteration's payloads with the number of positive-size byte ranges those
// payloads describe.
type relayFetchSubSpecResult struct {
	// spec is the sub-spec produced by buildCollectiveRangeFetchSpec.
	spec sharedtypes.TransferSpec
	// payloadCount is len(payloadsToByteRanges(payloads)).
	payloadCount int
}
// relayFetchContext holds the inputs shared by every iteration of one relay
// peer fetch.
type relayFetchContext struct {
	// spec is the parent transfer spec being fetched.
	spec sharedtypes.TransferSpec
	// role identifies this node and the root peer whose payloads are fetched.
	role relayFanoutRole
	// timeout is passed to waitCollectivePayloadMetadata for each iteration.
	timeout time.Duration
	// relaySessions is filled by collectRelaySessions and later handed to
	// closeRelaySessions; NOTE(review): the key/value meaning is defined by
	// those helpers — confirm there.
	relaySessions map[string]string
	// resultCh receives one relayFetchRun per launched sub-spec goroutine.
	resultCh chan relayFetchRun
}
// relayFetchState is the progress of the relay peer fetch loop. It is passed
// by value through runRelayPeerFetch and its helpers, each returning the
// updated copy.
type relayFetchState struct {
	// iteration is the next collective iteration to request metadata for.
	iteration int32
	// dispatchedCount is the number of byte ranges handed to sub-spec fetches so far.
	dispatchedCount int
	// expectedCount is the number of valid ranges the root peer owns; dispatching
	// stops once dispatchedCount reaches it.
	// NOTE(review): this compares owned ranges against per-payload byte ranges,
	// which assumes the root publishes one payload per owned range — confirm.
	expectedCount int
	// inflight is the number of counted sub-spec runs not yet collected.
	inflight int
	// result accumulates the merged results of collected runs.
	result sharedtypes.TransferResult
	// transportPath accumulates the merged transport paths of collected runs.
	transportPath sharedtypes.TransportPath
}
// ownerFetchFallbackRun bundles the arguments of one owner-fetch fallback
// iteration, as consumed by executeOwnerFetchFallbackIteration.
type ownerFetchFallbackRun struct {
	// spec is the parent transfer spec.
	spec sharedtypes.TransferSpec
	// openFiles maps relative paths to already-open target files.
	openFiles map[string]*os.File
	// iteration is the ring iteration (1-based) being executed.
	iteration int32
	// ownerRank is the rank whose owned ranges are pulled this iteration.
	ownerRank int32
	// ownerPeer is the peer plan for ownerRank.
	ownerPeer sharedtypes.CollectivePeerPlan
	// jobs holds one transfer job per non-empty owned range of ownerPeer.
	jobs []transferJob
}
// executeRelayPeerFetch pulls the root peer's owned ranges through relay
// collective payloads when spec describes a two-peer relay fan-out.
//
// The boolean result reports whether the relay path handled the spec. It is
// false (with no error) when spec is not a relay fan-out, and true otherwise.
// A root peer, or a root that owns no valid ranges, has nothing to fetch and
// yields an empty RDMA result.
func (a *Adapter) executeRelayPeerFetch(ctx context.Context, spec sharedtypes.TransferSpec) (sharedtypes.TransferResult, sharedtypes.TransportPath, bool, error) {
	role, isRelay := resolveRelayFanoutRole(spec)
	if !isRelay {
		return sharedtypes.TransferResult{}, "", false, nil
	}
	if validateErr := validateRelayPeerFetchRole(role); validateErr != nil {
		a.logger.Error("relay peer fetch validation failed", "taskID", spec.TaskID, "err", validateErr)
		return sharedtypes.TransferResult{}, "", true, validateErr
	}
	expected := countValidRanges(role.rootPeer.OwnedRanges)
	if expected == 0 || role.isRoot {
		return sharedtypes.TransferResult{}, sharedtypes.TransportPathRDMA, true, nil
	}

	fetchCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	// Relay sessions discovered while fetching are closed on the way out.
	// Defers run LIFO, so this runs before cancel above.
	sessions := map[string]string{}
	defer a.closeRelaySessions(fetchCtx, sessions)

	fetch := relayFetchContext{
		spec:          spec,
		role:          role,
		timeout:       collectiveTimeout(spec),
		relaySessions: sessions,
		resultCh:      make(chan relayFetchRun, relayPeerMaxInflightIterations),
	}
	initial := relayFetchState{iteration: 1, expectedCount: expected}
	result, path, err := a.runRelayPeerFetch(fetchCtx, fetch, initial)
	return result, path, true, err
}
// validateRelayPeerFetchRole checks that a non-root relay peer knows where to
// fetch from: the root's endpoint and staging path must both be set. A root
// peer needs neither and always validates.
func validateRelayPeerFetchRole(role relayFanoutRole) error {
	root := role.rootPeer
	switch {
	case role.isRoot:
		return nil
	case root.Endpoint == "":
		return fmt.Errorf("relay peer requires root endpoint")
	case root.StagingPath == "":
		return fmt.Errorf("relay peer requires root staging path")
	default:
		return nil
	}
}
// runRelayPeerFetch drives the relay fetch loop. It keeps a window of sub-spec
// fetches running (via dispatchRelayPeerFetches), collects one finished run
// at a time, and stops once every expected range has been dispatched and no
// counted run is outstanding. An empty merged transport path is reported as
// RDMA. The first dispatch or collect error aborts the loop.
func (a *Adapter) runRelayPeerFetch(ctx context.Context, fetch relayFetchContext, state relayFetchState) (sharedtypes.TransferResult, sharedtypes.TransportPath, error) {
	pending := func() bool {
		return state.dispatchedCount < state.expectedCount || state.inflight > 0
	}
	var err error
	for pending() {
		if state, err = a.dispatchRelayPeerFetches(ctx, fetch, state); err != nil {
			return sharedtypes.TransferResult{}, "", err
		}
		if state.inflight == 0 {
			// Dispatch left nothing running, so there is nothing to wait for.
			break
		}
		if state, err = a.collectRelayPeerFetchRun(ctx, fetch, state); err != nil {
			return sharedtypes.TransferResult{}, "", err
		}
	}
	merged, path := state.result, state.transportPath
	if path == "" {
		path = sharedtypes.TransportPathRDMA
	}
	return merged, path, nil
}
// dispatchRelayPeerFetches launches sub-spec fetches for consecutive
// iterations until every expected range has been dispatched or the in-flight
// limit (relayPeerMaxInflightIterations) is reached. An iteration that yields
// no byte ranges advances the iteration counter without counting as in
// flight.
func (a *Adapter) dispatchRelayPeerFetches(ctx context.Context, fetch relayFetchContext, state relayFetchState) (relayFetchState, error) {
	for {
		if state.dispatchedCount >= state.expectedCount || state.inflight >= relayPeerMaxInflightIterations {
			return state, nil
		}
		dispatched, err := a.dispatchRelayPeerFetchIteration(ctx, fetch, state.iteration)
		if err != nil {
			return relayFetchState{}, err
		}
		state.iteration = dispatched.nextIteration
		if dispatched.payloadCount > 0 {
			state.dispatchedCount += dispatched.payloadCount
			state.inflight++
		}
	}
}
// dispatchRelayPeerFetchIteration waits for the root's payload metadata for
// one collective iteration. When the metadata describes at least one
// positive-size byte range, it launches a goroutine that fetches those ranges
// and reports on fetch.resultCh.
//
// Invariant: a sub-spec goroutine is launched if and only if the returned
// payloadCount is positive. dispatchRelayPeerFetches counts an iteration as
// in flight only when payloadCount > 0. Launching a goroutine for an
// iteration whose payloads are all empty would therefore put an uncounted
// result on resultCh. The collect loop could then consume it in place of a
// real run and finish while a real fetch is still writing. Uncounted senders
// could also exceed the channel buffer and block forever.
//
// NOTE(review): skipped iterations are not acknowledged to the root, which
// matches the existing behavior for iterations with no payloads at all.
func (a *Adapter) dispatchRelayPeerFetchIteration(ctx context.Context, fetch relayFetchContext, iteration int32) (relayDispatchResult, error) {
	root := fetch.role.rootPeer
	payloads, err := a.waitCollectivePayloadMetadata(ctx, root.Endpoint, fetch.spec.TaskID, collectiveSessionID(fetch.spec), iteration, fetch.timeout)
	if err != nil {
		a.logger.Error("wait relay collective payload metadata failed", "taskID", fetch.spec.TaskID, "iteration", iteration, "rootNode", root.NodeName, "err", err)
		return relayDispatchResult{}, err
	}
	dispatch := relayDispatchResult{nextIteration: iteration + 1}
	if len(payloads) == 0 {
		return dispatch, nil
	}
	// Record relay sessions even for iterations that end up skipped, so
	// closeRelaySessions can still release them.
	collectRelaySessions(root.Endpoint, payloads, fetch.relaySessions)
	subSpecResult := buildRelayPeerFetchSubSpec(fetch.spec, root, payloads, iteration)
	if subSpecResult.payloadCount == 0 {
		a.logger.Debug("skipping relay peer fetch iteration without non-empty payloads", "taskID", fetch.spec.TaskID, "iteration", iteration, "payloadCount", len(payloads), "rootNode", root.NodeName)
		return dispatch, nil
	}
	logRelayPeerFetchSubSpec(a, fetch.spec, root, iteration, payloads, subSpecResult.spec)
	go a.executeRelayPeerFetchSubSpec(ctx, iteration, fetch.resultCh, subSpecResult.spec)
	dispatch.payloadCount = subSpecResult.payloadCount
	return dispatch, nil
}
// executeRelayPeerFetchSubSpec runs one relay fetch sub-spec and sends exactly
// one relayFetchRun describing the outcome on resultCh. A sub-spec that
// reports no transport path is recorded as RDMA.
func (a *Adapter) executeRelayPeerFetchSubSpec(ctx context.Context, iteration int32, resultCh chan relayFetchRun, subSpec sharedtypes.TransferSpec) {
	run := relayFetchRun{iteration: iteration}
	run.result, run.err = a.executeTransferSubSpec(ctx, subSpec)
	run.transportPath = run.result.TransportPath
	if run.transportPath == "" {
		run.transportPath = sharedtypes.TransportPathRDMA
	}
	resultCh <- run
}
// buildRelayPeerFetchSubSpec builds the range-fetch sub-spec for one
// iteration's payloads. It also counts how many of those payloads carry a
// positive-size byte range.
func buildRelayPeerFetchSubSpec(
	spec sharedtypes.TransferSpec,
	rootPeer sharedtypes.CollectivePeerPlan,
	payloads []sharedtypes.CollectiveChunkPayload,
	iteration int32,
) relayFetchSubSpecResult {
	var out relayFetchSubSpecResult
	out.spec = buildCollectiveRangeFetchSpec(spec, rootPeer, payloads, iteration)
	out.payloadCount = len(payloadsToByteRanges(payloads))
	return out
}
// logRelayPeerFetchSubSpec logs the shape of a relay fetch sub-spec at info
// level. It first emits a warning when some payloads lack an RDMA relay
// session hint.
func logRelayPeerFetchSubSpec(
	a *Adapter,
	spec sharedtypes.TransferSpec,
	rootPeer sharedtypes.CollectivePeerPlan,
	iteration int32,
	payloads []sharedtypes.CollectiveChunkPayload,
	subSpec sharedtypes.TransferSpec,
) {
	sessionIDs := relaySessionIDs(payloads)
	hinted := countRelayHints(payloads)
	total := len(payloads)
	if hinted != total {
		a.logger.Warn("relay collective payloads missing RDMA hints",
			"taskID", spec.TaskID,
			"iteration", iteration,
			"payloadCount", total,
			"relayHintCount", hinted,
			"rootNode", rootPeer.NodeName)
	}
	segmentKinds := collectiveSegmentKinds(subSpec.SourceSegments)
	a.logger.Info("relay peer fetch sub-spec",
		"taskID", spec.TaskID,
		"iteration", iteration,
		"payloadCount", total,
		"relayHintCount", hinted,
		"relaySessionIDs", strings.Join(sessionIDs, ","),
		"sourceSegments", strings.Join(segmentKinds, ","))
}
// countRelayHints returns how many payloads carry a relay RDMA hint with a
// non-empty session ID.
func countRelayHints(payloads []sharedtypes.CollectiveChunkPayload) int {
	count := 0
	for i := range payloads {
		hint := payloads[i].RelayRDMA
		if hint == nil || hint.SessionID == "" {
			continue
		}
		count++
	}
	return count
}
// collectiveSegmentKinds renders a short log summary of each source segment.
// Each entry has the form "mode:session:ranges=N:first=S-E", where:
//   - mode is "relay" when the endpoint has a relay RDMA hint, else "plain";
//   - session is the relay session ID, empty for plain segments;
//   - N is the number of byte ranges;
//   - S-E are the first range's bounds, or -1 when there are none.
func collectiveSegmentKinds(sourceSegments []sharedtypes.SourceSegmentPlan) []string {
	kinds := make([]string, len(sourceSegments))
	for i, seg := range sourceSegments {
		mode, session := "plain", ""
		if hint := seg.SourceEndpoint.RelayRDMA; hint != nil {
			mode, session = "relay", hint.SessionID
		}
		kinds[i] = fmt.Sprintf("%s:%s:ranges=%d:first=%d-%d",
			mode, session, len(seg.ByteRanges),
			firstRangeStart(seg.ByteRanges), firstRangeEnd(seg.ByteRanges))
	}
	return kinds
}
// collectRelayPeerFetchRun waits for one finished sub-spec run and decrements
// the in-flight count. It then acknowledges the run's iteration to the root
// peer and merges the run's result and transport path into state.
//
// The wait also watches ctx. If ctx is cancelled before a run arrives, the
// context error is returned, so a sub-spec that never reports cannot block
// the caller forever. A failed run or a failed acknowledgement is logged and
// returned with a zero state.
func (a *Adapter) collectRelayPeerFetchRun(ctx context.Context, fetch relayFetchContext, state relayFetchState) (relayFetchState, error) {
	var run relayFetchRun
	select {
	case run = <-fetch.resultCh:
	case <-ctx.Done():
		err := ctx.Err()
		a.logger.Error("wait relay peer fetch sub-spec result cancelled", "taskID", fetch.spec.TaskID, "inflight", state.inflight, "rootNode", fetch.role.rootPeer.NodeName, "err", err)
		return relayFetchState{}, err
	}
	state.inflight--
	if run.err != nil {
		a.logger.Error("execute relay peer fetch sub-spec failed", "taskID", fetch.spec.TaskID, "iteration", run.iteration, "rootNode", fetch.role.rootPeer.NodeName, "err", run.err)
		return relayFetchState{}, run.err
	}
	if err := a.acknowledgeCollectiveIteration(ctx, fetch.role.rootPeer.Endpoint, fetch.spec.TaskID, collectiveSessionID(fetch.spec), run.iteration); err != nil {
		a.logger.Error("acknowledge relay collective iteration failed", "taskID", fetch.spec.TaskID, "iteration", run.iteration, "rootNode", fetch.role.rootPeer.NodeName, "err", err)
		return relayFetchState{}, err
	}
	state.result = mergeTransferResults(state.result, run.result)
	state.transportPath = mergeTransportPath(state.transportPath, run.transportPath)
	return state, nil
}
// firstRangeStart returns the Start of the first range, or -1 when ranges is empty.
func firstRangeStart(ranges []sharedtypes.ByteRange) int64 {
	if len(ranges) > 0 {
		return ranges[0].Start
	}
	return -1
}
// firstRangeEnd returns the End of the first range, or -1 when ranges is empty.
func firstRangeEnd(ranges []sharedtypes.ByteRange) int64 {
	if len(ranges) > 0 {
		return ranges[0].End
	}
	return -1
}
// buildCollectiveRangeFetchSpec builds a single-source-direct sub-spec that
// fetches the given payload ranges from owner for one collective iteration.
//
// Source segments are chosen as follows:
//   - Payloads with a relay RDMA session hint are grouped into per-session
//     relay segments, when buildCollectiveRelaySourceSegments allows it.
//   - Every other positive-size payload is fetched through one plain peer
//     segment that reads owner.StagingPath over owner.Endpoint.
//   - When no relay segment is built, the plain segment is always present
//     and covers all payloads.
//
// The sub-spec writes to parent's target paths with PreserveExisting set,
// and its parallelism is one per source segment.
func buildCollectiveRangeFetchSpec(parent sharedtypes.TransferSpec, owner sharedtypes.CollectivePeerPlan, payloads []sharedtypes.CollectiveChunkPayload, iteration int32) sharedtypes.TransferSpec {
	sourceSegments := buildCollectiveRelaySourceSegments(parent, owner, payloads)
	plainPayloads := payloads
	if len(sourceSegments) > 0 {
		// Relay segments only include payloads that carry a session hint.
		// Without a plain segment, the hint-less payloads of a partially
		// hinted batch would be silently dropped, even though the caller
		// counts them as dispatched.
		plainPayloads = make([]sharedtypes.CollectiveChunkPayload, 0, len(payloads))
		for _, payload := range payloads {
			if payload.RelayRDMA == nil || payload.RelayRDMA.SessionID == "" {
				plainPayloads = append(plainPayloads, payload)
			}
		}
	}
	plainRanges := payloadsToByteRanges(plainPayloads)
	if len(sourceSegments) == 0 || len(plainRanges) > 0 {
		sourceID := fmt.Sprintf("collective-%s-%d", owner.NodeName, owner.Rank)
		sourceSegments = append(sourceSegments, sharedtypes.SourceSegmentPlan{
			SourceID: sourceID,
			SourceEndpoint: sharedtypes.SourceEndpoint{
				SourceID:   sourceID,
				SourceType: sourceTypePeer,
				Endpoint:   owner.Endpoint,
				Path:       owner.StagingPath,
				NodeName:   owner.NodeName,
			},
			ByteRanges: plainRanges,
		})
	}
	return sharedtypes.TransferSpec{
		TaskID:            fmt.Sprintf("%s-owner-%s-%d", parent.TaskID, owner.NodeName, iteration),
		ArtifactKey:       parent.ArtifactKey,
		TransferMode:      sharedtypes.TransferModeSingleSourceDirect,
		LogicalManifest:   parent.LogicalManifest,
		SourceSegments:    sourceSegments,
		TargetTempPath:    parent.TargetTempPath,
		TargetFinalPath:   parent.TargetFinalPath,
		PreserveExisting:  true,
		EnableChunkCRC32C: parent.EnableChunkCRC32C,
		ChunkSizeBytes:    parent.ChunkSizeBytes,
		Parallelism:       collectiveRangeFetchParallelism(sourceSegments),
		RetryLimit:        parent.RetryLimit,
		TimeoutSeconds:    parent.TimeoutSeconds,
	}
}
// collectiveRangeFetchParallelism uses one worker per source segment, with a
// floor of 1 and a ceiling of math.MaxInt32.
func collectiveRangeFetchParallelism(sourceSegments []sharedtypes.SourceSegmentPlan) int32 {
	switch n := len(sourceSegments); {
	case n <= 1:
		return 1
	case n >= math.MaxInt32:
		return math.MaxInt32
	default:
		return int32(n)
	}
}
// shouldUseRelayRdmaOwnerFetch reports whether relay RDMA source segments may
// be built for parent. That requires at most one logical file, and parent
// must not match isDirectoryPerFileFanoutSubTask.
func shouldUseRelayRdmaOwnerFetch(parent sharedtypes.TransferSpec) bool {
	if len(parent.LogicalManifest.Files) > 1 {
		return false
	}
	return !isDirectoryPerFileFanoutSubTask(parent)
}
// isDirectoryPerFileFanoutSubTask reports whether parent is a partial-pull
// all-gather spec with PreserveExisting set and exactly one logical file.
// The name suggests these are per-file sub-tasks split out of a directory
// fan-out — confirm with the producer of such specs.
func isDirectoryPerFileFanoutSubTask(parent sharedtypes.TransferSpec) bool {
	switch {
	case parent.TransferMode != sharedtypes.TransferModePartialPullAllGather:
		return false
	case !parent.PreserveExisting:
		return false
	default:
		return len(parent.LogicalManifest.Files) == 1
	}
}
// buildCollectiveRelaySourceSegments groups positive-size payloads that carry
// a relay RDMA session hint into one "peer-relay" source segment per session.
//
// Segments keep the order in which their sessions first appear. Within a
// segment, ranges are stably sorted by relative path and then by start
// offset. Payloads without a hint are ignored. Nil is returned when
// shouldUseRelayRdmaOwnerFetch rejects parent.
func buildCollectiveRelaySourceSegments(parent sharedtypes.TransferSpec, owner sharedtypes.CollectivePeerPlan, payloads []sharedtypes.CollectiveChunkPayload) []sharedtypes.SourceSegmentPlan {
	if !shouldUseRelayRdmaOwnerFetch(parent) {
		return nil
	}
	// slot maps a session ID to its segment's index in ordered.
	slot := make(map[string]int)
	ordered := make([]*sharedtypes.SourceSegmentPlan, 0)
	for _, p := range payloads {
		hasHint := p.RelayRDMA != nil && p.RelayRDMA.SessionID != ""
		if p.Chunk.Size <= 0 || !hasHint {
			continue
		}
		session := p.RelayRDMA.SessionID
		idx, seen := slot[session]
		if !seen {
			idx = len(ordered)
			slot[session] = idx
			id := fmt.Sprintf("collective-%s-%d-%s", owner.NodeName, owner.Rank, session)
			ordered = append(ordered, &sharedtypes.SourceSegmentPlan{
				SourceID: id,
				SourceEndpoint: sharedtypes.SourceEndpoint{
					SourceID:   id,
					SourceType: "peer-relay",
					Endpoint:   owner.Endpoint,
					Path:       owner.StagingPath,
					NodeName:   owner.NodeName,
					RelayRDMA:  cloneRelayRDMAHint(p.RelayRDMA),
				},
			})
		}
		seg := ordered[idx]
		seg.ByteRanges = append(seg.ByteRanges, sharedtypes.ByteRange{
			RelativePath: p.Chunk.RelativePath,
			Start:        p.Chunk.Offset,
			End:          p.Chunk.Offset + p.Chunk.Size,
			RelayOffset:  p.RelayOffset,
		})
	}
	out := make([]sharedtypes.SourceSegmentPlan, 0, len(ordered))
	for _, seg := range ordered {
		ranges := seg.ByteRanges
		sort.SliceStable(ranges, func(i, j int) bool {
			left, right := ranges[i], ranges[j]
			if left.RelativePath == right.RelativePath {
				return left.Start < right.Start
			}
			return left.RelativePath < right.RelativePath
		})
		out = append(out, *seg)
	}
	return out
}
// relayFanoutRole describes this node's place in a two-peer relay fan-out, as
// resolved by resolveRelayFanoutRole.
type relayFanoutRole struct {
	// selfPeer is the peer entry whose rank matches this node's ring rank.
	selfPeer sharedtypes.CollectivePeerPlan
	// rootPeer is the single peer that owns valid ranges; non-root peers fetch
	// from its endpoint and staging path.
	rootPeer sharedtypes.CollectivePeerPlan
	// isRoot is true when selfPeer and rootPeer have the same rank.
	isRoot bool
}
// resolveRelayFanoutRole recognizes the two-peer relay fan-out layout:
//   - a partial-pull all-gather with a non-nil ring of world size 2;
//   - exactly two peers;
//   - exactly one peer that owns valid ranges (the root).
//
// It returns this node's peer, the root peer, and whether this node is the
// root. The boolean is false when the layout does not match or this node's
// rank is absent.
//
// NOTE(review): a non-empty NodeName marks "root found", so a range-owning
// peer with an empty NodeName is treated as no root.
func resolveRelayFanoutRole(spec sharedtypes.TransferSpec) (relayFanoutRole, bool) {
	ring := spec.CollectiveSpec.Ring
	if spec.TransferMode != sharedtypes.TransferModePartialPullAllGather || ring == nil {
		return relayFanoutRole{}, false
	}
	peers := spec.CollectiveSpec.Peers
	if ring.WorldSize != 2 || len(peers) != 2 {
		return relayFanoutRole{}, false
	}
	var self, root sharedtypes.CollectivePeerPlan
	haveSelf := false
	for _, peer := range peers {
		if peer.Rank == ring.Rank {
			self, haveSelf = peer, true
		}
		if countValidRanges(peer.OwnedRanges) == 0 {
			continue
		}
		if root.NodeName != "" {
			// A second peer owns data as well: not a single-root fan-out.
			return relayFanoutRole{}, false
		}
		root = peer
	}
	if !haveSelf || root.NodeName == "" {
		return relayFanoutRole{}, false
	}
	return relayFanoutRole{
		selfPeer: self,
		rootPeer: root,
		isRoot:   self.Rank == root.Rank,
	}, true
}
// payloadsToByteRanges converts payload chunks into byte ranges with
// End = Offset + Size, skipping chunks whose size is not positive.
// RelayOffset is left unset.
func payloadsToByteRanges(payloads []sharedtypes.CollectiveChunkPayload) []sharedtypes.ByteRange {
	out := make([]sharedtypes.ByteRange, 0, len(payloads))
	for i := range payloads {
		chunk := payloads[i].Chunk
		if chunk.Size > 0 {
			out = append(out, sharedtypes.ByteRange{
				RelativePath: chunk.RelativePath,
				Start:        chunk.Offset,
				End:          chunk.Offset + chunk.Size,
			})
		}
	}
	return out
}
// executeRingCollectiveOverAPI performs a ring all-gather through the
// collective payload API.
//
// In each iteration i (1 <= i < worldSize), this rank:
//  1. pushes the payloads it currently holds (initially its own owned
//     ranges, read from spec.TargetTempPath);
//  2. waits for the ranges owned by collectiveOwnerRank(self, worldSize, i);
//  3. writes them into openFiles and forwards them in the next iteration.
//
// It reports TransportPathTCPFallback, except for a single-rank ring, which
// has nothing to exchange and reports RDMA. A missing ring spec or an absent
// self rank is returned as an error rather than causing a panic.
func (a *Adapter) executeRingCollectiveOverAPI(ctx context.Context, spec sharedtypes.TransferSpec, openFiles map[string]*os.File) (sharedtypes.TransferResult, sharedtypes.TransportPath, error) {
	ring := spec.CollectiveSpec.Ring
	if ring == nil {
		return sharedtypes.TransferResult{}, "", fmt.Errorf("collective ring spec is required")
	}
	peerByRank := collectivePeersByRank(spec.CollectiveSpec.Peers)
	selfRank, worldSize := ring.Rank, ring.WorldSize
	if worldSize <= 1 {
		return sharedtypes.TransferResult{}, sharedtypes.TransportPathRDMA, nil
	}
	selfPeer, ok := peerByRank[selfRank]
	if !ok {
		return sharedtypes.TransferResult{}, "", fmt.Errorf("self peer rank %d not found", selfRank)
	}
	outboundPayloads, err := a.buildLocalPayloads(spec, selfPeer.OwnedRanges, spec.TargetTempPath)
	if err != nil {
		return sharedtypes.TransferResult{}, "", err
	}
	var result sharedtypes.TransferResult
	sessionID := collectiveSessionID(spec)
	timeout := collectiveTimeout(spec)
	for iteration := int32(1); iteration < worldSize; iteration++ {
		if len(outboundPayloads) > 0 {
			if err := a.pushCollectivePayloads(ctx, spec, sessionID, iteration, outboundPayloads); err != nil {
				return sharedtypes.TransferResult{}, "", err
			}
		}
		ownerRank := collectiveOwnerRank(selfRank, worldSize, iteration)
		expectedRanges := peerByRank[ownerRank].OwnedRanges
		if countValidRanges(expectedRanges) == 0 {
			// Nothing arrives from this owner, so there is nothing to forward next round.
			outboundPayloads = nil
			continue
		}
		inboundPayloads, err := a.waitCollectivePayloads(ctx, spec, sessionID, iteration, expectedRanges, timeout)
		if err != nil {
			return sharedtypes.TransferResult{}, "", err
		}
		stepResult, err := a.applyCollectivePayloads(spec.TargetTempPath, openFiles, inboundPayloads)
		if err != nil {
			return sharedtypes.TransferResult{}, "", err
		}
		result = mergeTransferResults(result, stepResult)
		outboundPayloads = inboundPayloads
		a.logger.Debug("completed ring iteration", "taskID", spec.TaskID, "sessionID", sessionID, "iteration", iteration, "receivedChunkCount", len(inboundPayloads))
	}
	return result, sharedtypes.TransportPathTCPFallback, nil
}
// executeOwnerFetchFallback gathers the other ranks' owned ranges by pulling
// them directly from each owner peer's staging path over its endpoint, one
// owner per iteration in ring order, and writes them into openFiles.
//
// It always reports TransportPathTCPFallback. A single-rank ring has nothing
// to fetch. A missing ring spec is returned as an error rather than causing
// a panic.
func (a *Adapter) executeOwnerFetchFallback(ctx context.Context, spec sharedtypes.TransferSpec, openFiles map[string]*os.File) (sharedtypes.TransferResult, sharedtypes.TransportPath, error) {
	ring := spec.CollectiveSpec.Ring
	if ring == nil {
		return sharedtypes.TransferResult{}, "", fmt.Errorf("collective ring spec is required")
	}
	peerByRank := collectivePeersByRank(spec.CollectiveSpec.Peers)
	if ring.WorldSize <= 1 {
		return sharedtypes.TransferResult{}, sharedtypes.TransportPathTCPFallback, nil
	}
	var result sharedtypes.TransferResult
	for iteration := int32(1); iteration < ring.WorldSize; iteration++ {
		ownerRank := collectiveOwnerRank(ring.Rank, ring.WorldSize, iteration)
		ownerPeer, jobs, err := buildOwnerFetchFallbackJobs(peerByRank, ownerRank)
		if err != nil {
			a.logger.Error("build owner-fetch fallback jobs failed", "taskID", spec.TaskID, "iteration", iteration, "ownerRank", ownerRank, "err", err)
			return sharedtypes.TransferResult{}, "", err
		}
		if len(jobs) == 0 {
			continue
		}
		stepResult, err := a.executeOwnerFetchFallbackIteration(ctx, ownerFetchFallbackRun{
			spec:      spec,
			openFiles: openFiles,
			iteration: iteration,
			ownerRank: ownerRank,
			ownerPeer: ownerPeer,
			jobs:      jobs,
		})
		if err != nil {
			return sharedtypes.TransferResult{}, "", err
		}
		result = mergeTransferResults(result, stepResult)
	}
	return result, sharedtypes.TransportPathTCPFallback, nil
}
// collectivePeersByRank indexes peers by rank. When a rank repeats, the last
// peer with that rank wins.
func collectivePeersByRank(peers []sharedtypes.CollectivePeerPlan) map[int32]sharedtypes.CollectivePeerPlan {
	index := make(map[int32]sharedtypes.CollectivePeerPlan, len(peers))
	for i := range peers {
		index[peers[i].Rank] = peers[i]
	}
	return index
}
// collectiveOwnerRank returns the rank whose data reaches selfRank in the
// given ring iteration, i.e. (selfRank - iteration) mod worldSize, normalized
// into [0, worldSize) for positive worldSize. worldSize must be non-zero.
func collectiveOwnerRank(selfRank, worldSize, iteration int32) int32 {
	shifted := (selfRank - iteration) % worldSize
	return (shifted + worldSize) % worldSize
}
// buildOwnerFetchFallbackJobs looks up the peer for ownerRank and creates one
// transfer job per owned range with End > Start. An unknown rank is an error.
func buildOwnerFetchFallbackJobs(
	peerByRank map[int32]sharedtypes.CollectivePeerPlan,
	ownerRank int32,
) (sharedtypes.CollectivePeerPlan, []transferJob, error) {
	owner, found := peerByRank[ownerRank]
	if !found {
		return sharedtypes.CollectivePeerPlan{}, nil, fmt.Errorf("collective owner rank %d not found", ownerRank)
	}
	ranges := owner.OwnedRanges
	jobs := make([]transferJob, 0, len(ranges))
	for i := range ranges {
		if ranges[i].End > ranges[i].Start {
			jobs = append(jobs, newOwnerFetchTransferJob(owner, ownerRank, ranges[i]))
		}
	}
	return owner, jobs, nil
}
// newOwnerFetchTransferJob builds a job that pulls rng from the owner peer's
// staging path over the owner's endpoint. The job's length is End - Start.
func newOwnerFetchTransferJob(
	ownerPeer sharedtypes.CollectivePeerPlan,
	ownerRank int32,
	rng sharedtypes.ByteRange,
) transferJob {
	id := fmt.Sprintf("collective-%s-%d", ownerPeer.NodeName, ownerRank)
	endpoint := sharedtypes.SourceEndpoint{
		SourceID:   id,
		SourceType: sourceTypePeer,
		Endpoint:   ownerPeer.Endpoint,
		Path:       ownerPeer.StagingPath,
		NodeName:   ownerPeer.NodeName,
	}
	var job transferJob
	job.source = sharedtypes.SourceSegmentPlan{SourceID: id, SourceEndpoint: endpoint}
	job.relativePath = rng.RelativePath
	job.offset = rng.Start
	job.length = rng.End - rng.Start
	return job
}
// executeOwnerFetchFallbackIteration runs one iteration's owner-fetch jobs
// through executeJobs and logs a failure before returning it. The job count
// is passed as the fifth executeJobs argument (presumably its concurrency —
// confirm), along with the spec's effective chunk retry limit.
func (a *Adapter) executeOwnerFetchFallbackIteration(ctx context.Context, run ownerFetchFallbackRun) (sharedtypes.TransferResult, error) {
	jobCount := len(run.jobs)
	a.logger.Debug("executing owner-fetch fallback iteration", "taskID", run.spec.TaskID, "iteration", run.iteration, "ownerRank", run.ownerRank, "pullNode", run.ownerPeer.NodeName, "jobCount", jobCount)
	stepResult, err := a.executeJobs(ctx, run.spec.TargetTempPath, run.jobs, run.openFiles, jobCount, effectiveChunkRetryLimit(run.spec))
	if err == nil {
		return stepResult, nil
	}
	a.logger.Error("execute owner-fetch fallback iteration failed", "taskID", run.spec.TaskID, "iteration", run.iteration, "ownerRank", run.ownerRank, "pullNode", run.ownerPeer.NodeName, "jobCount", jobCount, "err", err)
	return sharedtypes.TransferResult{}, err
}
// buildLocalPayloads reads every range with End > Start from the local files
// under targetTempPath. Each range becomes a collective payload carrying the
// data and its CRC32C checksum. The first read error aborts the build.
//
// NOTE(review): ChunkID is derived from spec.TargetTempPath while FilePath
// uses the targetTempPath argument; both are equal at the visible call site.
func (*Adapter) buildLocalPayloads(spec sharedtypes.TransferSpec, ranges []sharedtypes.ByteRange, targetTempPath string) ([]sharedtypes.CollectiveChunkPayload, error) {
	out := make([]sharedtypes.CollectiveChunkPayload, 0, len(ranges))
	for _, rng := range ranges {
		if rng.End <= rng.Start {
			continue
		}
		data, err := readLocalChunkData(targetTempPath, rng.RelativePath, rng.Start, rng.End-rng.Start)
		if err != nil {
			return nil, err
		}
		chunk := sharedtypes.TransferredChunk{
			ChunkID:      fmt.Sprintf("%s:%s:%d", spec.TargetTempPath, rng.RelativePath, rng.Start),
			FilePath:     filepath.ToSlash(filepath.Join(targetTempPath, filepath.FromSlash(rng.RelativePath))),
			RelativePath: rng.RelativePath,
			Offset:       rng.Start,
			Size:         int64(len(data)),
			CRC32C:       encodeCRC32C(crc32.Checksum(data, crc32cTable)),
			SourceID:     targetNodeID(spec),
		}
		out = append(out, sharedtypes.CollectiveChunkPayload{Chunk: chunk, Data: data})
	}
	return out, nil
}
// applyCollectivePayloads writes each payload into its already-open target
// file at the chunk offset. When a CRC32C is provided, it is verified first.
// The returned result describes every chunk written, with FilePath rewritten
// under targetTempPath.
//
// It stops at the first payload whose file is missing from openFiles, whose
// checksum mismatches, or whose write fails. Writes already made for earlier
// payloads are not undone.
func (*Adapter) applyCollectivePayloads(targetTempPath string, openFiles map[string]*os.File, payloads []sharedtypes.CollectiveChunkPayload) (sharedtypes.TransferResult, error) {
	var result sharedtypes.TransferResult
	for _, p := range payloads {
		chunk := p.Chunk
		file := openFiles[chunk.RelativePath]
		if file == nil {
			return sharedtypes.TransferResult{}, fmt.Errorf("target file %s not prepared", chunk.RelativePath)
		}
		if want := chunk.CRC32C; want != "" && encodeCRC32C(crc32.Checksum(p.Data, crc32cTable)) != want {
			return sharedtypes.TransferResult{}, fmt.Errorf("collective chunk crc mismatch for %s@%d", chunk.RelativePath, chunk.Offset)
		}
		if _, err := file.WriteAt(p.Data, chunk.Offset); err != nil {
			return sharedtypes.TransferResult{}, errutil.Wrap(fmt.Sprintf("write collective payload %s at offset %d", chunk.RelativePath, chunk.Offset), err)
		}
		chunk.FilePath = filepath.ToSlash(filepath.Join(targetTempPath, filepath.FromSlash(chunk.RelativePath)))
		result.BytesTransferred += int64(len(p.Data))
		result.ChunkCount++
		result.SucceededChunks++
		result.TransferredChunks = append(result.TransferredChunks, chunk)
	}
	return result, nil
}