* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#include "org_apache_flink_runtime_io_network_partition_consumer_OmniLocalInputChannel.h"
#include <nlohmann/json.hpp>
#include <string>
#include "partition/consumer/OmniLocalChannelReader.h"
#include "runtime/executiongraph/descriptor/ResultPartitionIDPOD.h"
#include "runtime/taskmanager/OmniTask.h"
#include "runtime/state/bridge/OmniLocalInputChannelBridge.h"
#include "jni/bridge/OmniLocalInputChannelBridgeImpl.h"
#include "runtime/partition/consumer/OmniLocalInputChannel.h"
JNIEXPORT jlong JNICALL
Java_org_apache_flink_runtime_io_network_partition_consumer_OmniLocalInputChannel_doChangeNativeLocalInputChannel(
JNIEnv* jniEnv, jobject, jlong nativeTaskRef, jstring partitionIdJson)
{
const char* paritionIdChars = jniEnv->GetStringUTFChars(partitionIdJson, nullptr);
std::string paritionIdStr(paritionIdChars);
jniEnv->ReleaseStringUTFChars(partitionIdJson, paritionIdChars);
nlohmann::json partitionId = nlohmann::json::parse(paritionIdStr);
omnistream::ResultPartitionIDPOD partitionIdPOD = partitionId;
auto task = reinterpret_cast<omnistream::OmniTask*>(nativeTaskRef);
return task->changeLocalInputChannelToOriginal(partitionIdPOD);
}
JNIEXPORT void JNICALL
Java_org_apache_flink_runtime_io_network_partition_consumer_OmniLocalInputChannel_sendMemorySegmentToNative(
JNIEnv*,
jobject,
jlong omniLocalInputChannelRef,
jlong segmentAddress,
jint readIndex,
jint length,
jint memorySegmentOffset,
jint sequenceNum,
jint bufferType)
{
auto omniInputChannel = reinterpret_cast<omnistream::OmniLocalInputChannel*>(omniLocalInputChannelRef);
omniInputChannel->notifyOriginalDataAvailable(
segmentAddress, length, readIndex, sequenceNum, memorySegmentOffset, bufferType);
}
JNIEXPORT jlong JNICALL
Java_org_apache_flink_runtime_io_network_partition_consumer_OmniLocalInputChannel_getRecycleBufferAddress(
JNIEnv*, jobject, jlong omniLocalInputChannelRef)
{
auto omniInputChannel = reinterpret_cast<omnistream::OmniLocalInputChannel*>(omniLocalInputChannelRef);
return omniInputChannel->GetRecycleBufferAddress();
}
JNIEXPORT void JNICALL
Java_org_apache_flink_runtime_io_network_partition_consumer_OmniLocalInputChannel_registerJavaOmniLocalInputChannel(
JNIEnv* env, jobject thiz, jlong omniLocalInputChannelRef)
{
auto omniInputChannel = reinterpret_cast<omnistream::OmniLocalInputChannel*>(omniLocalInputChannelRef);
std::shared_ptr<OmniLocalInputChannelBridgeImpl> bridge = std::make_shared<OmniLocalInputChannelBridgeImpl>();
bridge->RegisterJavaOmniLocalInputChannel(env, thiz);
omniInputChannel->SetOmniLocalInputChannelBridge(bridge);
}