* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#include "OmniSourceStreamTask.h"
#include "common.h"
#include "core/utils/threads/CompletableFutureV2.h"
namespace omnistream {
StopMode FinishingReasonToStopMode(FinishingReason reason) {
switch (reason) {
case FinishingReason::END_OF_DATA:
return StopMode::DRAIN;
case FinishingReason::STOP_WITH_SAVEPOINT_DRAIN:
return StopMode::DRAIN;
case FinishingReason::STOP_WITH_SAVEPOINT_NO_DRAIN:
return StopMode::NO_DRAIN;
default:
return StopMode::DRAIN;
}
}
std::string FinishingReasonToString(FinishingReason reason) {
switch (reason) {
case FinishingReason::END_OF_DATA:
return "END_OF_DATA";
case FinishingReason::STOP_WITH_SAVEPOINT_DRAIN:
return "STOP_WITH_SAVEPOINT_DRAIN";
case FinishingReason::STOP_WITH_SAVEPOINT_NO_DRAIN:
return "STOP_WITH_SAVEPOINT_NO_DRAIN";
default:
return "UNKNOWN_FINISHING_REASON";
}
}
OmniSourceStreamTask::OmniSourceStreamTask(std::shared_ptr<RuntimeEnvironmentV2>& env, std::unique_ptr<Object> lockObject, int taskType):
OmniStreamTask(env, SynchronizedStreamTaskActionExecutor::synchronizedExecutor(&lockObject->mutex), taskType) {
this->lockObject_ = std::move(lockObject);
}
void OmniSourceStreamTask::init() {
OmniStreamTask::init();
}
void OmniSourceStreamTask::processInput(MailboxDefaultAction::Controller *controller) {
controller->suspendDefaultAction();
sourceThread_ = std::make_unique<std::thread>([this] {
runSourceInThread();
});
}
void OmniSourceStreamTask::runSourceInThread() {
try {
auto* mainOperator = dynamic_cast<StreamSource<omnistream::VectorBatch>*>(mainOperator_);
if (!mainOperator) {
THROW_RUNTIME_ERROR("mainOperator_ is not of type StreamSource<omnistream::VectorBatch>");
}
mainOperator->run(lockObject_.get());
CompleteProcessing();
auto completionMail = std::make_shared<VoidFunctionRunnable>([this]() {
mailboxProcessor_->suspend();
});
mainMailboxExecutor_->execute(completionMail, "Source completion");
} catch (const std::exception& e) {
auto errorMail = std::make_shared<VoidFunctionRunnable>([this, e]() {
mailboxProcessor_->reportThrowable(std::make_exception_ptr(e));
});
mainMailboxExecutor_->execute(errorMail, "Source error");
}
}
void OmniSourceStreamTask::CompleteProcessing() {
auto stopMode = FinishingReasonToStopMode(finishingReason);
auto completionFuture = std::make_shared<CompletableFutureV2<void>>();
mainMailboxExecutor_->execute(
std::make_shared<VoidFunctionRunnable>(
[this, stopMode, completionFuture]() {
try {
EndData(stopMode);
completionFuture->Complete();
} catch (...) {
completionFuture->CompleteExceptionally(std::current_exception());
}
},
"SourceStreamTask finished processing data"),
"SourceStreamTask finished processing data");
completionFuture->Get();
}
void OmniSourceStreamTask::AdvanceToEndOfEventTime() {
operatorChain->GetMainOperatorOutput()->emitWatermark(new Watermark(LONG_MAX));
}
const std::string OmniSourceStreamTask::getName() const {
return std::string("OmniSourceStreamTask");
}
void OmniSourceStreamTask::cancel() {
if (mainOperator_) {
auto* source = dynamic_cast<StreamSource<omnistream::VectorBatch>*>(mainOperator_);
if (source) {
source->cancel();
}
}
if (sourceThread_ && sourceThread_->joinable()) {
sourceThread_->join();
}
OmniStreamTask::cancel();
recordWriter_->cancel();
}
OmniSourceStreamTask::~OmniSourceStreamTask() {
if (sourceThread_ && sourceThread_->joinable()) {
sourceThread_->join();
}
}
}