/**
 * Copyright (c) Huawei Technologies Co., Ltd. 2026. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * Description: Persistence API implementation for aggregated slot-based storage.
 */

#include "datasystem/common/l2cache/aggregated_persistence_api.h"

#include "datasystem/common/inject/inject_point.h"
#include "datasystem/common/log/log.h"
#include "datasystem/common/util/status_helper.h"

namespace datasystem {
AggregatedPersistenceApi::AggregatedPersistenceApi(std::shared_ptr<StorageClient> storageClient)
    : storageClient_(std::move(storageClient))
{
}

AggregatedPersistenceApi::~AggregatedPersistenceApi() = default;

Status AggregatedPersistenceApi::Init()
{
    CHECK_FAIL_RETURN_STATUS(storageClient_ != nullptr, K_RUNTIME_ERROR, "storageClient_ is nullptr");
    return storageClient_->Init();
}

Status AggregatedPersistenceApi::Save(const std::string &objectKey, uint64_t version, int64_t timeoutMs,
                                      const std::shared_ptr<std::iostream> &body, uint64_t asyncElapse,
                                      WriteMode writeMode, uint32_t ttlSecond)
{
    INJECT_POINT("PersistenceApi.Save.timeout", [&timeoutMs](int timeout) {
        timeoutMs = timeout;
        return Status::OK();
    });
    LOG(INFO) << FormatString("invoke save object to aggregated persistence. objectKey:%s, version %llu", objectKey,
                              version);
    INJECT_POINT("persistence.service.save");
    return storageClient_->Save(objectKey, version, timeoutMs, body, asyncElapse, writeMode, ttlSecond);
}

Status AggregatedPersistenceApi::Get(const std::string &objectKey, uint64_t version, int64_t timeoutMs,
                                     std::shared_ptr<std::stringstream> &content)
{
    INJECT_POINT("persistence.service.get", [&content](std::string mockContent) {
        *content.get() << mockContent;
        return Status::OK();
    });
    auto rc = storageClient_->Get(objectKey, version, timeoutMs, content);
    if (rc.GetCode() == StatusCode::K_NOT_FOUND || rc.GetCode() == StatusCode::K_NOT_FOUND_IN_L2CACHE) {
        return GetWithoutVersion(objectKey, timeoutMs, 0, content);
    }
    return rc;
}

Status AggregatedPersistenceApi::GetWithoutVersion(const std::string &objectKey, int64_t timeoutMs, uint64_t minVersion,
                                                   std::shared_ptr<std::stringstream> &content)
{
    return storageClient_->GetWithoutVersion(objectKey, timeoutMs, minVersion, content);
}

Status AggregatedPersistenceApi::Del(const std::string &objectKey, uint64_t maxVerToDelete, bool deleteAllVersion,
                                     uint64_t asyncElapse, const uint64_t *const objectVersion,
                                     bool listIncompleteVersions)
{
    INJECT_POINT("persistence.service.del");
    LOG(INFO) << FormatString("invoke delete object from aggregated persistence. objectKey: %s, max version is %llu",
                              objectKey, maxVerToDelete);
    (void)objectVersion;
    (void)listIncompleteVersions;
    return storageClient_->Delete(objectKey, maxVerToDelete, deleteAllVersion, asyncElapse);
}

Status AggregatedPersistenceApi::PreloadSlot(const std::string &sourceWorkerAddress, uint32_t slotId,
                                             const SlotPreloadCallback &callback)
{
    return storageClient_->PreloadSlot(sourceWorkerAddress, slotId, callback);
}

Status AggregatedPersistenceApi::MergeSlot(const std::string &sourceWorkerAddress, uint32_t slotId)
{
    return storageClient_->MergeSlot(sourceWorkerAddress, slotId);
}

Status AggregatedPersistenceApi::CleanupLocalSlots()
{
    CHECK_FAIL_RETURN_STATUS(storageClient_ != nullptr, K_RUNTIME_ERROR, "storageClient_ is nullptr");
    return storageClient_->CleanupLocalSlots();
}

std::string AggregatedPersistenceApi::GetL2CacheRequestSuccessRate() const
{
    if (storageClient_ == nullptr) {
        return "";
    }
    return storageClient_->GetRequestSuccessRate();
}
}  // namespace datasystem