/**
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * Description: Client c wrapper for state cache client.
 */
#include "datasystem/c_api/kv_client_c_wrapper.h"

#include <cstddef>
#include <cstring>
#include <functional>

#include <securec.h>

#include "datasystem/c_api/util.h"
#include "datasystem/client/object_cache/object_client_impl.h"
#include "datasystem/common/log/access_recorder.h"
#include "datasystem/common/log/log.h"
#include "datasystem/common/log/trace.h"
#include "datasystem/object/buffer.h"
#include "datasystem/utils/optional.h"
#include "datasystem/utils/status.h"
#include "datasystem/utils/string_view.h"
#include "kv_client_c_wrapper.h"
#include "status_definition.h"

KVClient_p KVCreateClient(const char *cWorkerHost, const int workerPort, const int timeOut, const char *token,
                          size_t tokenLen, const char *clientPublicKey, size_t cClientPublicKeyLen,
                          const char *clientPrivateKey, size_t clientPrivateKeyLen, const char *serverPublicKey,
                          size_t cServerPublicKeyLen, const char *accessKey, size_t cAccessKeyLen,
                          const char *secretKey, size_t secretKeyLen, const char *tenantId, size_t cTenantIdLen,
                          const char *enableCrossNodeConnection)
{
    datasystem::TraceGuard traceGuard = datasystem::Trace::Instance().SetRequestTraceUUID();
    return CreateObjectClient(cWorkerHost, workerPort, timeOut, token, tokenLen, clientPublicKey, cClientPublicKeyLen,
                              clientPrivateKey, clientPrivateKeyLen, serverPublicKey, cServerPublicKeyLen, accessKey,
                              cAccessKeyLen, secretKey, secretKeyLen, tenantId, cTenantIdLen,
                              enableCrossNodeConnection);
}

struct StatusC SCConnectWorker(KVClient_p clientPtr)
{
    datasystem::TraceGuard traceGuard = datasystem::Trace::Instance().SetRequestTraceUUID();
    return ConnectWorker(clientPtr);
}

struct StatusC SCUpdateAkSk(KVClient_p clientPtr, const char *cAccessKey, size_t cAccessKeyLen, const char *cSecretKey,
                            size_t cSecretKeyLen)
{
    auto client = reinterpret_cast<std::shared_ptr<datasystem::object_cache::ObjectClientImpl> *>(clientPtr);
    std::string accessKey(cAccessKey, cAccessKeyLen);
    accessKey.assign(cAccessKey, cAccessKeyLen);
    datasystem::SensitiveValue secretKey(cSecretKey, cSecretKeyLen);
    datasystem::Status rc = (*client)->UpdateAkSk(accessKey, secretKey);
    if (rc.IsError()) {
        return ToStatusC(rc);
    }
    return StatusC{ datasystem::K_OK, {} };
}

void SCFreeClient(KVClient_p clientPtr)
{
    FreeClient(clientPtr);
}

struct StatusC SCExecuteSet(KVClient_p clientPtr, const char *cKey, size_t keyLen, const char *cVal, size_t valLen,
                            const char *cWriteMode, uint32_t ttlSecond, const char *cExistenceOpt)
{
    std::string errorMsg;
    CheckNullptr(clientPtr, "clientPtr", errorMsg);
    CheckNullptr(cKey, "cKey", errorMsg);
    CheckNullptr(cVal, "cVal", errorMsg);
    CheckNullptr(cWriteMode, "cWriteMode", errorMsg);
    CheckNullptr(cExistenceOpt, "cExistenceOpt", errorMsg);
    if (!errorMsg.empty()) {
        return MakeStatusC(datasystem::K_INVALID, errorMsg);
    }
    auto client = reinterpret_cast<std::shared_ptr<datasystem::object_cache::ObjectClientImpl> *>(clientPtr);
    std::string key(cKey, keyLen);
    datasystem::StringView val(cVal, valLen);
    std::string writeMode(cWriteMode);
    std::string existenceOpt(cExistenceOpt);
    datasystem::SetParam setParam;
    StatusC s = InitSetParam(writeMode, ttlSecond, existenceOpt, setParam);
    if (s.code != datasystem::K_OK) {
        return s;
    }
    datasystem::Status rc = (*client)->Set(key, val, setParam);
    if (rc.IsError()) {
        return ToStatusC(rc);
    }
    return StatusC{ datasystem::K_OK, {} };
}

struct StatusC SCSet(KVClient_p clientPtr, const char *cKey, size_t keyLen, const char *cVal, size_t valLen,
                     const char *cWriteMode, uint32_t ttlSecond, const char *cExistenceOpt)
{
    datasystem::TraceGuard traceGuard = datasystem::Trace::Instance().SetRequestTraceUUID();
    auto access = datasystem::AccessRecorder::Object(datasystem::AccessRecorderKey::DS_KV_CLIENT_SET);
    StatusC rc = SCExecuteSet(clientPtr, cKey, keyLen, cVal, valLen, cWriteMode, ttlSecond, cExistenceOpt);
    std::string_view keyView = (cKey != nullptr) ? std::string_view(cKey, keyLen) : std::string_view("", 0);
    std::string_view writeModeView = (cWriteMode != nullptr) ? std::string_view(cWriteMode) : std::string_view("", 0);
    access.ObjectKeyRef(keyView).WriteModeText(writeModeView).TtlSecond(ttlSecond)
        .Result(rc.code, rc.errMsg).DataSize(valLen).Record();
    return rc;
}

struct StatusC SCExecuteSetValue(KVClient_p clientPtr, char **cKey, size_t *keyLen, const char *cVal, size_t valLen,
                                 const char *cWriteMode, uint32_t ttlSecond, const char *cExistenceOpt)
{
    std::string errorMsg;
    CheckNullptr(clientPtr, "clientPtr", errorMsg);
    CheckNullptr(cKey, "cKey", errorMsg);
    CheckNullptr(cVal, "cVal", errorMsg);
    CheckNullptr(cWriteMode, "cWriteMode", errorMsg);
    if (!errorMsg.empty()) {
        return MakeStatusC(datasystem::K_INVALID, errorMsg);
    }
    auto client = reinterpret_cast<std::shared_ptr<datasystem::object_cache::ObjectClientImpl> *>(clientPtr);
    datasystem::StringView val(cVal, valLen);
    std::string writeMode(cWriteMode);
    datasystem::SetParam setParam;
    StatusC s = InitSetParam(writeMode, ttlSecond, cExistenceOpt, setParam);
    if (s.code != datasystem::K_OK) {
        return s;
    }
    std::string key;
    datasystem::Status rc = (*client)->Set(val, setParam, key);
    if (rc.IsError()) {
        return ToStatusC(rc);
    }
    *cKey = StringToCString(key);
    if (*cKey == nullptr) {
        return MakeStatusC(datasystem::K_RUNTIME_ERROR, "Failed to construct the c_style character string.");
    }
    *keyLen = key.size();
    return StatusC{ datasystem::K_OK, {} };
}

struct StatusC SCSetValue(KVClient_p clientPtr, char **cKey, size_t *keyLen, const char *cVal, size_t valLen,
                          const char *cWriteMode, uint32_t ttlSecond, const char *cExistenceOpt)
{
    datasystem::TraceGuard traceGuard = datasystem::Trace::Instance().SetRequestTraceUUID();
    auto access = datasystem::AccessRecorder::Object(datasystem::AccessRecorderKey::DS_KV_CLIENT_SET);
    StatusC rc = SCExecuteSetValue(clientPtr, cKey, keyLen, cVal, valLen, cWriteMode, ttlSecond, cExistenceOpt);
    std::string_view objKey(cKey != nullptr && *cKey != nullptr ? *cKey : "");
    std::string_view writeModeView = (cWriteMode != nullptr) ? std::string_view(cWriteMode) : std::string_view("", 0);
    access.ObjectKeyRef(objKey).WriteModeText(writeModeView).TtlSecond(ttlSecond)
        .Result(rc.code, rc.errMsg).DataSize(valLen).Record();
    return rc;
}

struct StatusC SCExecuteGet(KVClient_p clientPtr, const char *cKey, const size_t keyLen, uint32_t ctimeoutms,
                            char **cVal, size_t *valLen)
{
    std::string errorMsg;
    CheckNullptr(clientPtr, "clientPtr", errorMsg);
    CheckNullptr(cKey, "cKey", errorMsg);
    CheckNullptr(cVal, "cVal", errorMsg);
    if (!errorMsg.empty()) {
        return MakeStatusC(datasystem::K_INVALID, errorMsg);
    }
    auto client = reinterpret_cast<std::shared_ptr<datasystem::object_cache::ObjectClientImpl> *>(clientPtr);
    std::string key(cKey, keyLen);
    std::vector<datasystem::Optional<datasystem::Buffer>> buffers;
    datasystem::Status rc = (*client)->Get({ key }, ctimeoutms, buffers);
    if (rc.IsError()) {
        return ToStatusC(rc);
    }
    StatusC s = BufferToCString(std::move(buffers[0]), (*client)->GetMemoryCopyThreadPool(), cVal, valLen);
    if (s.code != datasystem::K_OK) {
        return s;
    }
    return StatusC{ datasystem::K_OK, {} };
}

struct StatusC SCGet(KVClient_p clientPtr, const char *cKey, const size_t keyLen, uint32_t ctimeoutms, char **cVal,
                     size_t *valLen)
{
    datasystem::TraceGuard traceGuard = datasystem::Trace::Instance().SetRequestTraceUUID();
    auto access = datasystem::AccessRecorder::Object(datasystem::AccessRecorderKey::DS_KV_CLIENT_GET);
    StatusC rc = SCExecuteGet(clientPtr, cKey, keyLen, ctimeoutms, cVal, valLen);
    datasystem::Status accessRc = (rc.code == datasystem::K_NOT_FOUND) ? datasystem::Status::OK()
        : datasystem::Status(static_cast<datasystem::StatusCode>(rc.code), rc.errMsg);
    uint64_t dataLen = (rc.code == datasystem::K_OK && valLen != nullptr) ? *valLen : 0;
    std::string_view keyView = (cKey != nullptr) ? std::string_view(cKey, keyLen) : std::string_view("", 0);
    access.ObjectKeyRef(keyView)
        .TimeoutMs(ctimeoutms)
        .Result(accessRc)
        .DataSize(dataLen)
        .Record();
    return rc;
}

struct StatusC SCExecuteDel(KVClient_p clientPtr, const char *cKey, const size_t keyLen)
{
    std::string errorMsg;
    CheckNullptr(clientPtr, "clientPtr", errorMsg);
    CheckNullptr(cKey, "cKey", errorMsg);
    if (!errorMsg.empty()) {
        return MakeStatusC(datasystem::K_INVALID, errorMsg);
    }
    auto client = reinterpret_cast<std::shared_ptr<datasystem::object_cache::ObjectClientImpl> *>(clientPtr);
    std::vector<std::string> failedKeys;
    std::string key(cKey, keyLen);
    datasystem::Status rc = (*client)->Delete({ key }, failedKeys);
    if (rc.IsError()) {
        return ToStatusC(rc);
    }
    if (!failedKeys.empty()) {
        return MakeStatusC(datasystem::K_RUNTIME_ERROR, "The failed key is not empty, delete key failed!");
    }
    return StatusC{ datasystem::K_OK, {} };
}

struct StatusC SCDel(KVClient_p clientPtr, const char *cKey, const size_t keyLen)
{
    datasystem::TraceGuard traceGuard = datasystem::Trace::Instance().SetRequestTraceUUID();
    auto access = datasystem::AccessRecorder::Object(datasystem::AccessRecorderKey::DS_KV_CLIENT_DELETE);
    StatusC rc = SCExecuteDel(clientPtr, cKey, keyLen);
    std::string_view keyView = (cKey != nullptr) ? std::string_view(cKey, keyLen) : std::string_view("", 0);
    access.ObjectKeyRef(keyView).Result(rc.code, rc.errMsg).Record();
    return rc;
}

struct StatusC SCGetArray(KVClient_p clientPtr, const char **cKeys, const size_t *keysLen, uint64_t keysNum,
                          uint32_t ctimeoutms, char **cVals, size_t *valsLen)
{
    datasystem::TraceGuard traceGuard = datasystem::Trace::Instance().SetRequestTraceUUID();
    auto access = datasystem::AccessRecorder::Object(datasystem::AccessRecorderKey::DS_KV_CLIENT_GET);
    size_t totalSize = 0;
    StatusC rc = ExecuteGetArray(clientPtr, cKeys, keysLen, keysNum, ctimeoutms, cVals, valsLen, &totalSize);
    datasystem::Status accessRc = (rc.code == datasystem::K_NOT_FOUND) ? datasystem::Status::OK()
        : datasystem::Status(static_cast<datasystem::StatusCode>(rc.code), rc.errMsg);
    if (cKeys != nullptr) {
        access.ObjectKeysRef(cKeys, keysLen, keysNum);
    }
    access.TimeoutMs(ctimeoutms).Result(accessRc).DataSize(totalSize).Record();
    return rc;
}

struct StatusC SCExecuteDelArray(KVClient_p clientPtr, const char **cKeys, uint64_t numObjs, char **cFailedKeys,
                                 uint64_t *failedCount)
{
    std::string errorMsg;
    CheckNullptr(clientPtr, "clientPtr", errorMsg);
    CheckNullptr(cKeys, "cKeys", errorMsg);
    CheckNullptr(cFailedKeys, "cFailedKeys", errorMsg);
    CheckNullptr(failedCount, "failedCount", errorMsg);
    if (!errorMsg.empty()) {
        return MakeStatusC(datasystem::K_INVALID, errorMsg);
    }
    auto client = reinterpret_cast<std::shared_ptr<datasystem::object_cache::ObjectClientImpl> *>(clientPtr);
    std::vector<std::string> keysVec(cKeys, cKeys + numObjs);
    std::vector<std::string> failedKeys;
    datasystem::Status rc = (*client)->Delete(keysVec, failedKeys);
    // Even with errors we can have partial results
    // send the results back as const char
    *failedCount = failedKeys.size();
    for (size_t i = 0; i < failedKeys.size(); ++i) {
        cFailedKeys[i] = StringToCString(failedKeys[i]);
    }
    if (rc.IsError()) {
        LOG(ERROR) << datasystem::FormatString("SCDelArray with error:%s", rc.GetMsg());
    }
    return ToStatusC(rc);
}

struct StatusC SCDelArray(KVClient_p clientPtr, const char **cKeys, uint64_t numObjs, char **cFailedKeys,
                          uint64_t *failedCount)
{
    datasystem::TraceGuard traceGuard = datasystem::Trace::Instance().SetRequestTraceUUID();
    auto access = datasystem::AccessRecorder::Object(datasystem::AccessRecorderKey::DS_KV_CLIENT_DELETE);
    StatusC rc = SCExecuteDelArray(clientPtr, cKeys, numObjs, cFailedKeys, failedCount);
    if (cKeys != nullptr) {
        access.ObjectKeysRef(cKeys, nullptr, numObjs);
    }
    access.Result(rc.code, rc.errMsg).Record();
    return rc;
}

size_t SCGenerateKey(KVClient_p clientPtr, char **key)
{
    if (clientPtr == nullptr) {
        return 0;
    }
    auto client = reinterpret_cast<std::shared_ptr<datasystem::object_cache::ObjectClientImpl> *>(clientPtr);
    std::string strKey;
    (void)(*client)->GenerateKey(strKey);
    *key = StringToCString(strKey);
    return strKey.size();
}