/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 * http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */
#include "LocalInputChannel.h"
#include <iostream>
#include <chrono>
#include <thread>
#include "runtime/partition/PartitionNotFoundException.h"
namespace omnistream {
LocalInputChannel::LocalInputChannel(std::shared_ptr<SingleInputGate> inputGate, int channelIndex,
ResultPartitionIDPOD partitionId, std::shared_ptr<ResultPartitionManager> _partitionManager,
int initialBackoff, int maxBackoff, std::shared_ptr<Counter> numBytesIn, std::shared_ptr<Counter> numBuffersIn
)
: InputChannel(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, numBytesIn, numBuffersIn),
partitionManager(_partitionManager),
channelStatePersister(nullptr)
{}
LocalInputChannel::LocalInputChannel(std::shared_ptr<SingleInputGate> inputGate, int channelIndex,
ResultPartitionIDPOD partitionId, std::shared_ptr<ResultPartitionManager> _partitionManager,
int initialBackoff, int maxBackoff, std::shared_ptr<Counter> numBytesIn, std::shared_ptr<Counter> numBuffersIn,
std::shared_ptr<ChannelStateWriter> stateWriter
)
: InputChannel(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, numBytesIn, numBuffersIn),
partitionManager(_partitionManager)
{
channelStatePersister = std::make_shared<ChannelStatePersister>(stateWriter, getChannelInfo());
}
/**
 * Tells the channel-state persister that the checkpoint identified by the
 * barrier has started.
 *
 * An empty buffer list is handed to StartPersisting(). When no persister has
 * been installed, the call only logs and returns.
 *
 * @param barrier checkpoint barrier whose id is passed to the persister
 */
void LocalInputChannel::CheckpointStarted(const CheckpointBarrier& barrier)
{
    if (channelStatePersister) {
        std::vector<Buffer*> noKnownBuffers;
        channelStatePersister->StartPersisting(barrier.GetId(), noKnownBuffers);
        return;
    }
    INFO_RELEASE("LocalInputChannel::CheckpointStarted skipped because channelStatePersister is not initialized.");
}
/**
 * Installs a new channel-state persister built from the given writer and this
 * channel's getChannelInfo(), replacing any persister held before.
 *
 * @param channelStateWriter writer handed to the new ChannelStatePersister
 */
void LocalInputChannel::SetChannelStateWriter(std::shared_ptr<ChannelStateWriter> channelStateWriter)
{
    auto freshPersister = std::make_shared<ChannelStatePersister>(channelStateWriter, getChannelInfo());
    channelStatePersister = freshPersister;
}
/**
 * Tells the channel-state persister to stop persisting for the given
 * checkpoint. When no persister has been installed, the call only logs and
 * returns.
 *
 * @param checkpointId id forwarded to ChannelStatePersister::StopPersisting
 */
void LocalInputChannel::CheckpointStopped(long checkpointId)
{
    if (channelStatePersister) {
        channelStatePersister->StopPersisting(checkpointId);
        return;
    }
    INFO_RELEASE("LocalInputChannel::CheckpointStopped skipped because channelStatePersister is not initialized.");
}
/**
 * Requests the subpartition view for this channel from the partition manager.
 *
 * The whole call runs under requestLock (a recursive mutex). If a view already
 * exists, nothing happens. Otherwise a view is created through
 * partitionManager->createSubpartitionView(), with this channel passed as the
 * third argument:
 *  - on success the view is stored and notifyDataAvailable() is called, unless
 *    the channel was released in the meantime, in which case the new view is
 *    released and dropped again;
 *  - on PartitionNotFoundException the backoff is increased and, if
 *    increaseBackoff() allows another attempt, the input gate is asked to
 *    re-trigger the request; otherwise the exception is rethrown.
 *
 * @param subpartitionIndex index of the subpartition to request
 * @throws std::runtime_error if the channel has already been released
 * @throws PartitionNotFoundException once increaseBackoff() returns false
 * Also raises via THROW_RUNTIME_ERROR when the partition manager returns a null view.
 */
void LocalInputChannel::requestSubpartition(int subpartitionIndex)
{
    bool retriggerRequestFlag = false;
    bool notifyDataAvailableFlag = false;
    LOG_PART("requestSubpartition with " << std::to_string(subpartitionIndex))
    LOCK_BEFORE()
    std::lock_guard<std::recursive_mutex> lock(requestLock);
    LOCK_AFTER()
    if (isReleased_.load()) {
        throw std::runtime_error("LocalInputChannel has been released already");
    }
    if (!subpartitionView) {
        LOG("partitionManager->createSubpartitionView partitionId " << partitionId.toString() << " subpartitionIndex "
            << std::to_string(subpartitionIndex))
        LOG_PART("partitionManager " << std::to_string(reinterpret_cast<long>(partitionManager.get())))
        try {
            auto subpartitionViewTmp = partitionManager->createSubpartitionView(
                partitionId, subpartitionIndex,
                this);
            // Log the view that was just created; the member is still null at this point.
            LOG("after partitionManager->createSubpartitionView subpartitionView "
                << std::to_string(reinterpret_cast<long>(subpartitionViewTmp.get())));
            if (!subpartitionViewTmp) {
                THROW_RUNTIME_ERROR("Error requesting subpartition.");
            }
            subpartitionView = subpartitionViewTmp;
            // releaseAllResources() does not take requestLock, so the channel may have been
            // released while the view was being created; undo the request in that case.
            if (isReleased_.load()) {
                subpartitionView->releaseAllResources();
                subpartitionView.reset();
            } else {
                notifyDataAvailableFlag = true;
            }
        } catch (const PartitionNotFoundException &) {
            if (increaseBackoff()) {
                LOG("retriggerRequest happens and currentBackoff is " << std::to_string(currentBackoff))
                retriggerRequestFlag = true;
            } else {
                LOG("after doing enought retriggerRequest failed, throw PartitionNotFoundException to " <<
                    "let current task stop and currentBackoff is " << std::to_string(currentBackoff))
                // Rethrow the in-flight exception object; `throw e;` would copy it as the
                // static type and slice any derived exception.
                throw;
            }
        }
        // NOTE(review): both callbacks below run while requestLock is still held. Confirm
        // that neither can block on another thread that is waiting for requestLock.
        if (notifyDataAvailableFlag) {
            notifyDataAvailable();
        }
        if (retriggerRequestFlag) {
            inputGate->retriggerPartitionRequest(partitionId.getPartitionId());
        }
    }
}
void LocalInputChannel::retriggerSubpartitionRequest(
std::shared_ptr<std::chrono::steady_clock::time_point> timer, int subpartitionIndex)
{
LOCK_BEFORE()
std::lock_guard<std::recursive_mutex> lock(requestLock);
LOCK_AFTER()
if (subpartitionView) {
throw std::runtime_error("already requested partition");
}
std::thread([this, subpartitionIndex]() {
INFO_RELEASE("LocalInputChannel sleep time: " << std::to_string(getCurrentBackoff()))
std::this_thread::sleep_for(std::chrono::milliseconds(getCurrentBackoff()));
try {
requestSubpartition(subpartitionIndex);
} catch (...) {
std::exception_ptr eptr = std::current_exception();
setError(eptr);
}
}).detach();
}
std::optional<BufferAndAvailability> LocalInputChannel::getNextBuffer()
{
checkError();
std::shared_ptr<ResultSubpartitionView> subpartitionViewPtr = subpartitionView;
if (!subpartitionViewPtr) {
if (isReleased_.load()) {
return std::nullopt;
}
subpartitionViewPtr = checkAndWaitForSubpartitionView();
}
LOG("subpartitionViewPtr.get()" << subpartitionViewPtr.get())
BufferAndBacklog* next = subpartitionViewPtr->getNextBuffer();
while (next) {
Buffer* emptyCandidate = next->getBuffer();
if (emptyCandidate == nullptr || emptyCandidate->GetSize() != 0) {
break;
}
emptyCandidate->RecycleBuffer();
delete emptyCandidate;
delete next;
next = subpartitionViewPtr->getNextBuffer();
numBuffersIn->Inc();
}
if (!next) {
if (subpartitionViewPtr->isReleased()) {
THROW_LOGIC_EXCEPTION("TBD")
} else {
return std::nullopt;
}
}
LOG("before LocalInputChannel::next->getBuffer()")
Buffer* buffer = next->getBuffer();
if (buffer == nullptr) {
delete next;
return std::nullopt;
}
LOG("after LocalInputChannel::next->getBuffer()")
if (auto fileRegionBuffer = std::dynamic_pointer_cast<FileRegionBuffer>(buffer)) {
buffer = fileRegionBuffer->readInto(inputGate->getUnpooledSegment());
}
**/
LOG("after LocalInputChannel::next->getBuffer() 2")
if (channelStatePersister) {
channelStatePersister->CheckForBarrier(buffer);
channelStatePersister->MaybePersist(buffer);
}
if (next->getNextDataType().isEvent()) {
LOG_TRACE("event buffer " << buffer->ToDebugString(false))
}
const ObjectBufferDataType &bufferDataType = next->getNextDataType();
int buffersInBacklog = next->getBuffersInBacklog();
int sequenceNumber = next->getSequenceNumber();
delete next;
return BufferAndAvailability{
buffer, bufferDataType, buffersInBacklog, sequenceNumber};
}
/**
 * Logs the notification and then reports this channel as non-empty by calling
 * notifyChannelNonEmpty().
 */
void LocalInputChannel::notifyDataAvailable()
{
    LOG("LocalInputChannel notifyDataAvailable invoke!");

    notifyChannelNonEmpty();
}
/**
 * Forwards a priority-event notification to NotifyPriorityEvent().
 *
 * @param prioritySequenceNumber sequence number passed through unchanged
 */
void LocalInputChannel::notifyPriorityEvent(int prioritySequenceNumber)
{
    NotifyPriorityEvent(prioritySequenceNumber);
}
/**
 * Asks the subpartition view to convert the element with the given sequence
 * number into a priority event, then force-notifies the input gate.
 *
 * Nothing happens on a released channel. The view and the gate are each used
 * only when they are set.
 *
 * @param sequenceNumber sequence number forwarded to the subpartition view
 */
void LocalInputChannel::ConvertToPriorityEvent(int sequenceNumber)
{
    const bool alreadyReleased = isReleased_.load();
    if (!alreadyReleased) {
        if (subpartitionView) {
            LOG("subpartitionView->ConvertToPriorityEvent(sequenceNumber)")
            subpartitionView->ConvertToPriorityEvent(sequenceNumber);
        }
        if (inputGate) {
            LOG("inputGate->notifyPriorityEventForce")
            inputGate->notifyPriorityEventForce(InputChannel::shared_from_this());
        }
    }
}
/**
 * Returns the subpartition view after validating the channel state.
 *
 * requestLock is acquired first, so the call blocks while another thread holds
 * it, for example during requestSubpartition().
 *
 * @return the current subpartition view (never null)
 * @throws std::runtime_error if the channel is released or no view has been
 *         requested yet
 */
std::shared_ptr<ResultSubpartitionView> LocalInputChannel::checkAndWaitForSubpartitionView()
{
    LOCK_BEFORE()
    std::lock_guard<std::recursive_mutex> lock(requestLock);
    LOCK_AFTER()
    if (!isReleased_.load()) {
        if (subpartitionView) {
            return subpartitionView;
        }
        throw std::runtime_error("Queried for a buffer before requesting the subpartition.");
    }
    throw std::runtime_error("released");
}
void LocalInputChannel::resumeConsumption()
{
if (isReleased_.load()) {
throw std::runtime_error("Channel released.");
}
if (subpartitionView) {
subpartitionView->resumeConsumption();
if (subpartitionView->getAvailabilityAndBacklog(INT_MAX).getIsAvailable()) {
notifyChannelNonEmpty();
}
}
}
void LocalInputChannel::acknowledgeAllRecordsProcessed()
{
if (isReleased_.load()) {
throw std::runtime_error("Channel released.");
}
if (subpartitionView) {
subpartitionView->acknowledgeAllDataProcessed();
}
}
/**
 * Disabled implementation retained from the original source:
void LocalInputChannel::sendTaskEvent(TaskEvent event) {
    checkError();
    if (!subpartitionView) {
        throw std::runtime_error("Tried to send task event to producer before requesting the subpartition.");
    }
    if (!taskEventPublisher.publish(partitionId, event)) {
        throw std::runtime_error("Error while publishing event to producer. The producer could not be found.");
    }
}
**/
/**
 * @return the current value of the released flag
 */
bool LocalInputChannel::isReleased()
{
    const bool released = isReleased_.load();
    return released;
}
/**
 * Marks the channel as released and releases the subpartition view, if any.
 *
 * The flag is flipped with a single atomic exchange, so only the first caller
 * proceeds to release the view. Later or concurrent calls see the flag already
 * set and return without doing anything. The previous load()-then-store() pair
 * let two callers both pass the check.
 */
void LocalInputChannel::releaseAllResources()
{
    // exchange() returns the previous value: true means someone else already released.
    if (isReleased_.exchange(true)) {
        return;
    }
    if (subpartitionView) {
        subpartitionView->releaseAllResources();
        subpartitionView.reset();
    }
}
void LocalInputChannel::announceBufferSize(int newBufferSize)
{
if (isReleased_.load()) {
throw std::runtime_error("Channel released.");
}
if (subpartitionView) {
subpartitionView->notifyNewBufferSize(newBufferSize);
}
}
/**
 * @return the subpartition view's getNumberOfQueuedBuffers(), or 0 when no view
 *         is set
 */
int LocalInputChannel::getBuffersInUseCount()
{
    if (!subpartitionView) {
        return 0;
    }
    return subpartitionView->getNumberOfQueuedBuffers();
}
/**
 * @return the subpartition view's unsynchronizedGetNumberOfQueuedBuffers(), or
 *         0 when no view is set
 */
int LocalInputChannel::unsynchronizedGetNumberOfQueuedBuffers()
{
    if (!subpartitionView) {
        return 0;
    }
    return subpartitionView->unsynchronizedGetNumberOfQueuedBuffers();
}
/**
 * @return a description of the form "LocalInputChannel [<partitionId>]"
 */
std::string LocalInputChannel::toString()
{
    std::string description = "LocalInputChannel [";
    description += partitionId.toString();
    description += "]";
    return description;
}
std::shared_ptr<ResultSubpartitionView> LocalInputChannel::getSubpartitionView()
{
return subpartitionView;
}
/**
 * Intentionally a no-op in this implementation.
 *
 * @param subpartitionId ignored
 */
void LocalInputChannel::notifyBufferAvailable(int subpartitionId)
{
    static_cast<void>(subpartitionId);
}
/**
 * Intentionally a no-op in this implementation; the event is not forwarded.
 *
 * @param event ignored
 */
void LocalInputChannel::sendTaskEvent(std::shared_ptr<TaskEvent> event)
{
    static_cast<void>(event);
}
}