/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */
#ifndef OMNISTREAM_LOCALRECOVERYDIRECTORYPROVIDERIMPL_H
#define OMNISTREAM_LOCALRECOVERYDIRECTORYPROVIDERIMPL_H

#include "LocalRecoveryDirectoryProvider.h"
#include "runtime/executiongraph/JobIDPOD.h"
#include "runtime/jobgraph/JobVertexID.h"
#include <vector>
#include <string>
#include <climits>
#include <filesystem>

class LocalRecoveryDirectoryProviderImpl : public LocalRecoveryDirectoryProvider {
public:
    LocalRecoveryDirectoryProviderImpl(
        const std::vector<std::filesystem::path>& allocationBaseDirs,
        const omnistream::JobIDPOD& jobID,
        const omnistream::JobVertexID& jobVertexID,
        int subtaskIndex)
        : allocationBaseDirs_(allocationBaseDirs),
          jobID_(jobID),
          jobVertexID_(jobVertexID),
          subtaskIndex_(subtaskIndex) {
        if (allocationBaseDirs_.empty()) {
            throw std::invalid_argument("At least one base directory required");
        }
        
        for (const auto& dir : allocationBaseDirs_) {
            std::filesystem::create_directories(dir);
        }
    }

    std::filesystem::path AllocationBaseDirectory(long checkpointId) override
    {
        uint32_t narrowed = static_cast<uint32_t>(static_cast<int32_t>(checkpointId));
        uint32_t masked = narrowed & static_cast<uint32_t>(INT32_MAX);
        size_t index = masked % allocationBaseDirs_.size();

        return SelectAllocationBaseDirectory(index);
    }

    std::filesystem::path SubtaskBaseDirectory(long checkpointId) override
    {
        return AllocationBaseDirectory(checkpointId) / SubtaskDirString();
    }

    std::filesystem::path SubtaskSpecificCheckpointDirectory(long checkpointId) override
    {
        return SubtaskBaseDirectory(checkpointId) / CheckpointDirString(checkpointId);
    }

    std::filesystem::path SelectAllocationBaseDirectory(int idx) override
    {
        if (idx < 0 || idx >= static_cast<int>(allocationBaseDirs_.size())) {
            throw std::out_of_range("Invalid directory index");
        }
        return allocationBaseDirs_[idx];
    }

    std::filesystem::path SelectSubtaskBaseDirectory(int idx) override
    {
        return SelectAllocationBaseDirectory(idx) / SubtaskDirString();
    }

    int AllocationBaseDirsCount() const override
    {
        return static_cast<int>(allocationBaseDirs_.size());
    }

    std::string ToString() const override
    {
        std::string result = "LocalRecoveryDirectoryProvider{AllocationBaseDirectories=[";
        for (size_t i = 0; i < allocationBaseDirs_.size(); ++i) {
            if (i > 0) result += ", ";
            result += allocationBaseDirs_[i].string();
        }
        result += "], JobID=" + jobID_.toString() +
                  ", JobVertexID=" + jobVertexID_.toString() +
                  ", SubtaskIndex=" + std::to_string(subtaskIndex_) + "}";
        return result;
    }

    std::vector<std::filesystem::path> GetPaths() const override
    {
        return allocationBaseDirs_;
    }

    int GetSubIndex() const override
    {
        return subtaskIndex_;
    }

    void SetJobIdHexStr(std::string hexStr) override
    {
        jobIdHexStr = hexStr;
    }
    std::string GetJobIdHexStr()  override
    {
        return jobIdHexStr;
    }
    void SetVertexIdHexStr(std::string hexStr)  override
    {
        vertexIdHexStr = hexStr;
    }
    std::string GetVertexIdHexStr()  override
    {
        return vertexIdHexStr;
    }

private:
    std::string SubtaskDirString() const
    {
        return "jid_" + jobID_.AbstractIDPOD::toString() +
               "/vtx_" + jobVertexID_.AbstractIDPOD::toString() +
               "_sti_" + std::to_string(subtaskIndex_);
    }

    std::string CheckpointDirString(long checkpointId) const
    {
        return "chk_" + std::to_string(checkpointId);
    }

    std::vector<std::filesystem::path> allocationBaseDirs_;
    omnistream::JobIDPOD jobID_;
    omnistream::JobVertexID jobVertexID_;
    int subtaskIndex_;
    std::string jobIdHexStr;
    std::string vertexIdHexStr;
};

#endif // OMNISTREAM_LOCALRECOVERYDIRECTORYPROVIDERIMPL_H