* Copyright (c) Huawei Technologies Co., Ltd. 2025-2025. All rights reserved.
*/
#include "RecoveredInputChannel.h"
#include "buffer/ReadOnlySlicedNetworkBuffer.h"
std::shared_ptr<omnistream::InputChannel> RecoveredInputChannel::toInputChannel()
{
if(!stateConsumedFuture->IsDone()){
LOG("recovery not completed, do not convert to normal channel!");
throw std::runtime_error("recovery not completed, do not convert to normal channel!");
}
if(!stateConsumedFuture1.load()){
LOG("recovery not completed, do not convert to normal channel!");
throw std::runtime_error("recovery not completed, do not convert to normal channel!");
}
{
std::lock_guard<std::mutex> lock(bufferLock);
consumedRecoveredBufferOwners.clear();
}
std::shared_ptr<omnistream::InputChannel> inputChannel = toInputChannelInternal();
inputChannel->CheckpointStopped(lastStoppedCheckpointId);
return inputChannel;
}
void RecoveredInputChannel::onRecoveredStateBuffer(Buffer *buffer)
{
bool recycleBuffer = true;
bool wasEmpty = false;
{
std::lock_guard<std::mutex> lock(bufferLock);
if (!released) {
wasEmpty = receivedBuffers.empty();
receivedBuffers.emplace_back(buffer, nullptr);
recycleBuffer = false;
}
}
if (wasEmpty) {
notifyChannelNonEmpty();
}
if (recycleBuffer && buffer != nullptr) {
buffer->RecycleBuffer();
}
}
void RecoveredInputChannel::onRecoveredStateBuffer2(Buffer *buffer)
{
bool recycleBuffer = true;
bool wasEmpty = false;
ReadOnlySlicedNetworkBuffer* readOnlyBuffer;
{
readOnlyBuffer = new ReadOnlySlicedNetworkBuffer(
dynamic_cast<NetworkBuffer *>(buffer), 0, buffer->GetSize());
std::lock_guard<std::mutex> lock(bufferLock);
if (!released) {
wasEmpty = receivedBuffers.empty();
receivedBuffers.emplace_back(readOnlyBuffer, nullptr);
recycleBuffer = false;
}
}
if (wasEmpty) {
notifyChannelNonEmpty();
}
if (recycleBuffer && readOnlyBuffer != nullptr) {
readOnlyBuffer->RecycleBuffer();
}
}
void RecoveredInputChannel::finishReadRecoveredState()
{
LOG("Recovered input channel finishReadRecoveredState!");
NetworkBuffer* networkBuffer = omnistream::EventSerializer::toBuffer(
EndOfChannelStateEvent::getInstance(), false);
if (networkBuffer != nullptr) {
onRecoveredStateBuffer(networkBuffer);
bufferManager->releaseFloatingBuffers();
LOG(inputGate->getOwningTaskName()<<"/"<< channelInfo.toString()<< " finished recovering input!");
}
}
std::optional<omnistream::BufferAndAvailability> RecoveredInputChannel::getNextRecoveredStateBuffer()
{
LOG("Recovered input channel get Next record buffer!");
Buffer *next = nullptr;
omnistream::ObjectBufferDataType nextDataType;
{
std::lock_guard<std::mutex> lock(bufferLock);
if (released) {
LOG("Trying to read from released RecoveredInputChannel");
}
if (receivedBuffers.empty()) {
return std::nullopt;
}
auto entry = std::move(receivedBuffers.front());
receivedBuffers.pop_front();
next = entry.buffer;
if (entry.owner) {
consumedRecoveredBufferOwners.emplace_back(std::move(entry.owner));
}
nextDataType = peekDataTypeUnsafe();
}
if (next == nullptr) {
LOG("Recovered input channel next ele is null! to test, send a end of recover event");
finishReadRecoveredState();
return std::nullopt;
} else if (isEndOfChannelStateEvent(next)) {
LOG("Recovered input channel end of event!");
stateConsumedFuture->Complete();
stateConsumedFuture1.store(true);
return omnistream::BufferAndAvailability{next, nextDataType, 0, sequenceNumber++};
} else {
return omnistream::BufferAndAvailability{next, nextDataType, 0, sequenceNumber++};
}
}
omnistream::ObjectBufferDataType RecoveredInputChannel::peekDataTypeUnsafe()
{
if (receivedBuffers.empty()) {
return ObjectBufferDataType(ObjectBufferDataType::NONE);
}
if (receivedBuffers.front().buffer == nullptr) {
return ObjectBufferDataType(ObjectBufferDataType::NONE);
}
return ObjectBufferDataType(receivedBuffers.front().buffer->GetDataType());
}
bool RecoveredInputChannel::isEndOfChannelStateEvent(Buffer *buffer)
{
if(buffer->isBuffer()){
return false;
}
std::shared_ptr<AbstractEvent> event = EventSerializer::fromBufferNotRecycle(buffer);
buffer->SetReaderIndex(0);
if (dynamic_cast<EndOfChannelStateEvent*>(event.get())) {
return true;
}
return false;
}
std::optional<BufferAndAvailability> RecoveredInputChannel::getNextBuffer()
{
checkError();
return getNextRecoveredStateBuffer();
}
void RecoveredInputChannel::releaseAllResources()
{
std::deque<Buffer *> releasedBuffers;
bool shouldRelease = false;
{
std::lock_guard<std::mutex> lock(bufferLock);
if (!released) {
released = true;
shouldRelease = true;
while (!receivedBuffers.empty()) {
auto entry = std::move(receivedBuffers.front());
receivedBuffers.pop_front();
releasedBuffers.emplace_back(entry.buffer);
}
consumedRecoveredBufferOwners.clear();
}
}
if (shouldRelease) {
bufferManager->releaseAllBuffers(releasedBuffers);
}
}
std::shared_ptr<omnistream::Buffer> RecoveredInputChannel::requestBufferBlocking()
{
LOG("RecoveredInputChannel requestBufferBlocking111")
if (!exclusiveBuffersAssigned) {
LOG("RecoveredInputChannel requestBufferBlocking222")
bufferManager->requestExclusiveBuffers(1);
exclusiveBuffersAssigned = true;
}
LOG("RecoveredInputChannel requestBufferBlocking333")
return bufferManager->requestBufferBlocking();
}