* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#pragma once
#include "LocalInputChannel.h"
#include "table/utils/VectorBatchDeserializationUtils.h"
#include <optional>
#include <memory>
#include <queue>
#include "RemoteDataFetcherBridge.h"
#include "streaming/runtime/streamrecord/StreamRecord.h"
#include "streaming/api/watermark/Watermark.h"
#include "streaming/runtime/streamrecord/StreamElement.h"
#include "runtime/buffer/ObjectBufferRecycler.h"
#include "runtime/buffer/OriginalNetworkBufferRecycler.h"
namespace omnistream {

/**
 * @brief Input channel that extends LocalInputChannel with remote-data notification hooks.
 *
 * The class keeps a queue of Buffer pointers, a running expected sequence number and a
 * handle to a RemoteDataFetcherBridge. Only the declarations are visible in this header;
 * the member functions without an inline body are implemented elsewhere, so their
 * descriptions below are limited to what the signatures show.
 *
 * NOTE(review): a std::recursive_mutex guards "queue" state by name (queueMutex); which
 * methods actually take it must be confirmed against the implementation file.
 */
class RemoteInputChannel : public LocalInputChannel {
public:
    /**
     * @brief Constructs the channel.
     *
     * @param inputGate                 gate this channel belongs to
     * @param channelIndex              index of this channel
     * @param partitionId               id of the result partition
     * @param partitionManager          result partition manager
     * @param initialBackoff            initial backoff value
     * @param maxBackoff                maximum backoff value
     * @param networkBuffersPerChannel  number of network buffers per channel
     *                                  (presumably the source of initialCredit — TODO confirm)
     * @param numBytesIn                counter for incoming bytes
     * @param numBuffersIn              counter for incoming buffers
     * @param stateWriter               channel state writer
     */
    RemoteInputChannel(std::shared_ptr<SingleInputGate> inputGate, int channelIndex,
                       ResultPartitionIDPOD partitionId,
                       std::shared_ptr<ResultPartitionManager> partitionManager,
                       int initialBackoff, int maxBackoff, int networkBuffersPerChannel,
                       std::shared_ptr<Counter> numBytesIn, std::shared_ptr<Counter> numBuffersIn,
                       std::shared_ptr<ChannelStateWriter> stateWriter);

    /** @brief Requests the subpartition with the given index (overrides the base class). */
    void requestSubpartition(int subpartitionIndex) override;

    /**
     * @brief Notification that vector-batch data is available.
     *
     * @param bufferAddress   address of the data, passed as an integer
     * @param bufferLength    length associated with the data
     * @param sequenceNumber  sequence number of this notification
     */
    void notifyRemoteDataAvailableForVectorBatch(long bufferAddress, int bufferLength, int sequenceNumber);

    /** @brief Returns the next buffer with availability info, or std::nullopt (overrides the base class). */
    std::optional<BufferAndAvailability> getNextBuffer() override;

    /**
     * @brief Deserializes data from @p buffer into an ObjectSegment.
     *
     * @param buffer        pointer to the raw bytes; taken by reference, so the callee may
     *                      modify the caller's pointer (presumably advancing it — TODO confirm)
     * @param bufferLength  length associated with @p buffer
     * @return the resulting ObjectSegment
     */
    std::shared_ptr<ObjectSegment> DoDataDeserializationResult(uint8_t*& buffer, int bufferLength);

    /**
     * @brief Notification that network-buffer data is available.
     *
     * @param bufferAddress                  address of the data, passed as an integer
     * @param bufferLength                   length associated with the data
     * @param readIndex                      read index within the buffer
     * @param sequenceNumber                 sequence number of this notification
     * @param originalNetworkBufferRecycler  recycler for the original network buffer
     * @param isBuffer                       whether the payload is a buffer
     *                                       (as opposed to an event, presumably — TODO confirm)
     * @param bufferType                     integer buffer type code
     */
    void notifyRemoteDataAvailableForNetworkBuffer(long bufferAddress, int bufferLength, int readIndex,
                                                   int sequenceNumber,
                                                   std::shared_ptr<OriginalNetworkBufferRecycler>
                                                       originalNetworkBufferRecycler,
                                                   bool isBuffer, int bufferType);

    /** @brief Sets the RemoteDataFetcherBridge used by this channel. */
    void SetRemoteDataFetcherBridge(std::shared_ptr<RemoteDataFetcherBridge> remoteDataFetcherBridge);

    /** @brief Resumes consumption (overrides the base class). */
    void resumeConsumption() override;

    /** @brief Checkpoint-start hook for the given barrier (overrides the base class). */
    void CheckpointStarted(const CheckpointBarrier& barrier) override;

    /** @brief Checkpoint-stop hook for the given checkpoint id (overrides the base class). */
    void CheckpointStopped(long checkpointId) override;

    /**
     * @brief Returns in-flight buffers for the given checkpoint id.
     *
     * NOTE(review): the "Unsafe" suffix suggests the caller is responsible for
     * synchronization — confirm against the implementation.
     */
    std::vector<Buffer*> GetInflightBuffersUnsafe(long checkpointId);

    /**
     * @brief Resets the last-barrier marker to the "no barrier" sentinel.
     *
     * The reset value must be the same sentinel that lastBarrierId_ starts with (-1).
     * The earlier code wrote 1 here, which does not match the initial state and is
     * indistinguishable from a real checkpoint id of 1.
     */
    void ResetLastBarrier()
    {
        lastBarrierId_ = NO_BARRIER_ID;
    }

    /** @brief Stores the flag that controls whether resume is forwarded to Java. */
    void SetForwardResumeToJava(bool forwardResumeToJava)
    {
        forwardResumeToJava_ = forwardResumeToJava;
    }

private:
    /// Sentinel stored in lastBarrierId_ while no barrier is being tracked.
    static constexpr long NO_BARRIER_ID = -1;

    /// Queue of buffer pointers held by this channel.
    std::queue<Buffer*> dataQueue;
    /// Next sequence number expected; starts at 0.
    int expectSequenceNumber = 0;
    /// Zero default so the member is never indeterminate if a constructor does not set it.
    int initialCredit = 0;
    /// Recursive mutex; by its name it protects the queue (usage is in the implementation file).
    std::recursive_mutex queueMutex;
    /// Bridge set through SetRemoteDataFetcherBridge().
    std::shared_ptr<RemoteDataFetcherBridge> remoteDataFetcherBridge;
    /// Id of the last barrier, or NO_BARRIER_ID when none is tracked.
    long lastBarrierId_ = NO_BARRIER_ID;
    /// Flag written by SetForwardResumeToJava(); defaults to true.
    bool forwardResumeToJava_ = true;
};

} // namespace omnistream