* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#pragma once
#include <chrono>
#include <atomic>
#include <exception>
#include "TimerService.h"
#include "core/utils/threads/CompletableFuture.h"
#include "ScheduledTaskExecutor.h"
#include "ProcessingTimeServiceUtil.h"
#include "common.h"
class SystemProcessingTimeService : public omnistream::runtime::TimerService {
public:
SystemProcessingTimeService() : status(0), timeService(1) {
quiesceCompletedFuture = std::make_shared<omnistream::CompletableFuture>();
}
~SystemProcessingTimeService() override {
SystemProcessingTimeService::shutdownService();
}
int64_t getCurrentProcessingTime() override {
return static_cast<int64_t>(std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch()).count());
}
bool isTerminated() override { return status.load() == STATUS_SHUTDOWN; }
void shutdownService() override {
if (status.exchange(STATUS_SHUTDOWN) != STATUS_SHUTDOWN) {
timeService.Shutdown();
}
}
bool shutdownServiceUninterruptible(long timeoutMs) override {
return false;
}
ScheduledFutureTask* registerTimer(int64_t timestamp, ProcessingTimeCallback *target) override {
if (status.load() != STATUS_ALIVE) {
return nullptr;
}
int64_t delay = ProcessingTimeServiceUtil::getProcessingTimeDelay(timestamp, getCurrentProcessingTime());
omnistream::Runnable* task = wrapOnTimerCallback(target, timestamp, 0);
return timeService.Schedule(task, delay);
}
ScheduledFutureTask* scheduleWithFixedDelay(ProcessingTimeCallback* callback,
long initialDelay, long period) override {
return scheduleRepeatedly(callback, initialDelay, period, true);
}
omnistream::Runnable* wrapOnTimerCallback(ProcessingTimeCallback* callback, long nextTimestamp, long period) {
return new ScheduledTask(status, callback, nextTimestamp, period);
}
private:
static const int STATUS_ALIVE = 0;
static const int STATUS_QUIESCED = 1;
static const int STATUS_SHUTDOWN = 2;
std::atomic<int> status;
ScheduledTaskExecutor timeService;
std::shared_ptr<omnistream::CompletableFuture> quiesceCompletedFuture;
class ScheduledTask : public omnistream::Runnable {
public:
ScheduledTask(std::atomic<int>& serviceStatus, ProcessingTimeCallback* callback, long timestamp,
long period) : serviceStatus(serviceStatus), callback(callback), nextTimestamp(timestamp), period(period)
{
}
void run()
{
if (serviceStatus.load() != SystemProcessingTimeService::STATUS_ALIVE) {
return;
}
try {
callback->OnProcessingTime(nextTimestamp);
} catch (const std::exception& e) {
LOG("Error: processing timer callback failed, timestamp="
<< nextTimestamp << ", error=" << e.what())
} catch (...) {
LOG("Error: processing timer callback failed, timestamp="
<< nextTimestamp << ", error=unknown")
}
nextTimestamp += period;
}
private:
std::atomic<int>& serviceStatus;
ProcessingTimeCallback* callback;
long nextTimestamp;
const long period;
};
ScheduledFutureTask* scheduleRepeatedly(ProcessingTimeCallback* callback, long initialDelay,
long period, bool fixedDelay)
{
long nextTimestamp = getCurrentProcessingTime() + initialDelay;
omnistream::Runnable* task = wrapOnTimerCallback(callback, nextTimestamp, period);
try {
if (fixedDelay) {
return timeService.ScheduleWithFixedDelay(task, initialDelay, period);
} else {
return timeService.ScheduleAtFixedRate(task, initialDelay, period);
}
} catch (const std::runtime_error &e) {
THROW_RUNTIME_ERROR("failed to schedule given task, current stats of ProcessingTimeService is " +
std::to_string(status.load()));
}
}
};