/*
 * Copyright (c) 2026 Huawei Technologies Co., Ltd.
 * openFuyao is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package rdma

import (
	"context"
	"fmt"
	"hash/crc32"
	"math"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"time"

	"github.com/openfuyao/weight-dispatcher/pkg/internal/errutil"
	sharedtypes "github.com/openfuyao/weight-dispatcher/pkg/types"
)

// Bookkeeping types for the relay peer fetch pipeline and the owner-fetch
// fallback.
type (
	// relayFetchRun is what each relay fetch goroutine
	// (executeRelayPeerFetchSubSpec) sends on relayFetchContext.resultCh: the
	// iteration it fetched, the sub-spec's result, the transport path it
	// reported (defaulted to RDMA when empty), and any execution error.
	relayFetchRun struct {
		iteration     int32
		result        sharedtypes.TransferResult
		transportPath sharedtypes.TransportPath
		err           error
	}

	// relayDispatchResult reports, for one dispatched iteration, how many
	// non-empty byte ranges its payloads covered and which iteration to
	// request next.
	relayDispatchResult struct {
		payloadCount  int
		nextIteration int32
	}

	// relayFetchSubSpecResult pairs the transfer sub-spec built for one
	// iteration with the number of non-empty byte ranges it covers.
	relayFetchSubSpecResult struct {
		spec         sharedtypes.TransferSpec
		payloadCount int
	}

	// relayFetchContext holds the inputs shared by every iteration of one
	// relay fetch: the parent spec, the resolved fanout role, the metadata
	// wait timeout, the relay sessions collected for cleanup, and the channel
	// fetch goroutines report on.
	relayFetchContext struct {
		spec          sharedtypes.TransferSpec
		role          relayFanoutRole
		timeout       time.Duration
		relaySessions map[string]string
		resultCh      chan relayFetchRun
	}

	// relayFetchState is the loop state threaded through runRelayPeerFetch:
	// the next iteration to request, the byte ranges dispatched so far versus
	// the number expected, the count of outstanding fetch goroutines, and the
	// merged result and transport path so far.
	relayFetchState struct {
		iteration       int32
		dispatchedCount int
		expectedCount   int
		inflight        int
		result          sharedtypes.TransferResult
		transportPath   sharedtypes.TransportPath
	}

	// ownerFetchFallbackRun bundles the arguments of one owner-fetch fallback
	// iteration: the parent spec, the pre-opened target files, the iteration
	// and owner rank/peer, and the jobs that pull the owner's ranges.
	ownerFetchFallbackRun struct {
		spec      sharedtypes.TransferSpec
		openFiles map[string]*os.File
		iteration int32
		ownerRank int32
		ownerPeer sharedtypes.CollectivePeerPlan
		jobs      []transferJob
	}
)

// executeRelayPeerFetch pulls the root peer's owned ranges when spec describes
// a two-peer relay fanout (see resolveRelayFanoutRole).
//
// The boolean result reports whether spec was handled here. When it is false,
// nothing was done and the other results are zero.
//
// If the role fails validation, the error is returned with handled=true.
// A root node, or a root without valid owned ranges, finishes immediately
// with the RDMA transport path. Otherwise runRelayPeerFetch drives the fetch
// loop starting at iteration 1.
//
// Deferred calls run in reverse order, so relay sessions are closed before the
// derived context is cancelled. NOTE(review): runRelayPeerFetch returns on the
// first error without draining outstanding fetch goroutines, so sessions may be
// closed while fetches are still running — confirm this is safe.
func (a *Adapter) executeRelayPeerFetch(ctx context.Context, spec sharedtypes.TransferSpec) (sharedtypes.TransferResult, sharedtypes.TransportPath, bool, error) {
	role, handled := resolveRelayFanoutRole(spec)
	if !handled {
		return sharedtypes.TransferResult{}, "", false, nil
	}
	if validationErr := validateRelayPeerFetchRole(role); validationErr != nil {
		a.logger.Error("relay peer fetch validation failed", "taskID", spec.TaskID, "err", validationErr)
		return sharedtypes.TransferResult{}, "", true, validationErr
	}

	pending := countValidRanges(role.rootPeer.OwnedRanges)
	if pending == 0 || role.isRoot {
		// Nothing to pull: this node is the data source, or the source has no ranges.
		return sharedtypes.TransferResult{}, sharedtypes.TransportPathRDMA, true, nil
	}

	fetchCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	sessions := make(map[string]string)
	defer a.closeRelaySessions(fetchCtx, sessions)

	fetch := relayFetchContext{
		spec:          spec,
		role:          role,
		timeout:       collectiveTimeout(spec),
		relaySessions: sessions,
		resultCh:      make(chan relayFetchRun, relayPeerMaxInflightIterations),
	}
	initial := relayFetchState{iteration: 1, expectedCount: pending}
	result, transportPath, err := a.runRelayPeerFetch(fetchCtx, fetch, initial)
	return result, transportPath, true, err
}

// validateRelayPeerFetchRole checks that a non-root relay peer knows where to
// fetch from: the root's endpoint and staging path must both be set. A root
// node always passes.
func validateRelayPeerFetchRole(role relayFanoutRole) error {
	switch {
	case role.isRoot:
		return nil
	case role.rootPeer.Endpoint == "":
		return fmt.Errorf("relay peer requires root endpoint")
	case role.rootPeer.StagingPath == "":
		return fmt.Errorf("relay peer requires root staging path")
	default:
		return nil
	}
}

// runRelayPeerFetch alternates between two steps:
//   - dispatching fetch iterations, up to relayPeerMaxInflightIterations at
//     once;
//   - collecting one completed fetch.
//
// It stops once every expected range has been dispatched and no fetch is
// outstanding.
//
// It returns on the first dispatch or collect error without waiting for the
// remaining fetch goroutines. An empty merged transport path is reported as
// RDMA.
func (a *Adapter) runRelayPeerFetch(ctx context.Context, fetch relayFetchContext, state relayFetchState) (sharedtypes.TransferResult, sharedtypes.TransportPath, error) {
	for {
		morePending := state.dispatchedCount < state.expectedCount || state.inflight > 0
		if !morePending {
			break
		}

		var err error
		if state, err = a.dispatchRelayPeerFetches(ctx, fetch, state); err != nil {
			return sharedtypes.TransferResult{}, "", err
		}
		if state.inflight == 0 {
			// Nothing is running and nothing more could be started.
			break
		}
		if state, err = a.collectRelayPeerFetchRun(ctx, fetch, state); err != nil {
			return sharedtypes.TransferResult{}, "", err
		}
	}

	path := state.transportPath
	if path == "" {
		path = sharedtypes.TransportPathRDMA
	}
	return state.result, path, nil
}

// dispatchRelayPeerFetches starts fetches for successive iterations while
// expected ranges remain and the in-flight limit allows.
//
// Every attempt advances state.iteration. Only iterations that covered at
// least one non-empty range are added to dispatchedCount and counted as in
// flight.
func (a *Adapter) dispatchRelayPeerFetches(ctx context.Context, fetch relayFetchContext, state relayFetchState) (relayFetchState, error) {
	for state.inflight < relayPeerMaxInflightIterations && state.dispatchedCount < state.expectedCount {
		dispatched, err := a.dispatchRelayPeerFetchIteration(ctx, fetch, state.iteration)
		if err != nil {
			return relayFetchState{}, err
		}
		state.iteration = dispatched.nextIteration
		if dispatched.payloadCount > 0 {
			state.dispatchedCount += dispatched.payloadCount
			state.inflight++
		}
	}
	return state, nil
}

// dispatchRelayPeerFetchIteration waits for the root's payload metadata for
// one iteration and, when that metadata covers at least one non-empty chunk,
// starts a goroutine that fetches those ranges.
//
// The returned payloadCount is the number of non-empty ranges handed to the
// goroutine. It is zero exactly when no goroutine was started. This matters
// because the caller only counts (and later collects) a fetch when
// payloadCount > 0. An uncounted goroutine would leave an unread result in
// resultCh, and a later collect could consume it instead of a real iteration's
// result. nextIteration is always iteration+1.
//
// Relay sessions advertised in the metadata are recorded in
// fetch.relaySessions, the map executeRelayPeerFetch passes to
// closeRelaySessions on exit. This happens even if nothing is fetched.
func (a *Adapter) dispatchRelayPeerFetchIteration(ctx context.Context, fetch relayFetchContext, iteration int32) (relayDispatchResult, error) {
	payloads, err := a.waitCollectivePayloadMetadata(ctx, fetch.role.rootPeer.Endpoint, fetch.spec.TaskID, collectiveSessionID(fetch.spec), iteration, fetch.timeout)
	if err != nil {
		a.logger.Error("wait relay collective payload metadata failed", "taskID", fetch.spec.TaskID, "iteration", iteration, "rootNode", fetch.role.rootPeer.NodeName, "err", err)
		return relayDispatchResult{}, err
	}
	dispatched := relayDispatchResult{nextIteration: iteration + 1}
	if len(payloads) == 0 {
		return dispatched, nil
	}
	collectRelaySessions(fetch.role.rootPeer.Endpoint, payloads, fetch.relaySessions)
	subSpecResult := buildRelayPeerFetchSubSpec(fetch.spec, fetch.role.rootPeer, payloads, iteration)
	logRelayPeerFetchSubSpec(a, fetch.spec, fetch.role.rootPeer, iteration, payloads, subSpecResult.spec)
	if subSpecResult.payloadCount == 0 {
		// Every chunk is empty: skip the fetch so no untracked goroutine reports on resultCh.
		a.logger.Warn("relay collective payloads contain no non-empty chunks; skipping fetch", "taskID", fetch.spec.TaskID, "iteration", iteration, "payloadCount", len(payloads), "rootNode", fetch.role.rootPeer.NodeName)
		return dispatched, nil
	}
	go a.executeRelayPeerFetchSubSpec(ctx, iteration, fetch.resultCh, subSpecResult.spec)
	dispatched.payloadCount = subSpecResult.payloadCount
	return dispatched, nil
}

// executeRelayPeerFetchSubSpec runs one iteration's fetch sub-spec and always
// reports exactly one relayFetchRun on resultCh. An empty transport path is
// reported as RDMA.
//
// The send does not observe ctx. It relies on resultCh having room for every
// outstanding fetch; executeRelayPeerFetch sizes the channel to
// relayPeerMaxInflightIterations.
func (a *Adapter) executeRelayPeerFetchSubSpec(ctx context.Context, iteration int32, resultCh chan relayFetchRun, subSpec sharedtypes.TransferSpec) {
	outcome := relayFetchRun{iteration: iteration}
	outcome.result, outcome.err = a.executeTransferSubSpec(ctx, subSpec)
	outcome.transportPath = outcome.result.TransportPath
	if outcome.transportPath == "" {
		outcome.transportPath = sharedtypes.TransportPathRDMA
	}
	resultCh <- outcome
}

// buildRelayPeerFetchSubSpec builds the fetch sub-spec for one iteration's
// payloads. It also counts how many payloads are non-empty (Chunk.Size > 0);
// the dispatch loop adds that count to its dispatchedCount.
func buildRelayPeerFetchSubSpec(
	spec sharedtypes.TransferSpec,
	rootPeer sharedtypes.CollectivePeerPlan,
	payloads []sharedtypes.CollectiveChunkPayload,
	iteration int32,
) relayFetchSubSpecResult {
	nonEmpty := 0
	for i := range payloads {
		if payloads[i].Chunk.Size > 0 {
			nonEmpty++
		}
	}
	return relayFetchSubSpecResult{
		payloadCount: nonEmpty,
		spec:         buildCollectiveRangeFetchSpec(spec, rootPeer, payloads, iteration),
	}
}

// logRelayPeerFetchSubSpec logs the sub-spec built for one relay iteration.
//
// It warns first when some payloads lack a relay RDMA session hint. It then
// logs at Info level the payload and hint counts, the relay session IDs, and a
// compact summary of each source segment.
func logRelayPeerFetchSubSpec(
	a *Adapter,
	spec sharedtypes.TransferSpec,
	rootPeer sharedtypes.CollectivePeerPlan,
	iteration int32,
	payloads []sharedtypes.CollectiveChunkPayload,
	subSpec sharedtypes.TransferSpec,
) {
	sessionIDs := relaySessionIDs(payloads)
	hintCount := countRelayHints(payloads)
	total := len(payloads)

	if hintCount != total {
		a.logger.Warn(
			"relay collective payloads missing RDMA hints",
			"taskID", spec.TaskID,
			"iteration", iteration,
			"payloadCount", total,
			"relayHintCount", hintCount,
			"rootNode", rootPeer.NodeName,
		)
	}

	segmentSummary := strings.Join(collectiveSegmentKinds(subSpec.SourceSegments), ",")
	a.logger.Info(
		"relay peer fetch sub-spec",
		"taskID", spec.TaskID,
		"iteration", iteration,
		"payloadCount", total,
		"relayHintCount", hintCount,
		"relaySessionIDs", strings.Join(sessionIDs, ","),
		"sourceSegments", segmentSummary,
	)
}

// countRelayHints returns how many payloads carry a relay RDMA hint with a
// non-empty session ID.
func countRelayHints(payloads []sharedtypes.CollectiveChunkPayload) int {
	count := 0
	for i := range payloads {
		hint := payloads[i].RelayRDMA
		if hint == nil || hint.SessionID == "" {
			continue
		}
		count++
	}
	return count
}

// collectiveSegmentKinds renders one diagnostic string per source segment in
// the form "mode:sessionID:ranges=N:first=start-end".
//
// mode is "relay" when the endpoint carries a relay RDMA hint and "plain"
// otherwise. The first range's bounds are -1 when the segment has no ranges.
// The result is never nil.
func collectiveSegmentKinds(sourceSegments []sharedtypes.SourceSegmentPlan) []string {
	kinds := make([]string, len(sourceSegments))
	for i := range sourceSegments {
		seg := &sourceSegments[i]
		mode, session := "plain", ""
		if hint := seg.SourceEndpoint.RelayRDMA; hint != nil {
			mode, session = "relay", hint.SessionID
		}
		kinds[i] = fmt.Sprintf(
			"%s:%s:ranges=%d:first=%d-%d",
			mode,
			session,
			len(seg.ByteRanges),
			firstRangeStart(seg.ByteRanges),
			firstRangeEnd(seg.ByteRanges),
		)
	}
	return kinds
}

// collectRelayPeerFetchRun waits for one fetch goroutine to report and
// decrements the in-flight count. On success it acknowledges that iteration to
// the root and merges the result and transport path into state.
//
// A fetch or acknowledgement error is logged and returned with a zero state.
// NOTE(review): the receive does not observe ctx cancellation; it relies on
// every started fetch eventually reporting.
func (a *Adapter) collectRelayPeerFetchRun(ctx context.Context, fetch relayFetchContext, state relayFetchState) (relayFetchState, error) {
	completed := <-fetch.resultCh
	state.inflight--

	rootNode := fetch.role.rootPeer.NodeName
	if completed.err != nil {
		a.logger.Error("execute relay peer fetch sub-spec failed", "taskID", fetch.spec.TaskID, "iteration", completed.iteration, "rootNode", rootNode, "err", completed.err)
		return relayFetchState{}, completed.err
	}

	ackErr := a.acknowledgeCollectiveIteration(ctx, fetch.role.rootPeer.Endpoint, fetch.spec.TaskID, collectiveSessionID(fetch.spec), completed.iteration)
	if ackErr != nil {
		a.logger.Error("acknowledge relay collective iteration failed", "taskID", fetch.spec.TaskID, "iteration", completed.iteration, "rootNode", rootNode, "err", ackErr)
		return relayFetchState{}, ackErr
	}

	state.result = mergeTransferResults(state.result, completed.result)
	state.transportPath = mergeTransportPath(state.transportPath, completed.transportPath)
	return state, nil
}

// firstRangeStart returns the Start of the first range, or -1 when ranges is
// empty. It is used only for diagnostic strings.
func firstRangeStart(ranges []sharedtypes.ByteRange) int64 {
	start := int64(-1)
	if len(ranges) > 0 {
		start = ranges[0].Start
	}
	return start
}

// firstRangeEnd returns the End of the first range, or -1 when ranges is
// empty. It is used only for diagnostic strings.
func firstRangeEnd(ranges []sharedtypes.ByteRange) int64 {
	end := int64(-1)
	if len(ranges) > 0 {
		end = ranges[0].End
	}
	return end
}

// buildCollectiveRangeFetchSpec builds the SINGLE_SOURCE_DIRECT sub-spec (with
// PreserveExisting set) that pulls one iteration's payload ranges from owner
// into the parent's target paths.
//
// How ranges are assigned to segments:
//   - Payloads carrying a relay RDMA session hint are fetched through the
//     per-session "peer-relay" segments from buildCollectiveRelaySourceSegments.
//   - Every other non-empty payload range goes on one plain peer segment that
//     reads owner's staging path.
//   - When no relay segment applies, that plain segment carries all ranges. It
//     is emitted even if it has none.
//
// Parallelism equals the number of segments.
func buildCollectiveRangeFetchSpec(parent sharedtypes.TransferSpec, owner sharedtypes.CollectivePeerPlan, payloads []sharedtypes.CollectiveChunkPayload, iteration int32) sharedtypes.TransferSpec {
	sourceSegments := buildCollectiveRelaySourceSegments(parent, owner, payloads)
	plainPayloads := payloads
	if len(sourceSegments) > 0 {
		// Relay segments only cover hinted payloads. The remaining ranges must
		// still be fetched, because callers count every non-empty payload as
		// dispatched.
		plainPayloads = collectivePayloadsWithoutRelayHint(payloads)
	}
	plainRanges := payloadsToByteRanges(plainPayloads)
	if len(sourceSegments) == 0 || len(plainRanges) > 0 {
		sourceID := fmt.Sprintf("collective-%s-%d", owner.NodeName, owner.Rank)
		sourceSegments = append(sourceSegments, sharedtypes.SourceSegmentPlan{
			SourceID: sourceID,
			SourceEndpoint: sharedtypes.SourceEndpoint{
				SourceID:   sourceID,
				SourceType: sourceTypePeer,
				Endpoint:   owner.Endpoint,
				Path:       owner.StagingPath,
				NodeName:   owner.NodeName,
			},
			ByteRanges: plainRanges,
		})
	}
	return sharedtypes.TransferSpec{
		TaskID:            fmt.Sprintf("%s-owner-%s-%d", parent.TaskID, owner.NodeName, iteration),
		ArtifactKey:       parent.ArtifactKey,
		TransferMode:      sharedtypes.TransferModeSingleSourceDirect,
		LogicalManifest:   parent.LogicalManifest,
		SourceSegments:    sourceSegments,
		TargetTempPath:    parent.TargetTempPath,
		TargetFinalPath:   parent.TargetFinalPath,
		PreserveExisting:  true,
		EnableChunkCRC32C: parent.EnableChunkCRC32C,
		ChunkSizeBytes:    parent.ChunkSizeBytes,
		Parallelism:       collectiveRangeFetchParallelism(sourceSegments),
		RetryLimit:        parent.RetryLimit,
		TimeoutSeconds:    parent.TimeoutSeconds,
	}
}

// collectivePayloadsWithoutRelayHint returns the payloads that
// buildCollectiveRelaySourceSegments cannot place on a relay segment because
// they lack a relay RDMA session hint.
func collectivePayloadsWithoutRelayHint(payloads []sharedtypes.CollectiveChunkPayload) []sharedtypes.CollectiveChunkPayload {
	var plain []sharedtypes.CollectiveChunkPayload
	for _, payload := range payloads {
		if payload.RelayRDMA == nil || payload.RelayRDMA.SessionID == "" {
			plain = append(plain, payload)
		}
	}
	return plain
}

// collectiveRangeFetchParallelism returns the number of source segments as an
// int32. The result is at least 1 and clamped to math.MaxInt32, so the
// conversion cannot overflow.
func collectiveRangeFetchParallelism(sourceSegments []sharedtypes.SourceSegmentPlan) int32 {
	count := len(sourceSegments)
	switch {
	case count <= 1:
		return 1
	case count >= math.MaxInt32:
		return math.MaxInt32
	default:
		return int32(count)
	}
}

// shouldUseRelayRdmaOwnerFetch reports whether owner-fetch may read from relay
// memory ("peer-relay" segments). That is allowed only for transfers with at
// most one manifest file that are not per-file sub-tasks of a directory
// fanout.
//
// Large full-directory fanout runs already complete the owner's source-half
// before owner-fetch begins. For those, re-reading the peer staging file over
// the regular SINGLE_SOURCE_DIRECT RDMA path is slower than relay-memory fetch.
// It is still preferred because it avoids the later-iteration heap corruption
// currently triggered by relay export session churn on multi-file directories.
func shouldUseRelayRdmaOwnerFetch(parent sharedtypes.TransferSpec) bool {
	if len(parent.LogicalManifest.Files) > 1 {
		return false
	}
	return !isDirectoryPerFileFanoutSubTask(parent)
}

// isDirectoryPerFileFanoutSubTask reports whether parent has the shape of a
// per-file sub-task of a directory fanout. That shape is a
// TransferModePartialPullAllGather transfer with PreserveExisting set and
// exactly one manifest file.
func isDirectoryPerFileFanoutSubTask(parent sharedtypes.TransferSpec) bool {
	if parent.TransferMode != sharedtypes.TransferModePartialPullAllGather {
		return false
	}
	if !parent.PreserveExisting {
		return false
	}
	return len(parent.LogicalManifest.Files) == 1
}

// buildCollectiveRelaySourceSegments groups payloads into one "peer-relay"
// source segment per relay RDMA session ID. Only non-empty payloads with a
// session hint are included.
//
// Segments appear in the order their session is first seen. Each segment's
// ranges are stably sorted by relative path, then start offset, and carry the
// payload's RelayOffset.
//
// It returns nil when relay owner fetch is disabled for parent (see
// shouldUseRelayRdmaOwnerFetch), and an empty, non-nil slice when no payload
// qualifies.
func buildCollectiveRelaySourceSegments(parent sharedtypes.TransferSpec, owner sharedtypes.CollectivePeerPlan, payloads []sharedtypes.CollectiveChunkPayload) []sharedtypes.SourceSegmentPlan {
	if !shouldUseRelayRdmaOwnerFetch(parent) {
		return nil
	}

	segments := make([]sharedtypes.SourceSegmentPlan, 0)
	indexBySession := make(map[string]int)
	for _, payload := range payloads {
		hint := payload.RelayRDMA
		if payload.Chunk.Size <= 0 || hint == nil || hint.SessionID == "" {
			continue
		}
		idx, seen := indexBySession[hint.SessionID]
		if !seen {
			id := fmt.Sprintf("collective-%s-%d-%s", owner.NodeName, owner.Rank, hint.SessionID)
			segments = append(segments, sharedtypes.SourceSegmentPlan{
				SourceID: id,
				SourceEndpoint: sharedtypes.SourceEndpoint{
					SourceID:   id,
					SourceType: "peer-relay",
					Endpoint:   owner.Endpoint,
					Path:       owner.StagingPath,
					NodeName:   owner.NodeName,
					RelayRDMA:  cloneRelayRDMAHint(hint),
				},
			})
			idx = len(segments) - 1
			indexBySession[hint.SessionID] = idx
		}
		segments[idx].ByteRanges = append(segments[idx].ByteRanges, sharedtypes.ByteRange{
			RelativePath: payload.Chunk.RelativePath,
			Start:        payload.Chunk.Offset,
			End:          payload.Chunk.Offset + payload.Chunk.Size,
			RelayOffset:  payload.RelayOffset,
		})
	}

	for i := range segments {
		ranges := segments[i].ByteRanges
		sort.SliceStable(ranges, func(x, y int) bool {
			if ranges[x].RelativePath == ranges[y].RelativePath {
				return ranges[x].Start < ranges[y].Start
			}
			return ranges[x].RelativePath < ranges[y].RelativePath
		})
	}
	return segments
}

// relayFanoutRole describes this node's place in a two-peer relay fanout.
// selfPeer is the peer whose Rank matches the ring's Rank. rootPeer is the
// only peer owning valid ranges. isRoot is true when they share a rank.
type relayFanoutRole struct {
	selfPeer sharedtypes.CollectivePeerPlan
	rootPeer sharedtypes.CollectivePeerPlan
	isRoot   bool
}

// resolveRelayFanoutRole recognizes a relay fanout. All of these must hold:
//   - a TransferModePartialPullAllGather spec with a ring;
//   - a world size of 2 and exactly 2 peers;
//   - this node's rank appears among the peers;
//   - exactly one peer owns valid ranges, and it has a non-empty NodeName.
//
// Any other shape returns false.
//
// Duplicate owners are detected with an explicit flag rather than by
// inspecting NodeName. Otherwise an owner with an empty NodeName would let a
// second owner be accepted as the single root.
func resolveRelayFanoutRole(spec sharedtypes.TransferSpec) (relayFanoutRole, bool) {
	if spec.TransferMode != sharedtypes.TransferModePartialPullAllGather || spec.CollectiveSpec.Ring == nil {
		return relayFanoutRole{}, false
	}
	if spec.CollectiveSpec.Ring.WorldSize != 2 || len(spec.CollectiveSpec.Peers) != 2 {
		return relayFanoutRole{}, false
	}
	var (
		role      relayFanoutRole
		foundSelf bool
		foundRoot bool
	)
	for _, peer := range spec.CollectiveSpec.Peers {
		if peer.Rank == spec.CollectiveSpec.Ring.Rank {
			role.selfPeer = peer
			foundSelf = true
		}
		if countValidRanges(peer.OwnedRanges) == 0 {
			continue
		}
		if foundRoot {
			// More than one peer owns data: not a single-root relay fanout.
			return relayFanoutRole{}, false
		}
		role.rootPeer = peer
		foundRoot = true
	}
	if !foundSelf || !foundRoot || role.rootPeer.NodeName == "" {
		return relayFanoutRole{}, false
	}
	role.isRoot = role.selfPeer.Rank == role.rootPeer.Rank
	return role, true
}

// payloadsToByteRanges converts payload chunk metadata into
// [Offset, Offset+Size) byte ranges, skipping chunks with a non-positive size.
// The result is never nil.
func payloadsToByteRanges(payloads []sharedtypes.CollectiveChunkPayload) []sharedtypes.ByteRange {
	out := make([]sharedtypes.ByteRange, 0, len(payloads))
	for i := range payloads {
		chunk := payloads[i].Chunk
		if chunk.Size > 0 {
			out = append(out, sharedtypes.ByteRange{
				RelativePath: chunk.RelativePath,
				Start:        chunk.Offset,
				End:          chunk.Offset + chunk.Size,
			})
		}
	}
	return out
}

// executeRingCollectiveOverAPI runs a ring all-gather by exchanging chunk
// payloads through the collective payload API.
//
// Each iteration k (1 <= k < worldSize) does two things:
//   - publishes the payloads this rank currently holds via
//     pushCollectivePayloads: its own owned ranges in iteration 1, and
//     afterwards whatever it received in iteration k-1;
//   - waits for the ranges owned by rank (selfRank-k) mod worldSize, then
//     verifies and writes them with applyCollectivePayloads.
//
// An owner with no valid ranges is skipped and nothing is forwarded in the next
// iteration. A rank missing from the peer list looks like an owner with no
// ranges.
//
// It returns the merged result with the TCP fallback transport path. A world
// of one rank or fewer is a no-op reported as RDMA.
//
// Peer indexing and owner-rank arithmetic use the shared helpers
// (collectivePeersByRank, collectiveOwnerRank), so this path and the
// owner-fetch fallback agree on ring order.
func (a *Adapter) executeRingCollectiveOverAPI(ctx context.Context, spec sharedtypes.TransferSpec, openFiles map[string]*os.File) (sharedtypes.TransferResult, sharedtypes.TransportPath, error) {
	selfRank := spec.CollectiveSpec.Ring.Rank
	worldSize := spec.CollectiveSpec.Ring.WorldSize
	if worldSize <= 1 {
		return sharedtypes.TransferResult{}, sharedtypes.TransportPathRDMA, nil
	}

	peerByRank := collectivePeersByRank(spec.CollectiveSpec.Peers)
	selfPeer, ok := peerByRank[selfRank]
	if !ok {
		return sharedtypes.TransferResult{}, "", fmt.Errorf("self peer rank %d not found", selfRank)
	}
	outboundPayloads, err := a.buildLocalPayloads(spec, selfPeer.OwnedRanges, spec.TargetTempPath)
	if err != nil {
		return sharedtypes.TransferResult{}, "", err
	}

	result := sharedtypes.TransferResult{}
	sessionID := collectiveSessionID(spec)
	timeout := collectiveTimeout(spec)
	for iteration := int32(1); iteration < worldSize; iteration++ {
		if len(outboundPayloads) > 0 {
			if err := a.pushCollectivePayloads(ctx, spec, sessionID, iteration, outboundPayloads); err != nil {
				return sharedtypes.TransferResult{}, "", err
			}
		}
		ownerRank := collectiveOwnerRank(selfRank, worldSize, iteration)
		// A rank absent from peerByRank yields a zero plan with no ranges.
		expectedRanges := peerByRank[ownerRank].OwnedRanges
		if countValidRanges(expectedRanges) == 0 {
			outboundPayloads = nil
			continue
		}
		inboundPayloads, err := a.waitCollectivePayloads(ctx, spec, sessionID, iteration, expectedRanges, timeout)
		if err != nil {
			return sharedtypes.TransferResult{}, "", err
		}
		stepResult, err := a.applyCollectivePayloads(spec.TargetTempPath, openFiles, inboundPayloads)
		if err != nil {
			return sharedtypes.TransferResult{}, "", err
		}
		result = mergeTransferResults(result, stepResult)
		// Forward what was just received to the next hop in the following iteration.
		outboundPayloads = inboundPayloads
		a.logger.Debug("completed ring iteration", "taskID", spec.TaskID, "sessionID", sessionID, "iteration", iteration, "receivedChunkCount", len(inboundPayloads))
	}
	return result, sharedtypes.TransportPathTCPFallback, nil
}

// executeOwnerFetchFallback pulls every other rank's owned ranges with plain
// peer transfer jobs that read each owner's staging path. It visits one owner
// per iteration, in ring order given by collectiveOwnerRank.
//
// Owners without valid ranges are skipped. The first error aborts the whole
// fallback. The merged result is reported with the TCP fallback transport
// path, including the trivial case of a world of one rank or fewer.
func (a *Adapter) executeOwnerFetchFallback(ctx context.Context, spec sharedtypes.TransferSpec, openFiles map[string]*os.File) (sharedtypes.TransferResult, sharedtypes.TransportPath, error) {
	peerByRank := collectivePeersByRank(spec.CollectiveSpec.Peers)
	ring := spec.CollectiveSpec.Ring
	if ring.WorldSize <= 1 {
		return sharedtypes.TransferResult{}, sharedtypes.TransportPathTCPFallback, nil
	}

	var merged sharedtypes.TransferResult
	for iteration := int32(1); iteration < ring.WorldSize; iteration++ {
		ownerRank := collectiveOwnerRank(ring.Rank, ring.WorldSize, iteration)
		ownerPeer, jobs, err := buildOwnerFetchFallbackJobs(peerByRank, ownerRank)
		if err != nil {
			a.logger.Error("build owner-fetch fallback jobs failed", "taskID", spec.TaskID, "iteration", iteration, "ownerRank", ownerRank, "err", err)
			return sharedtypes.TransferResult{}, "", err
		}
		if len(jobs) == 0 {
			continue
		}
		run := ownerFetchFallbackRun{
			spec:      spec,
			openFiles: openFiles,
			iteration: iteration,
			ownerRank: ownerRank,
			ownerPeer: ownerPeer,
			jobs:      jobs,
		}
		step, err := a.executeOwnerFetchFallbackIteration(ctx, run)
		if err != nil {
			return sharedtypes.TransferResult{}, "", err
		}
		merged = mergeTransferResults(merged, step)
	}
	return merged, sharedtypes.TransportPathTCPFallback, nil
}

// collectivePeersByRank indexes peers by Rank. When ranks repeat, the last
// peer in the slice wins.
func collectivePeersByRank(peers []sharedtypes.CollectivePeerPlan) map[int32]sharedtypes.CollectivePeerPlan {
	byRank := make(map[int32]sharedtypes.CollectivePeerPlan, len(peers))
	for i := range peers {
		byRank[peers[i].Rank] = peers[i]
	}
	return byRank
}

// collectiveOwnerRank returns the rank whose ranges selfRank receives in the
// given ring iteration: (selfRank - iteration) mod worldSize.
//
// Go's % keeps the sign of the dividend, so worldSize is added once before the
// final reduction. This keeps the result in [0, worldSize) for a positive
// worldSize.
func collectiveOwnerRank(selfRank, worldSize, iteration int32) int32 {
	offset := selfRank - iteration
	shifted := offset%worldSize + worldSize
	return shifted % worldSize
}

// buildOwnerFetchFallbackJobs looks up the owner at ownerRank and creates one
// transfer job per non-empty owned range (End > Start). It returns an error
// when the rank is not in peerByRank.
func buildOwnerFetchFallbackJobs(
	peerByRank map[int32]sharedtypes.CollectivePeerPlan,
	ownerRank int32,
) (sharedtypes.CollectivePeerPlan, []transferJob, error) {
	ownerPeer, found := peerByRank[ownerRank]
	if !found {
		return sharedtypes.CollectivePeerPlan{}, nil, fmt.Errorf("collective owner rank %d not found", ownerRank)
	}
	owned := ownerPeer.OwnedRanges
	jobs := make([]transferJob, 0, len(owned))
	for i := range owned {
		if owned[i].End > owned[i].Start {
			jobs = append(jobs, newOwnerFetchTransferJob(ownerPeer, ownerRank, owned[i]))
		}
	}
	return ownerPeer, jobs, nil
}

// newOwnerFetchTransferJob creates a job that reads rng from ownerPeer's
// staging path through a plain peer source. The job's offset is rng.Start and
// its length is rng.End - rng.Start.
func newOwnerFetchTransferJob(
	ownerPeer sharedtypes.CollectivePeerPlan,
	ownerRank int32,
	rng sharedtypes.ByteRange,
) transferJob {
	id := fmt.Sprintf("collective-%s-%d", ownerPeer.NodeName, ownerRank)
	endpoint := sharedtypes.SourceEndpoint{
		SourceID:   id,
		SourceType: sourceTypePeer,
		Endpoint:   ownerPeer.Endpoint,
		Path:       ownerPeer.StagingPath,
		NodeName:   ownerPeer.NodeName,
	}
	job := transferJob{
		source: sharedtypes.SourceSegmentPlan{
			SourceID:       id,
			SourceEndpoint: endpoint,
		},
		relativePath: rng.RelativePath,
		offset:       rng.Start,
		length:       rng.End - rng.Start,
	}
	return job
}

// executeOwnerFetchFallbackIteration runs one owner's jobs into the spec's
// target temp path. The parallelism passed to executeJobs equals the job
// count, and the retry limit comes from effectiveChunkRetryLimit. On failure it
// logs the error and returns it with a zero result.
func (a *Adapter) executeOwnerFetchFallbackIteration(ctx context.Context, run ownerFetchFallbackRun) (sharedtypes.TransferResult, error) {
	jobCount := len(run.jobs)
	a.logger.Debug("executing owner-fetch fallback iteration", "taskID", run.spec.TaskID, "iteration", run.iteration, "ownerRank", run.ownerRank, "pullNode", run.ownerPeer.NodeName, "jobCount", jobCount)
	stepResult, err := a.executeJobs(ctx, run.spec.TargetTempPath, run.jobs, run.openFiles, jobCount, effectiveChunkRetryLimit(run.spec))
	if err == nil {
		return stepResult, nil
	}
	a.logger.Error("execute owner-fetch fallback iteration failed", "taskID", run.spec.TaskID, "iteration", run.iteration, "ownerRank", run.ownerRank, "pullNode", run.ownerPeer.NodeName, "jobCount", jobCount, "err", err)
	return sharedtypes.TransferResult{}, err
}

// buildLocalPayloads reads each non-empty range (End > Start) from the local
// copy under targetTempPath. It wraps each read as a collective payload whose
// chunk carries the CRC32C of the data read and this node's ID as SourceID.
// Size is the number of bytes actually read.
//
// NOTE(review): ChunkID is built from spec.TargetTempPath while FilePath uses
// targetTempPath. The two only differ if a caller passes a different
// targetTempPath.
func (*Adapter) buildLocalPayloads(spec sharedtypes.TransferSpec, ranges []sharedtypes.ByteRange, targetTempPath string) ([]sharedtypes.CollectiveChunkPayload, error) {
	payloads := make([]sharedtypes.CollectiveChunkPayload, 0, len(ranges))
	for i := range ranges {
		rng := ranges[i]
		if rng.Start >= rng.End {
			continue
		}
		data, err := readLocalChunkData(targetTempPath, rng.RelativePath, rng.Start, rng.End-rng.Start)
		if err != nil {
			return nil, err
		}
		localPath := filepath.Join(targetTempPath, filepath.FromSlash(rng.RelativePath))
		checksum := encodeCRC32C(crc32.Checksum(data, crc32cTable))
		chunk := sharedtypes.TransferredChunk{
			ChunkID:      fmt.Sprintf("%s:%s:%d", spec.TargetTempPath, rng.RelativePath, rng.Start),
			FilePath:     filepath.ToSlash(localPath),
			RelativePath: rng.RelativePath,
			Offset:       rng.Start,
			Size:         int64(len(data)),
			CRC32C:       checksum,
			SourceID:     targetNodeID(spec),
		}
		payloads = append(payloads, sharedtypes.CollectiveChunkPayload{Chunk: chunk, Data: data})
	}
	return payloads, nil
}

// applyCollectivePayloads verifies received collective payloads and writes
// them into the pre-opened target files, which are keyed by RelativePath. It
// returns a TransferResult describing every chunk written.
//
// For each payload it:
//   - requires a prepared file in openFiles;
//   - requires len(Data) to equal the advertised Chunk.Size;
//   - checks the CRC32C when the payload carries one (an empty CRC32C skips
//     the check);
//   - writes Data at Chunk.Offset.
//
// Chunk.FilePath in the result is rewritten to the local target temp path.
// Processing stops at the first failure and returns a zero result. Bytes
// already written for earlier payloads remain in the files.
func (*Adapter) applyCollectivePayloads(targetTempPath string, openFiles map[string]*os.File, payloads []sharedtypes.CollectiveChunkPayload) (sharedtypes.TransferResult, error) {
	result := sharedtypes.TransferResult{}
	for _, payload := range payloads {
		targetFile := openFiles[payload.Chunk.RelativePath]
		if targetFile == nil {
			return sharedtypes.TransferResult{}, fmt.Errorf("target file %s not prepared", payload.Chunk.RelativePath)
		}
		// A length mismatch would leave part of the chunk unwritten or spill into
		// the neighbouring range. Without a CRC it would otherwise go unnoticed.
		if int64(len(payload.Data)) != payload.Chunk.Size {
			return sharedtypes.TransferResult{}, fmt.Errorf("collective chunk size mismatch for %s@%d: expected %d bytes, got %d", payload.Chunk.RelativePath, payload.Chunk.Offset, payload.Chunk.Size, len(payload.Data))
		}
		if payload.Chunk.CRC32C != "" {
			sum := crc32.Checksum(payload.Data, crc32cTable)
			if encodeCRC32C(sum) != payload.Chunk.CRC32C {
				return sharedtypes.TransferResult{}, fmt.Errorf("collective chunk crc mismatch for %s@%d", payload.Chunk.RelativePath, payload.Chunk.Offset)
			}
		}
		if _, err := targetFile.WriteAt(payload.Data, payload.Chunk.Offset); err != nil {
			return sharedtypes.TransferResult{}, errutil.Wrap(fmt.Sprintf("write collective payload %s at offset %d", payload.Chunk.RelativePath, payload.Chunk.Offset), err)
		}
		// payload is a per-iteration copy, so rewriting FilePath does not touch the caller's slice.
		payload.Chunk.FilePath = filepath.ToSlash(filepath.Join(targetTempPath, filepath.FromSlash(payload.Chunk.RelativePath)))
		result.BytesTransferred += int64(len(payload.Data))
		result.ChunkCount++
		result.SucceededChunks++
		result.TransferredChunks = append(result.TransferredChunks, payload.Chunk)
	}
	return result, nil
}