* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#pragma once
#include "runtime/state/KeyGroupRange.h"
#include "streaming/api/operators/TimerHeapInternalTimer.h"
#include "InternalTimerService.h"
#include "runtime/state/heap/HeapPriorityQueueSet.h"
#include "streaming/api/operators/Triggerable.h"
#include "streaming/runtime/tasks/ProcessingTimeService.h"
#include "streaming/api/operators/KeyContext.h"
#include "InternalTimersSnapshot.h"
#include <cstdint>
#include <functional>
template<typename K, typename N>
class AggregateWindowOperator;
template <typename K, typename N>
class InternalTimerServiceImpl : public InternalTimerService<N>, public ProcessingTimeCallback {
public:
using ProcessingTimeTimersQueueType = KeyGroupedInternalPriorityQueue<std::shared_ptr<TimerHeapInternalTimer<K, N>>>;
using EventTimeTimersQueueType = KeyGroupedInternalPriorityQueue<std::shared_ptr<TimerHeapInternalTimer<K, N>>>;
InternalTimerServiceImpl(
KeyGroupRange *localKeyGroupRange,
KeyContext<K> *keyContext,
ProcessingTimeService *processingTimeService,
std::shared_ptr<ProcessingTimeTimersQueueType> processingTimeTimersQueue,
std::shared_ptr<EventTimeTimersQueueType> eventTimeTimersQueue);
~InternalTimerServiceImpl() override;
int64_t currentProcessingTime() override;
int64_t currentWatermark() override;
void advanceWatermark(int64_t time) override;
void startTimerService(
TypeSerializer *keySerializer, TypeSerializer *namespaceSerializer, Triggerable<K, N> *triggerTarget);
void registerProcessingTimeTimer(N nameSpace, int64_t time);
void deleteProcessingTimeTimer(N nameSpace, int64_t time);
void registerEventTimeTimer(N nameSpace, int64_t time);
void deleteEventTimeTimer(N nameSpace, int64_t time);
void deleteFirstEventTimeTimer();
InternalTimersSnapshot<K, N> snapshotTimersForKeyGroup(int32_t keyGroupId);
void restoreTimersForKeyGroup(const InternalTimersSnapshot<K, N> &timersSnapshot, int32_t keyGroupId);
TypeSerializer *getKeySerializer()
{
return keySerializer;
}
TypeSerializer *getNamespaceSerializer()
{
return namespaceSerializer;
}
private:
KeyContext<K> *keyContext = nullptr;
ProcessingTimeService *processingTimeService = nullptr;
KeyGroupRange *localKeyGroupRange = nullptr;
std::shared_ptr<ProcessingTimeTimersQueueType> processingTimeTimersQueue;
std::shared_ptr<EventTimeTimersQueueType> eventTimeTimersQueue;
InternalTimersSnapshot<K, N> restoredTimersSnapshot;
bool hasRestoredTimersSnapshot = false;
int32_t localKeyGroupRangeStartIndex{};
int64_t currentWatermarkValue = INT64_MIN;
Triggerable<K, N> *triggerTarget = nullptr;
TypeSerializer *keySerializer = nullptr;
TypeSerializer *namespaceSerializer = nullptr;
bool isInitialized{};
void OnProcessingTime(int64_t time) override;
};
template <typename K, typename N>
InternalTimerServiceImpl<K, N>::InternalTimerServiceImpl(
KeyGroupRange *localKeyGroupRange,
KeyContext<K> *keyContext,
ProcessingTimeService *processingTimeService,
std::shared_ptr<ProcessingTimeTimersQueueType> processingTimeTimersQueue,
std::shared_ptr<EventTimeTimersQueueType> eventTimeTimersQueue)
: keyContext(keyContext), processingTimeService(processingTimeService), localKeyGroupRange(localKeyGroupRange),
processingTimeTimersQueue(processingTimeTimersQueue), eventTimeTimersQueue(eventTimeTimersQueue),
isInitialized(false)
{
this->localKeyGroupRangeStartIndex =
localKeyGroupRange->getEndKeyGroup() == -1 ? -1 : localKeyGroupRange->getStartKeyGroup();
this->keySerializer = nullptr;
this->namespaceSerializer = nullptr;
this->triggerTarget = nullptr;
}
template <typename K, typename N>
InternalTimerServiceImpl<K, N>::~InternalTimerServiceImpl() {
}
template <typename K, typename N>
inline int64_t InternalTimerServiceImpl<K, N>::currentProcessingTime()
{
return processingTimeService->getCurrentProcessingTime();
}
template <typename K, typename N>
int64_t InternalTimerServiceImpl<K, N>::currentWatermark()
{
return currentWatermarkValue;
}
template <typename K, typename N>
void InternalTimerServiceImpl<K, N>::advanceWatermark(int64_t time)
{
currentWatermarkValue = time;
auto timer = eventTimeTimersQueue->peek();
while (!eventTimeTimersQueue->isEmpty() && timer->getTimestamp() <= time) {
eventTimeTimersQueue->poll();
keyContext->setCurrentKey(timer->getKey());
triggerTarget->onEventTime(timer.get());
timer = eventTimeTimersQueue->peek();
}
}
template <typename K, typename N>
void InternalTimerServiceImpl<K, N>::startTimerService(
TypeSerializer *keySerializer, TypeSerializer *namespaceSerializer, Triggerable<K, N> *triggerTarget)
{
if (!isInitialized) {
if (keySerializer == nullptr || namespaceSerializer == nullptr) {
THROW_LOGIC_EXCEPTION("The TimersService serializers cannot be null.");
}
if (this->keySerializer != nullptr || this->namespaceSerializer != nullptr || this->triggerTarget != nullptr) {
THROW_LOGIC_EXCEPTION("The TimersService has already been initialized.");
}
if (hasRestoredTimersSnapshot) {
auto restoredKeySerializer = restoredTimersSnapshot.getKeySerializer();
auto restoredNamespaceSerializer = restoredTimersSnapshot.getNamespaceSerializer();
if (restoredKeySerializer != nullptr &&
restoredKeySerializer->getBackendId() != BackendDataType::INVALID_BK &&
restoredKeySerializer->getBackendId() != keySerializer->getBackendId()) {
INFO_RELEASE("Error: startTimerService Restored timer key serializer is incompatible with requested timer service.");
THROW_LOGIC_EXCEPTION("Restored timer key serializer is incompatible with requested timer service.")
}
if (restoredNamespaceSerializer != nullptr &&
restoredNamespaceSerializer->getBackendId() != BackendDataType::INVALID_BK &&
restoredNamespaceSerializer->getBackendId() != namespaceSerializer->getBackendId()) {
INFO_RELEASE("Error: startTimerService Restored timer namespace serializer is incompatible with requested timer service");
THROW_LOGIC_EXCEPTION("Restored timer namespace serializer is incompatible with requested timer service.")
}
}
this->keySerializer = keySerializer;
this->namespaceSerializer = namespaceSerializer;
this->triggerTarget = triggerTarget;
auto headTimer = processingTimeTimersQueue->peek();
if (headTimer != nullptr) {
processingTimeService->registerTimer(headTimer->getTimestamp(), this);
}
this->isInitialized = true;
}
}
template <typename K, typename N>
void InternalTimerServiceImpl<K, N>::registerProcessingTimeTimer(N nameSpace, int64_t time)
{
auto oldHead = processingTimeTimersQueue->peek();
bool newHead = processingTimeTimersQueue->add(
std::make_shared<TimerHeapInternalTimer<K, N>>(time, keyContext->getCurrentKey(), nameSpace));
if (newHead) {
int64_t nextTriggerTime = oldHead != nullptr ? oldHead->getTimestamp() : INT64_MAX;
if (time < nextTriggerTime) {
processingTimeService->registerTimer(time, this);
}
}
}
template <typename K, typename N>
void InternalTimerServiceImpl<K, N>::OnProcessingTime(int64_t time)
{
auto timer = processingTimeTimersQueue->peek();
while (timer != nullptr && timer->getTimestamp() <= time) {
keyContext->setCurrentKey(timer->getKey());
processingTimeTimersQueue->poll();
triggerTarget->onProcessingTime(timer.get());
timer = processingTimeTimersQueue->peek();
}
if (timer != nullptr) {
processingTimeService->registerTimer(timer->getTimestamp(), this);
}
}
template <typename K, typename N>
void InternalTimerServiceImpl<K, N>::deleteProcessingTimeTimer(N nameSpace, int64_t time)
{
auto toRemove = std::make_shared<TimerHeapInternalTimer<K, N>>(time, keyContext->getCurrentKey(), nameSpace);
processingTimeTimersQueue->remove(toRemove);
}
template <typename K, typename N>
void InternalTimerServiceImpl<K, N>::registerEventTimeTimer(N nameSpace, int64_t time)
{
eventTimeTimersQueue->add(
std::make_shared<TimerHeapInternalTimer<K, N>>(time, keyContext->getCurrentKey(), nameSpace));
}
template <typename K, typename N>
void InternalTimerServiceImpl<K, N>::deleteEventTimeTimer(N nameSpace, int64_t time)
{
auto toRemove = std::make_shared<TimerHeapInternalTimer<K, N>>(time, keyContext->getCurrentKey(), nameSpace);
eventTimeTimersQueue->remove(toRemove);
}
template <typename K, typename N>
void InternalTimerServiceImpl<K, N>::deleteFirstEventTimeTimer()
{
eventTimeTimersQueue->poll();
}
template <typename K, typename N>
InternalTimersSnapshot<K, N> InternalTimerServiceImpl<K, N>::snapshotTimersForKeyGroup(int32_t keyGroupId)
{
InternalTimersSnapshot<K, N> snapshot;
snapshot.setKeySerializer(keySerializer);
snapshot.setNamespaceSerializer(namespaceSerializer);
auto eventSubset = eventTimeTimersQueue->getSubsetForKeyGroup(keyGroupId);
if (eventSubset != nullptr) {
for (const auto &timer : *eventSubset) {
snapshot.addEventTimeTimer(timer);
}
}
auto processingSubset = processingTimeTimersQueue->getSubsetForKeyGroup(keyGroupId);
if (processingSubset != nullptr) {
for (const auto &timer : *processingSubset) {
snapshot.addProcessingTimeTimer(timer);
}
}
return snapshot;
}
template <typename K, typename N>
void InternalTimerServiceImpl<K, N>::restoreTimersForKeyGroup(
const InternalTimersSnapshot<K, N> &timersSnapshot,
int32_t keyGroupId)
{
if (localKeyGroupRange != nullptr && !localKeyGroupRange->contains(keyGroupId)) {
INFO_RELEASE(
"Error: restoreTimersForKeyGroup Timer key-group " << keyGroupId << " is outside local key-group range "
<< localKeyGroupRange->ToString());
THROW_LOGIC_EXCEPTION("Timer key-group " << keyGroupId << " is outside local key-group range "
<< localKeyGroupRange->ToString())
}
restoredTimersSnapshot = timersSnapshot;
hasRestoredTimersSnapshot = true;
for (const auto &timer : timersSnapshot.getEventTimeTimers()) {
eventTimeTimersQueue->add(timer);
}
for (const auto &timer : timersSnapshot.getProcessingTimeTimers()) {
processingTimeTimersQueue->add(timer);
}
}