* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Description: Client c wrapper for object cache client.
*/
#include "datasystem/c_api/object_client_c_wrapper.h"
#include <cstddef>
#include <cstring>
#include <functional>
#include <securec.h>
#include "datasystem/c_api/util.h"
#include "datasystem/c_api/utilC.h"
#include "datasystem/client/object_cache/object_client_impl.h"
#include "datasystem/common/log/log.h"
#include "datasystem/common/log/access_recorder.h"
#include "datasystem/common/log/trace.h"
#include "datasystem/object/buffer.h"
#include "datasystem/utils/status.h"
ObjectClient_p OCCreateClient(const char *cWorkerHost, const int workerPort, const int timeOut, const char *token,
size_t tokenLen, const char *clientPublicKey, size_t cClientPublicKeyLen,
const char *clientPrivateKey, size_t clientPrivateKeyLen, const char *serverPublicKey,
size_t cServerPublicKeyLen, const char *accessKey, size_t cAccessKeyLen,
const char *secretKey, size_t secretKeyLen, const char *tenantId, size_t cTenantIdLen,
const char *enableCrossNodeConnection)
{
datasystem::TraceGuard traceGuard = datasystem::Trace::Instance().SetRequestTraceUUID();
return CreateObjectClient(cWorkerHost, workerPort, timeOut, token, tokenLen, clientPublicKey, cClientPublicKeyLen,
clientPrivateKey, clientPrivateKeyLen, serverPublicKey, cServerPublicKeyLen, accessKey,
cAccessKeyLen, secretKey, secretKeyLen, tenantId, cTenantIdLen,
enableCrossNodeConnection);
}
struct StatusC OCConnectWorker(ObjectClient_p clientPtr)
{
datasystem::TraceGuard traceGuard = datasystem::Trace::Instance().SetRequestTraceUUID();
return ConnectWorker(clientPtr);
}
struct StatusC OCUpdateAkSk(ObjectClient_p clientPtr, const char *cAccessKey, size_t cAccessKeyLen,
const char *cSecretKey, size_t cSecretKeyLen)
{
auto client = reinterpret_cast<std::shared_ptr<datasystem::object_cache::ObjectClientImpl> *>(clientPtr);
std::string accessKey(cAccessKey, cAccessKeyLen);
accessKey.assign(cAccessKey, cAccessKeyLen);
datasystem::SensitiveValue secretKey(cSecretKey, cSecretKeyLen);
datasystem::Status rc = (*client)->UpdateAkSk(accessKey, secretKey);
if (rc.IsError()) {
return ToStatusC(rc);
}
return StatusC{ datasystem::K_OK, {} };
}
void OCFreeClient(ObjectClient_p clientPtr)
{
FreeClient(clientPtr);
}
struct StatusC ObjectExecutePut(ObjectClient_p clientPtr, const char *cObjKey, size_t cObjKeyLen, const char *cVal,
size_t cValLen, const char **cNestedObjectKeys, const size_t *cNestedObjKeyLenArray,
const size_t cNestedObjectKeysNum, const char *cConsistencyType)
{
std::string errorMsg;
CheckNullptr(clientPtr, "clientPtr", errorMsg);
CheckNullptr(cObjKey, "cObjKey", errorMsg);
CheckNullptr(cVal, "cVal", errorMsg);
CheckNullptr(cConsistencyType, "cConsistencyType", errorMsg);
if (!errorMsg.empty()) {
return MakeStatusC(datasystem::K_INVALID, errorMsg);
}
std::unordered_set<std::string> nestedObjectKeysSet;
if (cNestedObjectKeys != nullptr && cNestedObjKeyLenArray != nullptr) {
for (size_t i = 0; i < cNestedObjectKeysNum; i++) {
if (cNestedObjectKeys[i] != nullptr) {
std::string objectKey(cNestedObjectKeys[i], cNestedObjKeyLenArray[i]);
(void)nestedObjectKeysSet.insert(objectKey);
}
}
}
auto client = reinterpret_cast<std::shared_ptr<datasystem::object_cache::ObjectClientImpl> *>(clientPtr);
std::string consistencyType(cConsistencyType);
datasystem::object_cache::FullParam createParam;
StatusC s = InitCreateParam(consistencyType, createParam);
if (s.code != datasystem::K_OK) {
return s;
}
std::string objKey(cObjKey, cObjKeyLen);
Status rc =
(*client)->Put(objKey, reinterpret_cast<const uint8_t *>(cVal), cValLen, createParam, nestedObjectKeysSet);
if (rc.IsError()) {
return ToStatusC(rc);
}
return StatusC{ datasystem::K_OK, {} };
}
struct StatusC OCPut(ObjectClient_p clientPtr, const char *cObjKey, size_t cObjKeyLen, const char *cVal, size_t cValLen,
const char **cNestedObjectKeys, const size_t *cNestedObjKeyLenArray,
const size_t cNestedObjectKeysNum, const char *cConsistencyType)
{
datasystem::TraceGuard traceGuard = datasystem::Trace::Instance().SetRequestTraceUUID();
auto access = datasystem::AccessRecorder::Object(datasystem::AccessRecorderKey::DS_OBJECT_CLIENT_PUT);
StatusC rc = ObjectExecutePut(clientPtr, cObjKey, cObjKeyLen, cVal, cValLen, cNestedObjectKeys,
cNestedObjKeyLenArray, cNestedObjectKeysNum, cConsistencyType);
std::string_view objectKey = (cObjKey != nullptr) ? std::string_view(cObjKey, cObjKeyLen) : std::string_view("", 0);
std::string_view consistencyTypeView =
(cConsistencyType != nullptr) ? std::string_view(cConsistencyType) : std::string_view("", 0);
access.ObjectKeyRef(objectKey);
if (cNestedObjectKeys != nullptr) {
access.NestedKeysRef(cNestedObjectKeys, cNestedObjKeyLenArray, cNestedObjectKeysNum);
}
access.ConsistencyTypeText(consistencyTypeView)
.Result(rc.code, rc.errMsg)
.DataSize(cValLen)
.Record();
return rc;
}
struct StatusC OCGet(ObjectClient_p clientPtr, const char **cObjKeys, const size_t *cObjKeysLen, uint64_t objsNum,
uint32_t cTimeoutMs, char **cVals, size_t *valsLen)
{
datasystem::TraceGuard traceGuard = datasystem::Trace::Instance().SetRequestTraceUUID();
auto access = datasystem::AccessRecorder::Object(datasystem::AccessRecorderKey::DS_OBJECT_CLIENT_GET);
size_t totalSize = 0;
StatusC rc = ExecuteGetArray(clientPtr, cObjKeys, cObjKeysLen, objsNum, cTimeoutMs, cVals, valsLen, &totalSize);
if (cObjKeys != nullptr) {
access.ObjectKeysRef(cObjKeys, cObjKeysLen, objsNum);
}
access.TimeoutMs(cTimeoutMs)
.Result(rc.code, rc.errMsg)
.DataSize(totalSize)
.Record();
return rc;
}
struct StatusC OCExecuteGIncreaseRef(ObjectClient_p clientPtr, const char **cObjKeys, const size_t *cObjKeysLen,
uint64_t cObjKeysNum, char *cRemoteClientId, size_t cRemoteClientIdLen,
char **cFailedObjKeys, size_t *failedObjKeysCount)
{
std::string errorMsg;
CheckNullptr(clientPtr, "clientPtr", errorMsg);
CheckNullptr(cObjKeys, "cObjKeys", errorMsg);
if (!errorMsg.empty()) {
return MakeStatusC(datasystem::K_INVALID, errorMsg);
}
std::string remoteClientId;
if (cRemoteClientId != nullptr) {
remoteClientId = std::string(cRemoteClientId, cRemoteClientIdLen);
}
auto client = reinterpret_cast<std::shared_ptr<datasystem::object_cache::ObjectClientImpl> *>(clientPtr);
std::vector<std::string> objKeysVec = GetObjKeysVector(cObjKeys, cObjKeysLen, cObjKeysNum);
std::vector<std::string> failedObjectKeys;
datasystem::Status rc = (*client)->GIncreaseRef(objKeysVec, failedObjectKeys, remoteClientId);
*failedObjKeysCount = failedObjectKeys.size();
for (size_t i = 0; i < failedObjectKeys.size(); ++i) {
cFailedObjKeys[i] = StringToCString(failedObjectKeys[i]);
}
if (rc.IsError()) {
return ToStatusC(rc);
}
return StatusC{ datasystem::K_OK, {} };
}
struct StatusC OCGIncreaseRef(ObjectClient_p clientPtr, const char **cObjKeys, const size_t *cObjKeysLen,
uint64_t cObjKeysNum, char *cRemoteClientId, size_t cRemoteClientIdLen,
char **cFailedObjKeys, size_t *failedObjKeysCount)
{
datasystem::TraceGuard traceGuard = datasystem::Trace::Instance().SetRequestTraceUUID();
auto access = datasystem::AccessRecorder::Object(datasystem::AccessRecorderKey::DS_OBJECT_CLIENT_GINCREASEREF);
StatusC rc = OCExecuteGIncreaseRef(clientPtr, cObjKeys, cObjKeysLen, cObjKeysNum, cRemoteClientId,
cRemoteClientIdLen, cFailedObjKeys, failedObjKeysCount);
if (cObjKeys != nullptr) {
access.ObjectKeysRef(cObjKeys, cObjKeysLen, cObjKeysNum);
}
std::string_view remoteClientIdView =
(cRemoteClientId != nullptr) ? std::string_view(cRemoteClientId, cRemoteClientIdLen) : std::string_view("", 0);
access.RemoteClientId(remoteClientIdView)
.Result(rc.code, rc.errMsg)
.Record();
return rc;
}
struct StatusC OCExecuteGDecreaseRef(ObjectClient_p clientPtr, const char **cObjKeys, const size_t *cObjKeysLen,
uint64_t cObjKeysNum, char *cRemoteClientId, size_t cRemoteClientIdLen,
char **cFailedObjKeys, size_t *failedObjKeysCount)
{
std::string errorMsg;
CheckNullptr(clientPtr, "clientPtr", errorMsg);
CheckNullptr(cObjKeys, "cObjKeys", errorMsg);
if (!errorMsg.empty()) {
return MakeStatusC(datasystem::K_INVALID, errorMsg);
}
std::string remoteClientId;
if (cRemoteClientId != nullptr) {
remoteClientId = std::string(cRemoteClientId, cRemoteClientIdLen);
}
auto client = reinterpret_cast<std::shared_ptr<datasystem::object_cache::ObjectClientImpl> *>(clientPtr);
std::vector<std::string> failedObjectKeys;
std::vector<std::string> objKeysVec = GetObjKeysVector(cObjKeys, cObjKeysLen, cObjKeysNum);
datasystem::Status rc = (*client)->GDecreaseRef(objKeysVec, failedObjectKeys, remoteClientId);
*failedObjKeysCount = failedObjectKeys.size();
for (size_t i = 0; i < failedObjectKeys.size(); ++i) {
cFailedObjKeys[i] = StringToCString(failedObjectKeys[i]);
}
if (rc.IsError()) {
return ToStatusC(rc);
}
return StatusC{ datasystem::K_OK, {} };
}
struct StatusC OCDeccreaseRef(ObjectClient_p clientPtr, const char **cObjKeys, const size_t *cObjKeysLen,
uint64_t cObjKeysNum, char *cRemoteClientId, size_t cRemoteClientIdLen,
char **cFailedObjKeys, size_t *failedObjKeysCount)
{
datasystem::TraceGuard traceGuard = datasystem::Trace::Instance().SetRequestTraceUUID();
auto access = datasystem::AccessRecorder::Object(datasystem::AccessRecorderKey::DS_OBJECT_CLIENT_GDECREASEREF);
StatusC rc = OCExecuteGDecreaseRef(clientPtr, cObjKeys, cObjKeysLen, cObjKeysNum, cRemoteClientId,
cRemoteClientIdLen, cFailedObjKeys, failedObjKeysCount);
if (cObjKeys != nullptr) {
access.ObjectKeysRef(cObjKeys, cObjKeysLen, cObjKeysNum);
}
std::string_view remoteClientIdView =
(cRemoteClientId != nullptr) ? std::string_view(cRemoteClientId, cRemoteClientIdLen) : std::string_view("", 0);
access.RemoteClientId(remoteClientIdView)
.Result(rc.code, rc.errMsg)
.Record();
return rc;
}
struct StatusC OCExecuteReleaseGRefs(ObjectClient_p clientPtr, char *cRemoteClientId, size_t cRemoteClientIdLen)
{
std::string errorMsg;
CheckNullptr(clientPtr, "clientPtr", errorMsg);
CheckNullptr(cRemoteClientId, "cRemoteClientId", errorMsg);
if (!errorMsg.empty()) {
return MakeStatusC(datasystem::K_INVALID, errorMsg);
}
auto client = reinterpret_cast<std::shared_ptr<datasystem::object_cache::ObjectClientImpl> *>(clientPtr);
std::string remoteClientId(cRemoteClientId, cRemoteClientIdLen);
datasystem::Status rc = (*client)->ReleaseGRefs(remoteClientId);
if (rc.IsError()) {
return ToStatusC(rc);
}
return StatusC{ datasystem::K_OK, {} };
}
struct StatusC OCReleaseGRefs(ObjectClient_p clientPtr, char *cRemoteClientId, size_t cRemoteClientIdLen)
{
datasystem::TraceGuard traceGuard = datasystem::Trace::Instance().SetRequestTraceUUID();
auto access = datasystem::AccessRecorder::Object(datasystem::AccessRecorderKey::DS_OBJECT_CLIENT_RELEASEGREFS);
StatusC rc = OCExecuteReleaseGRefs(clientPtr, cRemoteClientId, cRemoteClientIdLen);
std::string_view remoteClientIdView =
(cRemoteClientId != nullptr) ? std::string_view(cRemoteClientId, cRemoteClientIdLen) : std::string_view("", 0);
access.RemoteClientId(remoteClientIdView).Result(rc.code, rc.errMsg).Record();
return rc;
}
struct StatusC GetObjMetaInfo(ObjectClient_p clientPtr, const char *cTenantId, const size_t cTenantIdLen,
const char **cObjKeys, const size_t *cObjKeysLen, const size_t cObjNum, size_t *cObjSizes,
char ***cLocations, size_t *cLocNumPerObj, int *cLocationNum)
{
std::string errorMsg;
CheckNullptr(clientPtr, "clientPtr", errorMsg);
CheckNullptr(cTenantId, "cTenantId", errorMsg);
CheckNullptr(cObjKeys, "cObjKeys", errorMsg);
CheckNullptr(cObjKeysLen, "cObjKeysLen", errorMsg);
CheckNullptr(cObjSizes, "cObjSizes", errorMsg);
CheckNullptr(cLocations, "cLocations", errorMsg);
CheckNullptr(cLocNumPerObj, "cLocNumPerObj", errorMsg);
CheckNullptr(cLocationNum, "cLocationNum", errorMsg);
if (!errorMsg.empty()) {
return MakeStatusC(datasystem::K_INVALID, errorMsg);
}
std::string tenantId(cTenantId, cTenantIdLen);
std::vector<std::string> objKeysVec = GetObjKeysVector(cObjKeys, cObjKeysLen, cObjNum);
std::vector<datasystem::ObjMetaInfo> objMetas;
auto client = reinterpret_cast<std::shared_ptr<datasystem::object_cache::ObjectClientImpl> *>(clientPtr);
datasystem::Status rc = (*client)->GetObjMetaInfo(tenantId, objKeysVec, objMetas);
if (rc.IsError()) {
return ToStatusC(rc);
}
int totalNum = 0;
for (const auto &objMeta : objMetas) {
totalNum += objMeta.locations.size();
}
*cLocationNum = totalNum;
*cLocations = MakeCharsArray(totalNum);
if (*cLocations == nullptr) {
return MakeStatusC(datasystem::K_RUNTIME_ERROR, "failed to allocate memory");
}
int pos = 0;
for (size_t i = 0; i < objMetas.size(); i++) {
cObjSizes[i] = objMetas[i].objSize;
cLocNumPerObj[i] = objMetas[i].locations.size();
for (auto &loc : objMetas[i].locations) {
(*cLocations)[pos++] = StringToCString(loc);
}
}
return StatusC{ datasystem::K_OK, {} };
}