* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#include "OmniStreamOneInputProcessor.h"
#include "io/recover/OmniRescalingStreamTaskNetworkInput.h"
namespace omnistream {
OmniStreamOneInputProcessor::OmniStreamOneInputProcessor(
OmniStreamTaskInput* input, OmniStreamTaskInput::OmniDataOutput* output, OperatorChainV2* operatorChain)
: input(input),
output(output),
endOfInputAware(operatorChain)
{
}
OmniStreamOneInputProcessor::~OmniStreamOneInputProcessor()
{
if (input != nullptr) {
delete input;
input = nullptr;
}
if (output != nullptr) {
delete output;
output = nullptr;
}
}
DataInputStatus OmniStreamOneInputProcessor::processInput()
{
DataInputStatus status = input->emitNext(output);
if (status == DataInputStatus::END_OF_RECOVERY) {
auto recoverInput = dynamic_cast<OmniRescalingStreamTaskNetworkInput*>(input);
if (recoverInput) {
input = recoverInput->finishRecover();
}
return status;
}
LOG("emitNext return status: " << DataInputStatusHelper::mapToInt(status));
return status;
}
std::shared_ptr<CompletableFuture> OmniStreamOneInputProcessor::GetAvailableFuture()
{
return input->GetAvailableFuture();
}
OmniStreamTaskInput* OmniStreamOneInputProcessor::GetInput()
{
return input;
}
std::shared_ptr<CompletableFutureV2<void>> OmniStreamOneInputProcessor::PrepareSnapshot(
std::shared_ptr<ChannelStateWriter> writer, long checkpointID)
{
LOG("OneInput prepare snapshot, checkpointID: " << checkpointID);
return input->PrepareSnapshot(writer, checkpointID);
}
void OmniStreamOneInputProcessor::close()
{
if (input != nullptr) {
input->close();
}
if (output != nullptr) {
output->close();
}
}
}