* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Description: state cache implement for state client.
*/
#include "datasystem/java_api/kv/kv_impl.h"
#include <string>
#include "datasystem/common/util/status_helper.h"
#include "datasystem/utils/status.h"
#include "datasystem/client/object_cache/object_client_impl.h"
namespace datasystem {
namespace java_api {
Status SetDirectBufferNativeImpl(JNIEnv *env, jlong handle, jstring keyJO, jobject valueJO, jobject paramJO,
ObjectAccessRecorder &access)
{
auto client = reinterpret_cast<std::shared_ptr<ObjectClientImpl> *>(handle);
std::string key = ToString(env, keyJO);
access.ObjectKeyOwned(key);
auto body = env->GetDirectBufferAddress(valueJO);
if (!body) {
std::string msg = "cant get data address";
RETURN_STATUS(StatusCode::K_RUNTIME_ERROR, msg);
}
SetParam param = ToCppSetParam(env, paramJO);
access.WriteMode(static_cast<int>(param.writeMode)).TtlSecond(param.ttlSecond);
if (env->ExceptionOccurred()) {
std::string msg =
"Exception Occurs when Java_org_yuanrong_datasystem_statecache_StateClient_setDirectBufferNative function "
"to call ToCppSetParam()";
RETURN_STATUS(StatusCode::K_RUNTIME_ERROR, msg);
} else {
datasystem::object_cache::FullParam fullParam;
fullParam.writeMode = param.writeMode;
fullParam.ttlSecond = param.ttlSecond;
fullParam.cacheType = param.cacheType;
fullParam.consistencyType = ConsistencyType::CAUSAL;
int limit = GetByteBufferLimit(env, valueJO);
access.DataSize(static_cast<uint64_t>(limit));
RETURN_IF_NOT_OK(
(*client)->Put(key, static_cast<const uint8_t *>(body), limit, fullParam, {}, param.ttlSecond));
}
body = NULL;
return Status::OK();
}
Status SetHeapBufferNativeImpl(JNIEnv *env, jlong handle, jstring keyJO, jbyteArray byteArray, jlong size,
jobject paramJO, ObjectAccessRecorder &access)
{
auto client = reinterpret_cast<std::shared_ptr<ObjectClientImpl> *>(handle);
std::string key = ToString(env, keyJO);
access.ObjectKeyOwned(key);
SetParam param = ToCppSetParam(env, paramJO);
access.WriteMode(static_cast<int>(param.writeMode)).TtlSecond(param.ttlSecond)
.DataSize(static_cast<uint64_t>(size));
datasystem::object_cache::FullParam fullParam;
if (env->ExceptionOccurred()) {
std::string msg =
"Exception Occurs when Java_org_yuanrong_datasystem_statecache_StateClient_setDirectBufferNative function "
"to call ToCppSetParam()";
RETURN_STATUS(StatusCode::K_RUNTIME_ERROR, msg);
} else {
fullParam.writeMode = param.writeMode;
fullParam.consistencyType = ConsistencyType::CAUSAL;
fullParam.cacheType = param.cacheType;
fullParam.ttlSecond = param.ttlSecond;
}
if ((*client)->ShmCreateable(size)) {
std::shared_ptr<Buffer> buffer;
RETURN_IF_NOT_OK((*client)->Create(key, size, fullParam, buffer));
RETURN_IF_NOT_OK(CopyToBufferNoLatch(env, byteArray, size, *buffer));
RETURN_IF_NOT_OK(buffer->Publish());
} else {
jbyte *bytes = env->GetByteArrayElements(byteArray, 0);
RETURN_IF_NOT_OK((*client)->Put(key, reinterpret_cast<const uint8_t *>(bytes), size, fullParam, {}));
env->ReleaseByteArrayElements(byteArray, bytes, 0);
}
return Status::OK();
}
Status GetKeyNativeImpl(JNIEnv *env, jlong handle, jstring keyJO, jint timeoutMs, jint &totalSize,
jobject &heapBuffer, ObjectAccessRecorder &access)
{
auto client = reinterpret_cast<std::shared_ptr<ObjectClientImpl> *>(handle);
std::string key = ToString(env, keyJO);
access.ObjectKeyOwned(key).TimeoutMs(timeoutMs);
std::vector<Optional<Buffer>> buffers;
RETURN_IF_NOT_OK((*client)->Get({ key }, timeoutMs, buffers));
RETURN_IF_NOT_OK(buffers[0]->CopyDataWithRLatch([&] {
heapBuffer =
ToHeapBuffer(env, reinterpret_cast<const char *>(buffers[0]->ImmutableData()), buffers[0]->GetSize());
if (env->ExceptionOccurred()) {
std::string msg =
"Exception Occurs when Java_org_yuanrong_datasystem_statecache_StateClient_get__JLjava_lang_String_2 "
"function to call ToHeapBuffer()";
RETURN_STATUS(StatusCode::K_RUNTIME_ERROR, msg);
}
return Status::OK();
}));
totalSize += buffers[0]->GetSize();
return Status::OK();
}
Status GetKeysNativeImpl(JNIEnv *env, jlong handle, jobject keysJO, jint timeoutMs, jint &totalSize,
jobject &ListJO, ObjectAccessRecorder &access)
{
auto client = reinterpret_cast<std::shared_ptr<ObjectClientImpl> *>(handle);
std::vector<std::string> keys;
GetJavaStringListVal(env, keysJO, keys);
access.ObjectKeyProvider([&keys] { return objectKeysToString(keys); });
access.TimeoutMs(timeoutMs);
std::vector<Optional<Buffer>> buffers;
Status rc = (*client)->Get(keys, timeoutMs, buffers);
if (rc.IsOk()) {
jclass listJC = env->FindClass("java/util/ArrayList");
jmethodID initId = env->GetMethodID(listJC, "<init>", "()V");
ListJO = env->NewObject(listJC, initId);
jmethodID addId = env->GetMethodID(listJC, "add", "(Ljava/lang/Object;)Z");
jobject byteBuffer = NULL;
for (auto &buffer : buffers) {
if (buffer) {
Status copyRc = buffer->CopyDataWithRLatch([&] {
byteBuffer = ToHeapBuffer(env, reinterpret_cast<const char *>(buffer->ImmutableData()),
buffer->GetSize());
if (env->ExceptionOccurred()) {
std::string msg =
"Exception Occurs when Java_org_yuanrong_datasystem_statecache_StateClient_getKeysNative "
"function to call ToHeapBuffer()";
RETURN_STATUS(StatusCode::K_RUNTIME_ERROR, msg);
}
return Status::OK();
});
if (!copyRc.IsOk()) {
rc = copyRc;
break;
}
totalSize += buffer->GetSize();
} else {
byteBuffer = nullptr;
}
env->CallBooleanMethod(ListJO, addId, byteBuffer);
}
env->DeleteLocalRef(listJC);
env->DeleteLocalRef(byteBuffer);
}
Status accessRc = (rc.GetCode() == K_NOT_FOUND) ? Status::OK() : rc;
access.Result(accessRc).DataSize(totalSize).Record();
return rc;
}
Status DelKeyNativeImpl(JNIEnv *env, jlong handle, jstring keyJO, std::vector<std::string> &failedKeys,
ObjectAccessRecorder &access)
{
auto client = reinterpret_cast<std::shared_ptr<ObjectClientImpl> *>(handle);
std::string key = ToString(env, keyJO);
access.ObjectKeyOwned(key);
RETURN_IF_NOT_OK((*client)->Delete({ key }, failedKeys));
if (!failedKeys.empty()) {
std::string msg = "The failed key is not empty, delete key failed!";
RETURN_STATUS(StatusCode::K_RUNTIME_ERROR, msg);
}
return Status::OK();
}
Status DelKeysNativeImpl(JNIEnv *env, jlong handle, jobject keysJO, std::vector<std::string> &failedKeys,
ObjectAccessRecorder &access)
{
auto client = reinterpret_cast<std::shared_ptr<ObjectClientImpl> *>(handle);
std::vector<std::string> keys;
GetJavaStringListVal(env, keysJO, keys);
access.ObjectKeyProvider([&keys] { return objectKeysToString(keys); });
Status rc = (*client)->Delete(keys, failedKeys);
access.Result(rc).Record();
return rc;
}
}
}