/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

#include "com_huawei_omniruntime_flink_streaming_runtime_io_OmniStreamTaskNetworkInput.h"
#include "streaming/runtime/io/StreamOneInputProcessor.h"
#include "streaming/runtime/tasks/StreamTask.h"


JNIEXPORT jobject JNICALL Java_com_huawei_omniruntime_flink_streaming_runtime_io_OmniStreamTaskNetworkInput_TNELProcessBuffer
        (JNIEnv *env, jobject input, jlong omniStreamTaskRef, jlong omniInputProcessorRef, jlong address, jint offset, jint numBytes, jlong channelInfo)
{
    auto *streamTask = reinterpret_cast<omnistream::datastream::StreamTask *>(omniStreamTaskRef);
    auto *processor = reinterpret_cast<omnistream::datastream::StreamOneInputProcessor *>(omniInputProcessorRef);

    auto buffAddress = reinterpret_cast<uint8_t *>(address);

    LOG(" Inside Java_com_huawei_omniruntime_flink_streaming_runtime_io_OmniStreamTaskNetworkInput_TNELProcessBuffer buffAddress + offset " +
        std::to_string(reinterpret_cast<long>(buffAddress + offset)) + " numBytes " + std::to_string(numBytes))

    auto myClass = env->FindClass("com/huawei/omniruntime/flink/streaming/runtime/io/TNELProcessState");
    auto constructor = env->GetMethodID(myClass, "<init>", "(IJ)V");
    int32_t inputNumber = 0;
    auto status = processor->processInput(buffAddress + offset, numBytes, channelInfo, inputNumber);
    streamTask->resetOutputBufferAndRecordWriter();
    auto myObject = env->NewObject(myClass, constructor, status, inputNumber);
    return myObject;
}