* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#include "RocksDBStateDownloader.h"
#include "common/global.h"
#include <stdexcept>
jobject convertToJavaByteStreamStateHandle(
JNIEnv* env,
const ByteStreamStateHandle& cppHandle)
{
jclass byteStreamStateHandleClass = env->FindClass(
"org/apache/flink/runtime/state/memory/ByteStreamStateHandle");
if (!byteStreamStateHandleClass) {
env->ExceptionDescribe();
return nullptr;
}
jmethodID constructor = env->GetMethodID(
byteStreamStateHandleClass,
"<init>",
"(Ljava/lang/String;[B)V");
if (!constructor) {
env->ExceptionDescribe();
env->DeleteLocalRef(byteStreamStateHandleClass);
return nullptr;
}
jstring jHandleName = env->NewStringUTF(cppHandle.GetHandleName().c_str());
if (!jHandleName) {
env->DeleteLocalRef(byteStreamStateHandleClass);
return nullptr;
}
const auto& cppData = cppHandle.GetData();
jbyteArray jData = env->NewByteArray(static_cast<jsize>(cppData.size()));
if (!jData) {
env->DeleteLocalRef(jHandleName);
env->DeleteLocalRef(byteStreamStateHandleClass);
return nullptr;
}
env->SetByteArrayRegion(
jData,
0,
static_cast<jsize>(cppData.size()),
reinterpret_cast<const jbyte*>(cppData.data()));
jobject javaHandle = env->NewObject(
byteStreamStateHandleClass,
constructor,
jHandleName,
jData);
env->DeleteLocalRef(jHandleName);
env->DeleteLocalRef(jData);
env->DeleteLocalRef(byteStreamStateHandleClass);
return javaHandle;
}
jobject convertToJavaFileStateHandle(JNIEnv* env, const FileStateHandle& cppHandle)
{
jclass fileStateHandleClass = env->FindClass("org/apache/flink/runtime/state/filesystem/FileStateHandle");
if (!fileStateHandleClass) {
env->ExceptionDescribe();
return nullptr;
}
jmethodID constructor = env->GetMethodID(
fileStateHandleClass,
"<init>",
"(Lorg/apache/flink/core/fs/Path;J)V");
if (!constructor) {
env->ExceptionDescribe();
env->DeleteLocalRef(fileStateHandleClass);
return nullptr;
}
jclass pathClass = env->FindClass("org/apache/flink/core/fs/Path");
jmethodID pathConstructor = env->GetMethodID(pathClass, "<init>", "(Ljava/lang/String;)V");
std::string filePathStr = cppHandle.GetFilePath().toString();
jstring jFilePathStr = env->NewStringUTF(filePathStr.c_str());
jobject javaPath = env->NewObject(pathClass, pathConstructor, jFilePathStr);
jobject javaHandle = env->NewObject(
fileStateHandleClass,
constructor,
javaPath,
static_cast<jlong>(cppHandle.GetStateSize())
);
env->DeleteLocalRef(jFilePathStr);
env->DeleteLocalRef(javaPath);
env->DeleteLocalRef(pathClass);
env->DeleteLocalRef(fileStateHandleClass);
return javaHandle;
}
jobject convertToJavaRelativeFileStateHandle(
JNIEnv* env,
const RelativeFileStateHandle& cppHandle)
{
jclass relativeHandleClass = env->FindClass(
"org/apache/flink/runtime/state/filesystem/RelativeFileStateHandle");
if (!relativeHandleClass) {
env->ExceptionDescribe();
return nullptr;
}
jmethodID constructor = env->GetMethodID(
relativeHandleClass,
"<init>",
"(Lorg/apache/flink/core/fs/Path;Ljava/lang/String;J)V");
if (!constructor) {
env->ExceptionDescribe();
env->DeleteLocalRef(relativeHandleClass);
return nullptr;
}
jclass pathClass = env->FindClass("org/apache/flink/core/fs/Path");
jmethodID pathConstructor = env->GetMethodID(pathClass, "<init>", "(Ljava/lang/String;)V");
std::string filePathStr = cppHandle.GetFilePath().toString();
jstring jFilePathStr = env->NewStringUTF(filePathStr.c_str());
jobject javaPath = env->NewObject(pathClass, pathConstructor, jFilePathStr);
jstring jRelativePath = env->NewStringUTF(cppHandle.GetRelativePath().c_str());
jobject javaHandle = env->NewObject(
relativeHandleClass,
constructor,
javaPath,
jRelativePath,
static_cast<jlong>(cppHandle.GetStateSize())
);
env->DeleteLocalRef(jFilePathStr);
env->DeleteLocalRef(javaPath);
env->DeleteLocalRef(jRelativePath);
env->DeleteLocalRef(pathClass);
env->DeleteLocalRef(relativeHandleClass);
return javaHandle;
}
jobject convertToJavaStreamStateHandle(
JNIEnv* env,
const StreamStateHandle& cppHandle)
{
if (auto byteHandle = dynamic_cast<const ByteStreamStateHandle*>(&cppHandle)) {
return convertToJavaByteStreamStateHandle(env, *byteHandle);
} else if (auto fileHandle = dynamic_cast<const FileStateHandle*>(&cppHandle)) {
return convertToJavaFileStateHandle(env, *fileHandle);
} else if (auto relHandle = dynamic_cast<const RelativeFileStateHandle*>(&cppHandle)) {
return convertToJavaRelativeFileStateHandle(env, *relHandle);
} else {
env->ThrowNew(
env->FindClass("java/lang/UnsupportedOperationException"),
"Unsupported StreamStateHandle type");
return nullptr;
}
}
jobject convertToJavaIncrementalHandleAndLocalPath(
JNIEnv* env,
const IncrementalKeyedStateHandle::HandleAndLocalPath& cppHandle)
{
jclass incrementalHandleClass = env->FindClass(
"org/apache/flink/runtime/state/IncrementalKeyedStateHandle");
if (!incrementalHandleClass) {
env->ExceptionDescribe();
return nullptr;
}
jclass handleAndPathClass = env->FindClass(
"org/apache/flink/runtime/state/IncrementalKeyedStateHandle$HandleAndLocalPath");
if (!handleAndPathClass) {
env->ExceptionDescribe();
env->DeleteLocalRef(incrementalHandleClass);
return nullptr;
}
jmethodID factoryMethod = env->GetStaticMethodID(
handleAndPathClass,
"of",
"(Lorg/apache/flink/runtime/state/StreamStateHandle;Ljava/lang/String;)"
"Lorg/apache/flink/runtime/state/IncrementalKeyedStateHandle$HandleAndLocalPath;");
if (!factoryMethod) {
env->ExceptionDescribe();
env->DeleteLocalRef(handleAndPathClass);
env->DeleteLocalRef(incrementalHandleClass);
return nullptr;
}
jobject javaStateHandle = nullptr;
if (cppHandle.getHandle()) {
javaStateHandle = convertToJavaStreamStateHandle(env, *cppHandle.getHandle());
if (!javaStateHandle) {
env->DeleteLocalRef(handleAndPathClass);
env->DeleteLocalRef(incrementalHandleClass);
return nullptr;
}
}
jstring jLocalPath = env->NewStringUTF(cppHandle.getLocalPath().c_str());
if (!jLocalPath) {
if (javaStateHandle) env->DeleteLocalRef(javaStateHandle);
env->DeleteLocalRef(handleAndPathClass);
env->DeleteLocalRef(incrementalHandleClass);
return nullptr;
}
jobject javaHandleAndPath = env->CallStaticObjectMethod(
handleAndPathClass, factoryMethod, javaStateHandle, jLocalPath);
if (javaStateHandle) env->DeleteLocalRef(javaStateHandle);
env->DeleteLocalRef(jLocalPath);
env->DeleteLocalRef(handleAndPathClass);
env->DeleteLocalRef(incrementalHandleClass);
return javaHandleAndPath;
}
jobject convertToJavaHandleAndLocalPathList(
JNIEnv* env,
const std::vector<HandleAndLocalPath>& cppHandles)
{
jclass arrayListClass = env->FindClass("java/util/ArrayList");
jmethodID constructor = env->GetMethodID(arrayListClass, "<init>", "()V");
jmethodID addMethod = env->GetMethodID(arrayListClass, "add", "(Ljava/lang/Object;)Z");
jobject javaList = env->NewObject(arrayListClass, constructor);
if (!javaList) {
env->DeleteLocalRef(arrayListClass);
return nullptr;
}
for (const auto& cppHandle : cppHandles) {
jobject javaItem = convertToJavaIncrementalHandleAndLocalPath(env, cppHandle);
if (!javaItem) {
env->DeleteLocalRef(javaList);
env->DeleteLocalRef(arrayListClass);
return nullptr;
}
env->CallBooleanMethod(javaList, addMethod, javaItem);
env->DeleteLocalRef(javaItem);
}
env->DeleteLocalRef(arrayListClass);
return javaList;
}
jobject convertFsPathToJavaPath(JNIEnv* env, const fs::path& restoreInstancePath)
{
jclass pathsClass = env->FindClass("java/nio/file/Paths");
if (pathsClass == nullptr) {
return nullptr;
}
jmethodID getMethod = env->GetStaticMethodID(
pathsClass,
"get",
"(Ljava/lang/String;[Ljava/lang/String;)Ljava/nio/file/Path;");
if (getMethod == nullptr) {
env->DeleteLocalRef(pathsClass);
return nullptr;
}
jstring pathString = env->NewStringUTF(restoreInstancePath.string().c_str());
jobjectArray emptyArray = env->NewObjectArray(0, env->FindClass("java/lang/String"), nullptr);
jobject javaPath = env->CallStaticObjectMethod(
pathsClass,
getMethod,
pathString,
emptyArray);
env->DeleteLocalRef(pathString);
env->DeleteLocalRef(emptyArray);
env->DeleteLocalRef(pathsClass);
return javaPath;
}
void RocksDBStateDownloader::callDownloadDataForAllStateHandles(
const std::vector<HandleAndLocalPath> &handleWithPaths,
const fs::path &restoreInstancePath,
std::shared_ptr<omnistream::OmniTaskBridge> omniTaskBridge)
{
auto env = omniTaskBridge->getJNIEnv();
jclass downloaderClass = env->FindClass("org/apache/flink/contrib/streaming/state/RocksDBStateDownloader");
if (env->ExceptionCheck()) {
env->ExceptionDescribe();
env->ExceptionClear();
throw std::runtime_error("Failed to find RocksDBStateUploader class");
}
jmethodID constructor = env->GetMethodID(downloaderClass, "<init>", "(I)V");
jobject downloaderInstance = env->NewObject(downloaderClass, constructor, restoringThreadNum_);
if (env->ExceptionCheck()) {
env->ExceptionDescribe();
env->ExceptionClear();
throw std::runtime_error("Failed to NewObject");
}
jobject jPath = convertFsPathToJavaPath(env, restoreInstancePath);
jobject javaHandleAndLocalPathList = convertToJavaHandleAndLocalPathList(env, handleWithPaths);
jclass closeableClass = env->FindClass("org/apache/flink/core/fs/CloseableRegistry");
if (env->ExceptionCheck()) {
env->ExceptionDescribe();
env->ExceptionClear();
throw std::runtime_error("Failed to find CloseableRegistry class");
}
jmethodID closeableCtor = env->GetMethodID(closeableClass, "<init>", "()V");
jobject jCloseableRegistry = env->NewObject(closeableClass, closeableCtor);
jmethodID downloadMethod = env->GetMethodID(
downloaderClass,
"downloadDataForAllStateHandles",
"(Ljava/util/List;Ljava/nio/file/Path;"
"Lorg/apache/flink/core/fs/CloseableRegistry;)V");
env->CallVoidMethod(downloaderInstance,
downloadMethod,
javaHandleAndLocalPathList,
jPath,
jCloseableRegistry);
env->DeleteLocalRef(closeableClass);
env->DeleteLocalRef(downloaderInstance);
env->DeleteLocalRef(javaHandleAndLocalPathList);
env->DeleteLocalRef(jPath);
env->DeleteLocalRef(jCloseableRegistry);
env->DeleteLocalRef(downloaderClass);
}
RocksDBStateDownloader::RocksDBStateDownloader(int restoringThreadNum)
: RocksDBStateDataTransfer(restoringThreadNum),
restoringThreadNum_(restoringThreadNum) {
}