/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */
#include "CheckpointOptions.h"
#include "SavepointType.h"
#include "common.h"

CheckpointOptions::CheckpointOptions(
    SnapshotType *checkpointType,
    std::shared_ptr<CheckpointStorageLocationReference> targetLocation,
    AlignmentType alignmentType, long alignedCheckpointTimeout)
    : checkpointType_(checkpointType), targetLocation_(targetLocation),
      alignmentType_(alignmentType), alignedCheckpointTimeout_(alignedCheckpointTimeout)
{
    if (alignmentType == AlignmentType::UNALIGNED && checkpointType->IsSavepoint()) {
        throw std::invalid_argument("Savepoint can't be unaligned");
    }

    if (alignedCheckpointTimeout != NO_ALIGNED_CHECKPOINT_TIME_OUT &&
        alignmentType == AlignmentType::UNALIGNED) {
        throw std::invalid_argument(
            "Unaligned checkpoint can't have timeout (" +
            std::to_string(alignedCheckpointTimeout) + ")");
    }

    if (targetLocation == nullptr) {
        throw std::invalid_argument("targetLocation must not be null");
    }

    if (checkpointType == nullptr) {
        throw std::invalid_argument("checkpointType must not be null");
    }
}

CheckpointOptions::CheckpointOptions(
    SnapshotType *checkpointType,
    std::shared_ptr<CheckpointStorageLocationReference> targetLocation)
    : checkpointType_(checkpointType), targetLocation_(targetLocation),
      alignmentType_(AlignmentType::ALIGNED), alignedCheckpointTimeout_(NO_ALIGNED_CHECKPOINT_TIME_OUT)
{
}

CheckpointOptions *CheckpointOptions::ToRuntimeAlignedNoTimeout() const 
{
    if (!IsExactlyOnceMode()) {
        return const_cast<CheckpointOptions *>(this);
    }
    
    if (alignmentType_ == AlignmentType::ALIGNED &&
                alignedCheckpointTimeout_ == NO_ALIGNED_CHECKPOINT_TIME_OUT) {
        return const_cast<CheckpointOptions *>(this);
    }
    
    return AlignedNoTimeout(*checkpointType_, targetLocation_);
}

CheckpointOptions *CheckpointOptions::NotExactlyOnce(
    SnapshotType &type, std::shared_ptr<CheckpointStorageLocationReference> location)
{
    return new CheckpointOptions(&type, location, AlignmentType::AT_LEAST_ONCE,
                                 NO_ALIGNED_CHECKPOINT_TIME_OUT);
}

CheckpointOptions *CheckpointOptions::AlignedNoTimeout(
    SnapshotType &type, std::shared_ptr<CheckpointStorageLocationReference> location)
{
    return new CheckpointOptions(&type, location, AlignmentType::ALIGNED,
                                 NO_ALIGNED_CHECKPOINT_TIME_OUT);
}

CheckpointOptions *CheckpointOptions::Unaligned(
    SnapshotType &type, std::shared_ptr<CheckpointStorageLocationReference> location)
{
    if (type.IsSavepoint()) {
        throw std::invalid_argument("Savepoints can not be unaligned");
    }
    return new CheckpointOptions(&type, location, AlignmentType::UNALIGNED,
                                 NO_ALIGNED_CHECKPOINT_TIME_OUT);
}

CheckpointOptions *CheckpointOptions::AlignedWithTimeout(
    SnapshotType &type, std::shared_ptr<CheckpointStorageLocationReference> location,
    long alignedCheckpointTimeout)
{
    if (type.IsSavepoint()) {
        throw std::invalid_argument("Savepoints can not be unaligned");
    }
    return new CheckpointOptions(&type, location, AlignmentType::ALIGNED,
                                 alignedCheckpointTimeout);
}

CheckpointOptions *CheckpointOptions::ForceAligned(
    SnapshotType &type, std::shared_ptr<CheckpointStorageLocationReference> location,
    long alignedCheckpointTimeout)
{
    if (type.IsSavepoint()) {
        throw std::invalid_argument("Savepoints can not be unaligned");
    }
    return new CheckpointOptions(&type, location, AlignmentType::FORCED_ALIGNED,
                                 alignedCheckpointTimeout);
}
CheckpointOptions *CheckpointOptions::FromJson(nlohmann::json &config)
{
    auto alignmentTypeStr = config["alignment"].get<std::string>();
    AlignmentType alignmentType;
    if (alignmentTypeStr == "AT_LEAST_ONCE") {
        alignmentType = AlignmentType::AT_LEAST_ONCE;
    } else if (alignmentTypeStr == "ALIGNED") {
        alignmentType = AlignmentType::ALIGNED;
    } else if (alignmentTypeStr == "UNALIGNED") {
        alignmentType = AlignmentType::UNALIGNED;
    } else if (alignmentTypeStr == "FORCED_ALIGNED") {
        alignmentType = AlignmentType::FORCED_ALIGNED;
    } else {
        INFO_RELEASE("Error:Unknown alignment type : " << alignmentTypeStr);
        throw std::invalid_argument("Unknown alignment type : " + alignmentTypeStr);
    }

    auto alignedCheckpointTimeout = config["alignedCheckpointTimeout"].get<long>();

    std::shared_ptr<CheckpointStorageLocationReference> targetLocation = nullptr;
    auto reference = config["targetLocation"]["referenceBytes"];
    if (reference.is_null()) {
        targetLocation = CheckpointStorageLocationReference::GetDefault();
    } else {
        auto encodedReference = Base64_decode(reference.get<std::string>());
        auto referenceBytes = std::make_shared<std::vector<uint8_t>>(encodedReference.begin(), encodedReference.end());
        if (referenceBytes == nullptr || referenceBytes->empty()){
            targetLocation = CheckpointStorageLocationReference::GetDefault();
        } else {
            targetLocation = std::make_shared<CheckpointStorageLocationReference>(referenceBytes);
        }
    }
    
    bool isSavepoint = config["checkpointType"]["name"].get<std::string>().find("Savepoint") != std::string::npos;
    if (isSavepoint){
        auto savepointFormatTypeStr = config["checkpointType"]["formatType"].get<std::string>();
        SavepointType *savepointType;
        SavepointFormatType savepointFormatType;
        if (savepointFormatTypeStr == "CANONICAL") {
            savepointFormatType = SavepointFormatType::CANONICAL;
        } else if (savepointFormatTypeStr == "NATIVE") {
            savepointFormatType = SavepointFormatType::NATIVE;
        } else {
            INFO_RELEASE("Error: Unknown savepoint formatType:"<<savepointFormatTypeStr);
            throw std::invalid_argument("Unknown savepoint formatType : " + savepointFormatTypeStr);
        }

        auto savepointTypeStr = config["checkpointType"]["name"].get<std::string>();
        if (savepointTypeStr == "Savepoint") {
            savepointType = SavepointType::savepoint(savepointFormatType);
        } else if (savepointTypeStr == "Terminate Savepoint") {
            savepointType = SavepointType::terminate(savepointFormatType);
        } else if (savepointTypeStr == "Suspend Savepoint") {
            savepointType = SavepointType::suspend(savepointFormatType);
        } else {
            INFO_RELEASE("Error: Unknown savepoint type: "<<savepointTypeStr);
            throw std::invalid_argument("Unknown savepoint type : " + savepointTypeStr);
        }
        return new CheckpointOptions(savepointType, targetLocation, alignmentType, alignedCheckpointTimeout);
    } else {
        auto checkpointTypeStr = config["checkpointType"]["name"].get<std::string>();
        CheckpointType *checkpointType;
        if (checkpointTypeStr == "Checkpoint") {
            checkpointType = CheckpointType::CHECKPOINT;
        } else if (checkpointTypeStr == "Full Checkpoint") {
            checkpointType = CheckpointType::FULL_CHECKPOINT;
        } else {
            INFO_RELEASE("Error: Unknown checkpoint type: "<<checkpointTypeStr);
            throw std::invalid_argument("Unknown checkpoint type : " + checkpointTypeStr);
        }
        return new CheckpointOptions(checkpointType, targetLocation, alignmentType, alignedCheckpointTimeout);
    }
}

CheckpointOptions *CheckpointOptions::ForConfig(
    SnapshotType &checkpointType,
     std::shared_ptr<CheckpointStorageLocationReference> locationReference,
    bool isExactlyOnceMode, bool isUnalignedEnabled,
    long alignedCheckpointTimeout)
{
    if (!isExactlyOnceMode) {
        return NotExactlyOnce(checkpointType, locationReference);
    } else if (checkpointType.IsSavepoint()) {
        return AlignedNoTimeout(checkpointType, locationReference);
    } else if (!isUnalignedEnabled) {
        return AlignedNoTimeout(checkpointType, locationReference);
    } else if (alignedCheckpointTimeout == 0 ||
             alignedCheckpointTimeout == NO_ALIGNED_CHECKPOINT_TIME_OUT) {
        return Unaligned(checkpointType, locationReference);
    } else {
        return AlignedWithTimeout(checkpointType, locationReference,
                                  alignedCheckpointTimeout);
    }
}

long CheckpointOptions::GetAlignedCheckpointTimeout() const
{
    return alignedCheckpointTimeout_;
}

bool CheckpointOptions::NeedsAlignment() const
{
    return IsExactlyOnceMode() &&
           (checkpointType_->IsSavepoint() || !IsUnalignedCheckpoint());
}

bool CheckpointOptions::operator==(const CheckpointOptions &other) const
{
    const bool sameLocation =
        (targetLocation_ == other.targetLocation_) ||
        (targetLocation_ != nullptr && other.targetLocation_ != nullptr &&
         ((*targetLocation_) == (*other.targetLocation_)));

    return *checkpointType_ == (*other.checkpointType_) &&
           sameLocation &&
           alignmentType_ == other.alignmentType_ &&
           alignedCheckpointTimeout_ == other.alignedCheckpointTimeout_;
}

bool CheckpointOptions::operator!=(const CheckpointOptions &other) const
{
    return !(*this == other);
}

std::string CheckpointOptions::ToString() const
{
    nlohmann::json json;

    if (checkpointType_ != nullptr)
    {
        json["checkpointType"] = checkpointType_->ToString();
    }
    if (targetLocation_ != nullptr)
    {
        json["targetLocation"] = targetLocation_->ToString();
    } else{
        INFO_RELEASE("savepoint: targetLocation_ == null:");
        json["targetLocation"]["referenceBytes"] =  "(default)";
    }
    if (alignmentType_ == AlignmentType::AT_LEAST_ONCE) {
        json["alignmentType"] = "AT_LEAST_ONCE";
    } else if (alignmentType_ == AlignmentType::ALIGNED) {
        json["alignmentType"] = "ALIGNED";
    } else if (alignmentType_ == AlignmentType::UNALIGNED) {
        json["alignmentType"] = "UNALIGNED";
    } else if (alignmentType_ == AlignmentType::FORCED_ALIGNED) {
        json["alignmentType"] = "FORCED_ALIGNED";
    }
    json["alignedCheckpointTimeout"] = alignedCheckpointTimeout_;
    return json.dump();
}

nlohmann::json CheckpointOptions::ToJson() const
{
    nlohmann::json json;

    if (checkpointType_ != nullptr) {
        json["checkpointType"] = checkpointType_->ToJson();
    }
    if (targetLocation_ != nullptr) {
        json["targetLocation"] = targetLocation_->ToJson();
    } else {
        INFO_RELEASE("savepoint: targetLocation_ == null:");
        json["targetLocation"]["referenceBytes"] =  "(default)";
    }
    if (alignmentType_ == AlignmentType::AT_LEAST_ONCE) {
        json["alignmentType"] = "AT_LEAST_ONCE";
    } else if (alignmentType_ == AlignmentType::ALIGNED) {
        json["alignmentType"] = "ALIGNED";
    } else if (alignmentType_ == AlignmentType::UNALIGNED) {
        json["alignmentType"] = "UNALIGNED";
    } else if (alignmentType_ == AlignmentType::FORCED_ALIGNED) {
        json["alignmentType"] = "FORCED_ALIGNED";
    }
    json["alignedCheckpointTimeout"] = alignedCheckpointTimeout_;
    return json;
}

CheckpointOptions *CheckpointOptions::ForCheckpointWithDefaultLocation()
{
    return CHECKPOINT_AT_DEFAULT_LOCATION;
}

CheckpointOptions *CheckpointOptions::ToUnaligned() const
{
    if (alignmentType_ != AlignmentType::ALIGNED) {
        throw std::logic_error(
            "toUnaligned() can only be called on ALIGNED checkpoints");
    }
    return Unaligned(*checkpointType_, targetLocation_);
}

CheckpointOptions *CheckpointOptions::CHECKPOINT_AT_DEFAULT_LOCATION =
    new CheckpointOptions(CheckpointType::CHECKPOINT,
                          CheckpointStorageLocationReference::GetDefault());

bool CheckpointOptions::IsTimeoutable() const
{
    if (alignmentType_ == AlignmentType::FORCED_ALIGNED) {
        return false;
    }
    return alignmentType_ == AlignmentType::ALIGNED &&
           (alignedCheckpointTimeout_ > 0 &&
            alignedCheckpointTimeout_ != NO_ALIGNED_CHECKPOINT_TIME_OUT);
}

SnapshotType *CheckpointOptions::GetCheckpointType() const
{
    return checkpointType_;
}

 std::shared_ptr<CheckpointStorageLocationReference> CheckpointOptions::GetTargetLocation()
    const
{
    return targetLocation_;
}

CheckpointOptions::AlignmentType CheckpointOptions::GetAlignment() const
{
    return alignmentType_;
}

bool CheckpointOptions::IsExactlyOnceMode() const
{
    return alignmentType_ != AlignmentType::AT_LEAST_ONCE;
}

bool CheckpointOptions::IsUnalignedCheckpoint() const
{
    return alignmentType_ == AlignmentType::UNALIGNED;
}

bool CheckpointOptions::NeedsChannelState() const
{
    return IsUnalignedCheckpoint() || IsTimeoutable();
}

CheckpointOptions *CheckpointOptions::WithUnalignedSupported()
{
    if (alignmentType_ == AlignmentType::FORCED_ALIGNED) {
        if (alignedCheckpointTimeout_ != NO_ALIGNED_CHECKPOINT_TIME_OUT) {
            return AlignedWithTimeout(*checkpointType_, targetLocation_,
                                      alignedCheckpointTimeout_);
        } else {
            return Unaligned(*checkpointType_, targetLocation_);
        }
    }
    return this;
}

CheckpointOptions *CheckpointOptions::WithUnalignedUnsupported()
{
    if (NeedsChannelState()) {
        return ForceAligned(*checkpointType_, targetLocation_,
                            alignedCheckpointTimeout_);
    }
    return this;
}