* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#pragma once
#include "streaming/api/watermark/Watermark.h"
#include "InternalTimerService.h"
#include "core/typeutils/TypeSerializer.h"
#include "InternalTimerServiceImpl.h"
#include "streaming/api/operators/KeyContext.h"
#include "TimerSerializer.h"
#include "streaming/runtime/tasks/ProcessingTimeService.h"
#include "runtime/state/AbstractKeyedStateBackend.h"
#include "runtime/state/KeyedStateCheckpointOutputStream.h"
#include "runtime/state/KeyGroupsStateHandle.h"
#include "runtime/state/TimerConsistencyCheckControl.h"
#include "runtime/state/restore/RawKeyedStateInputStreamProxy.h"
#include "runtime/state/bridge/OmniTaskBridge.h"
#include "InternalTimerServiceSerializationProxy.h"
#include "InternalTimersSnapshotReaderWriters.h"
#include "core/utils/type_traits_ext.h"
#include <algorithm>
#include <cstddef>
#include <limits>
#include <memory>
#include <string>
#include <type_traits>
#include <unordered_map>
#include <vector>
#include <utility>
template <typename K>
class InternalTimeServiceManager {
public:
inline static std::string TIMER_STATE_PREFIX = "_timer_state";
inline static std::string PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_";
inline static std::string EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_";
InternalTimeServiceManager(
KeyGroupRange* localKeyGroupRange,
KeyContext<K>* keyContext,
PriorityQueueSetFactory* priorityQueueSetFactory,
ProcessingTimeService* processingTimeService,
int maxNumberOfSubtasks,
const std::vector<std::shared_ptr<KeyedStateHandle>> &rawKeyedStateHandles = {},
std::shared_ptr<omnistream::OmniTaskBridge> omniTaskBridge = nullptr)
:
localKeyGroupRange(localKeyGroupRange),
keyContext(keyContext),
priorityQueueSetFactory(priorityQueueSetFactory),
processingTimeService(processingTimeService),
maxNumberOfSubtasks(maxNumberOfSubtasks),
omniTaskBridge(std::move(omniTaskBridge)) {
if (auto keyedBackend = dynamic_cast<AbstractKeyedStateBackend<K>*>(priorityQueueSetFactory)) {
keySerializerForRestore = keyedBackend->getKeySerializer();
}
restoreRawKeyedState(rawKeyedStateHandles);
}
~InternalTimeServiceManager() {
for (const auto &it : timerServicesInt64) {
delete it.second;
}
timerServicesInt64.clear();
for (const auto &it : timerServicesWindow) {
delete it.second;
}
timerServicesWindow.clear();
for (const auto &it : timerServicesVoidNameSpace) {
delete it.second;
}
timerServicesVoidNameSpace.clear();
}
template <typename N>
InternalTimerServiceImpl<K, N> *getInternalTimerService(
std::string name,
TypeSerializer *keySerializer,
TypeSerializer *namespaceSerializer,
Triggerable<K, N> *triggerable);
void advanceWatermark(Watermark *watermark);
void snapshotToRawKeyedState(
KeyedStateCheckpointOutputStream *stateCheckpointOutputStream,
std::string operatorName,
int64_t checkpointId = -1);
int32_t getRegisteredTimerServiceCount() const;
void writeTimersForKeyGroup(KeyedStateCheckpointOutputStream *out, int32_t keyGroupIdx);
void readTimersForKeyGroup(RawKeyedStateInputStreamProxy *in, int32_t keyGroupIdx, int32_t version);
private:
enum class TimerNamespaceKind {
INT64,
TIME_WINDOW,
VOID_NAMESPACE
};
struct TimerDataSample {
bool present = false;
bool eventTimer = false;
int32_t keyGroupId = -1;
int64_t timestamp = -1;
uint64_t sampleRankHash = std::numeric_limits<uint64_t>::max();
uint64_t timerHash = 0;
uint64_t keyHash = 0;
uint64_t namespaceHash = 0;
int32_t keyBytesLength = 0;
int32_t namespaceBytesLength = 0;
std::string keyPreviewHex;
std::string namespacePreviewHex;
};
struct TimerDataDigest {
int32_t keyGroupCount = 0;
int32_t nonEmptyKeyGroupCount = 0;
int64_t eventTimerCount = 0;
int64_t processingTimerCount = 0;
uint64_t eventXorHash = 0;
uint64_t eventSumHash = 0;
uint64_t processingXorHash = 0;
uint64_t processingSumHash = 0;
uint64_t keyGroupXorHash = 0;
uint64_t keyGroupSumHash = 0;
int64_t eventMinTimestamp = std::numeric_limits<int64_t>::max();
int64_t eventMaxTimestamp = std::numeric_limits<int64_t>::min();
int64_t processingMinTimestamp = std::numeric_limits<int64_t>::max();
int64_t processingMaxTimestamp = std::numeric_limits<int64_t>::min();
TimerDataSample sample;
int64_t totalTimerCount() const
{
return eventTimerCount + processingTimerCount;
}
void addTimer(
bool eventTimer,
int64_t timestamp,
uint64_t timerHash,
uint64_t keyHash,
uint64_t namespaceHash,
int32_t keyBytesLength,
int32_t namespaceBytesLength,
std::string keyPreviewHex,
std::string namespacePreviewHex)
{
if (eventTimer) {
++eventTimerCount;
eventXorHash ^= mixHash(timerHash);
eventSumHash += timerHash;
eventMinTimestamp = std::min(eventMinTimestamp, timestamp);
eventMaxTimestamp = std::max(eventMaxTimestamp, timestamp);
} else {
++processingTimerCount;
processingXorHash ^= mixHash(timerHash);
processingSumHash += timerHash;
processingMinTimestamp = std::min(processingMinTimestamp, timestamp);
processingMaxTimestamp = std::max(processingMaxTimestamp, timestamp);
}
TimerDataSample candidate;
candidate.present = true;
candidate.eventTimer = eventTimer;
candidate.timestamp = timestamp;
candidate.sampleRankHash = mixHash(timerHash);
candidate.timerHash = timerHash;
candidate.keyHash = keyHash;
candidate.namespaceHash = namespaceHash;
candidate.keyBytesLength = keyBytesLength;
candidate.namespaceBytesLength = namespaceBytesLength;
candidate.keyPreviewHex = std::move(keyPreviewHex);
candidate.namespacePreviewHex = std::move(namespacePreviewHex);
considerSample(std::move(candidate));
}
void mergeKeyGroupDigest(int32_t keyGroupId, const TimerDataDigest &keyGroupDigest)
{
++keyGroupCount;
if (keyGroupDigest.totalTimerCount() > 0) {
++nonEmptyKeyGroupCount;
}
uint64_t keyGroupHash = hashKeyGroupDigest(keyGroupId, keyGroupDigest);
keyGroupXorHash ^= mixHash(keyGroupHash);
keyGroupSumHash += keyGroupHash;
eventTimerCount += keyGroupDigest.eventTimerCount;
processingTimerCount += keyGroupDigest.processingTimerCount;
eventXorHash ^= keyGroupDigest.eventXorHash;
eventSumHash += keyGroupDigest.eventSumHash;
processingXorHash ^= keyGroupDigest.processingXorHash;
processingSumHash += keyGroupDigest.processingSumHash;
if (keyGroupDigest.eventTimerCount > 0) {
eventMinTimestamp = std::min(eventMinTimestamp, keyGroupDigest.eventMinTimestamp);
eventMaxTimestamp = std::max(eventMaxTimestamp, keyGroupDigest.eventMaxTimestamp);
}
if (keyGroupDigest.processingTimerCount > 0) {
processingMinTimestamp = std::min(processingMinTimestamp, keyGroupDigest.processingMinTimestamp);
processingMaxTimestamp = std::max(processingMaxTimestamp, keyGroupDigest.processingMaxTimestamp);
}
if (keyGroupDigest.sample.present) {
TimerDataSample candidate = keyGroupDigest.sample;
candidate.keyGroupId = keyGroupId;
considerSample(std::move(candidate));
}
}
private:
void considerSample(TimerDataSample candidate)
{
if (!candidate.present) {
return;
}
if (!sample.present || candidate.sampleRankHash < sample.sampleRankHash) {
sample = std::move(candidate);
}
}
static uint64_t mixHash(uint64_t value)
{
value ^= value >> 33U;
value *= 0xff51afd7ed558ccdULL;
value ^= value >> 33U;
value *= 0xc4ceb9fe1a85ec53ULL;
value ^= value >> 33U;
return value;
}
static uint64_t hashKeyGroupDigest(int32_t keyGroupId, const TimerDataDigest &digest)
{
uint64_t value = mixHash(static_cast<uint64_t>(static_cast<uint32_t>(keyGroupId)));
value ^= mixHash(static_cast<uint64_t>(digest.eventTimerCount));
value ^= mixHash(static_cast<uint64_t>(digest.processingTimerCount));
value ^= mixHash(digest.eventXorHash);
value ^= mixHash(digest.eventSumHash);
value ^= mixHash(digest.processingXorHash);
value ^= mixHash(digest.processingSumHash);
return value;
}
};
emhash7::HashMap<std::string, InternalTimerService<int64_t>*> timerServicesInt64;
emhash7::HashMap<std::string, InternalTimerService<TimeWindow>*> timerServicesWindow;
emhash7::HashMap<std::string, InternalTimerService<VoidNamespace>*> timerServicesVoidNameSpace;
KeyGroupRange* localKeyGroupRange;
KeyContext<K>* keyContext;
PriorityQueueSetFactory* priorityQueueSetFactory;
ProcessingTimeService* processingTimeService;
int maxNumberOfSubtasks;
std::shared_ptr<omnistream::OmniTaskBridge> omniTaskBridge;
TypeSerializer *keySerializerForRestore = nullptr;
int64_t activeSnapshotCheckpointId = -1;
int64_t snapshotEventTimerCount = 0;
int64_t snapshotProcessingTimerCount = 0;
int32_t snapshotWrittenKeyGroupCount = 0;
int32_t snapshotNonEmptyKeyGroupCount = 0;
int32_t snapshotNonEmptyServiceKeyGroupCount = 0;
int64_t restoreEventTimerCount = 0;
int64_t restoreProcessingTimerCount = 0;
int32_t restoreReadKeyGroupCount = 0;
int32_t restoreNonEmptyKeyGroupCount = 0;
int32_t restoreNonEmptyServiceKeyGroupCount = 0;
int32_t restoreSkippedHandleCount = 0;
int32_t activeSnapshotKeyGroupId = -1;
std::unordered_map<std::string, TimerDataDigest> snapshotTimerDataDigests;
std::unordered_map<std::string, TimerDataDigest> restoreTimerDataDigests;
static constexpr int MIN_CAPACITY_OF_TIMER_QUEUE = 128;
template <typename N>
InternalTimerServiceImpl<K, N>* registerOrGetTimerService(std::string name, TimerSerializer<K, N>* timerSerializer);
template <typename N>
InternalTimerServiceImpl<K, N>* registerOrGetTimerServiceForRestore(
const std::string &name,
TypeSerializer *keySerializer,
TypeSerializer *namespaceSerializer);
template <typename N>
void writeTimerServiceMap(KeyedStateCheckpointOutputStream *out,
emhash7::HashMap<std::string, InternalTimerService<N>*> &services,
int32_t keyGroupIdx);
void restoreRawKeyedState(const std::vector<std::shared_ptr<KeyedStateHandle>> &rawKeyedStateHandles);
void restoreRawKeyGroupState(const std::shared_ptr<KeyGroupsStateHandle> &keyGroupsStateHandle);
void restoreStateForKeyGroup(RawKeyedStateInputStreamProxy *in, int32_t keyGroupIdx);
TimerNamespaceKind inferNamespaceKind(
const FlinkTimerSerializerSnapshots::SnapshotDescriptor &namespaceSerializerSnapshot);
template <typename N>
void readTimerServiceForNamespace(
RawKeyedStateInputStreamProxy *in,
const std::string &serviceName,
FlinkTimerSerializerSnapshots::SnapshotDescriptor keySerializerSnapshot,
FlinkTimerSerializerSnapshots::SnapshotDescriptor namespaceSerializerSnapshot,
int32_t keyGroupIdx,
int32_t version);
template <typename N>
TimerDataDigest buildTimerDataDigest(const InternalTimersSnapshot<K, N> &timersSnapshot);
template <typename N>
void addTimersToDigest(
TimerDataDigest &digest,
const std::vector<typename InternalTimersSnapshot<K, N>::TimerPtr> &timers,
TypeSerializer *keySerializer,
TypeSerializer *namespaceSerializer,
bool eventTimer);
template <typename T>
static std::vector<uint8_t> serializeTimerValueForDigest(TypeSerializer *serializer, T value);
static uint64_t hashTimerBytes(
bool eventTimer,
int64_t timestamp,
const std::vector<uint8_t> &keyBytes,
const std::vector<uint8_t> &namespaceBytes);
static uint64_t hashBytesForDigestLog(const std::vector<uint8_t> &bytes);
static std::string bytesPreviewHex(const std::vector<uint8_t> &bytes, size_t maxBytes);
static uint64_t fnv1aUpdate(uint64_t hash, const uint8_t *data, size_t length);
static uint64_t fnv1aUpdateInt64(uint64_t hash, int64_t value);
static uint64_t fnv1aUpdateInt32(uint64_t hash, int32_t value);
static std::string namespaceKindName(TimerNamespaceKind namespaceKind);
static std::string digestMapKey(const std::string &serviceName, TimerNamespaceKind namespaceKind);
static int64_t printableMinTimestamp(int64_t count, int64_t value);
static int64_t printableMaxTimestamp(int64_t count, int64_t value);
void mergeTimerDataDigest(
std::unordered_map<std::string, TimerDataDigest> &target,
const std::string &serviceName,
TimerNamespaceKind namespaceKind,
int32_t keyGroupIdx,
const TimerDataDigest &keyGroupDigest);
void logTimerDataDigest(
const char *phase,
const std::string &operatorName,
int64_t checkpointId,
const std::string &serviceName,
TimerNamespaceKind namespaceKind,
const TimerDataDigest &digest);
void logTimerDataDigests(
const char *phase,
const std::string &operatorName,
int64_t checkpointId,
const std::unordered_map<std::string, TimerDataDigest> &digests);
void logTimerDataSample(
const char *phase,
const std::string &operatorName,
int64_t checkpointId,
const std::string &serviceName,
const std::string &namespaceKind,
const TimerDataDigest &digest);
};
template <typename K>
void InternalTimeServiceManager<K>::advanceWatermark(Watermark *watermark) {
for (auto it = timerServicesWindow.begin(); it != timerServicesWindow.end(); it++) {
it->second->advanceWatermark(watermark->getTimestamp());
}
for (auto it = timerServicesVoidNameSpace.begin(); it != timerServicesVoidNameSpace.end(); it++) {
it->second->advanceWatermark(watermark->getTimestamp());
}
for (auto it = timerServicesInt64.begin(); it != timerServicesInt64.end(); it++) {
it->second->advanceWatermark(watermark->getTimestamp());
}
}
template <typename K>
template <typename N>
InternalTimerServiceImpl<K, N>* InternalTimeServiceManager<K>::getInternalTimerService(std::string name,
TypeSerializer* keySerializer, TypeSerializer* namespaceSerializer, Triggerable<K, N>* triggerable) {
auto timerSerializer = new TimerSerializer<K, N>(keySerializer, namespaceSerializer);
InternalTimerServiceImpl<K, N>* timerService = this->template registerOrGetTimerService<N>(name, timerSerializer);
timerService->startTimerService(keySerializer, namespaceSerializer, triggerable);
return timerService;
}
template <typename K>
template <typename N>
InternalTimerServiceImpl<K, N> *InternalTimeServiceManager<K>::registerOrGetTimerService(
std::string name, TimerSerializer<K, N>* timerSerializer) {
auto& timerServicesMap = [&]() -> auto& {
if constexpr (std::is_same_v<N, int64_t>) {
return timerServicesInt64;
} else if constexpr (std::is_same_v<N, VoidNamespace>) {
return timerServicesVoidNameSpace;
} else if constexpr (std::is_same_v<N, TimeWindow>) {
return timerServicesWindow;
} else {
THROW_LOGIC_EXCEPTION("Unsupported namespace type, timerService name: " << name)
}
}();
InternalTimerServiceImpl<K, N>* timerService;
using TimerType = TimerHeapInternalTimer<K, N>;
using TimerComparator = typename TimerType::MinHeapComparator;
auto it = timerServicesMap.find(name);
if (it == timerServicesMap.end()) {
if (auto factory = dynamic_cast<HeapKeyedStateBackend<K>*>(priorityQueueSetFactory)) {
auto processingTimerQueue = factory->template create<std::shared_ptr<TimerType>, TimerComparator>(
PROCESSING_TIMER_PREFIX + name,
timerSerializer);
auto eventTimerQueue = factory->template create<std::shared_ptr<TimerType>, TimerComparator>(
EVENT_TIMER_PREFIX + name,
timerSerializer);
timerService = new InternalTimerServiceImpl<K, N>(localKeyGroupRange, keyContext, processingTimeService, processingTimerQueue, eventTimerQueue);
timerServicesMap.emplace(name, timerService);
INFO_RELEASE("TIMER_CP_QUEUE_CREATE backend=HEAP_KEYED"
<< ", managerPtr=" << reinterpret_cast<uintptr_t>(this)
<< ", timerServicePtr=" << reinterpret_cast<uintptr_t>(timerService)
<< ", timerServiceName=" << name
<< ", processingStateName=" << PROCESSING_TIMER_PREFIX + name
<< ", eventStateName=" << EVENT_TIMER_PREFIX + name
<< ", registeredTimerServiceCountNow=" << getRegisteredTimerServiceCount());
} else if (auto factory = dynamic_cast<RocksdbKeyedStateBackend<K>*>(priorityQueueSetFactory)) {
auto processingTimerQueue = factory->template create<std::shared_ptr<TimerType>, TimerComparator>(
PROCESSING_TIMER_PREFIX + name,
timerSerializer);
auto eventTimerQueue = factory->template create<std::shared_ptr<TimerType>, TimerComparator>(
EVENT_TIMER_PREFIX + name,
timerSerializer);
timerService = new InternalTimerServiceImpl<K, N>(localKeyGroupRange, keyContext, processingTimeService, processingTimerQueue, eventTimerQueue);
timerServicesMap.emplace(name, timerService);
INFO_RELEASE("TIMER_CP_QUEUE_CREATE backend=ROCKSDB_KEYED"
<< ", managerPtr=" << reinterpret_cast<uintptr_t>(this)
<< ", timerServicePtr=" << reinterpret_cast<uintptr_t>(timerService)
<< ", timerServiceName=" << name
<< ", processingStateName=" << PROCESSING_TIMER_PREFIX + name
<< ", eventStateName=" << EVENT_TIMER_PREFIX + name
<< ", registeredTimerServiceCountNow=" << getRegisteredTimerServiceCount());
} else {
THROW_LOGIC_EXCEPTION("Unsupported priorityQueueSetFactory")
}
} else {
timerService = static_cast<InternalTimerServiceImpl<K, N>*>(it->second);
}
return timerService;
}
template <typename K>
template <typename N>
InternalTimerServiceImpl<K, N>* InternalTimeServiceManager<K>::registerOrGetTimerServiceForRestore(
const std::string &name,
TypeSerializer *keySerializer,
TypeSerializer *namespaceSerializer)
{
auto timerSerializer = new TimerSerializer<K, N>(keySerializer, namespaceSerializer);
return this->template registerOrGetTimerService<N>(name, timerSerializer);
}
template <typename K>
void InternalTimeServiceManager<K>::restoreRawKeyedState(
const std::vector<std::shared_ptr<KeyedStateHandle>> &rawKeyedStateHandles)
{
restoreEventTimerCount = 0;
restoreProcessingTimerCount = 0;
restoreReadKeyGroupCount = 0;
restoreNonEmptyKeyGroupCount = 0;
restoreNonEmptyServiceKeyGroupCount = 0;
restoreSkippedHandleCount = 0;
restoreTimerDataDigests.clear();
if (rawKeyedStateHandles.empty()) {
INFO_RELEASE("TIMER_CP_RESTORE_SUMMARY rawHandleCount=0"
<< ", managerPtr=" << reinterpret_cast<uintptr_t>(this)
<< ", skippedHandleCount=0"
<< ", restoredKeyGroupCount=0"
<< ", nonEmptyKeyGroupCount=0"
<< ", nonEmptyServiceKeyGroupCount=0"
<< ", eventTimerCount=0"
<< ", processingTimerCount=0"
<< ", totalTimerCount=0"
<< ", localKeyGroupRange=" << localKeyGroupRange->ToString());
return;
}
for (const auto &handle : rawKeyedStateHandles) {
if (handle == nullptr) {
++restoreSkippedHandleCount;
continue;
}
auto intersection = handle->GetIntersection(*localKeyGroupRange);
if (intersection == nullptr) {
++restoreSkippedHandleCount;
continue;
}
auto keyGroupsStateHandle = std::dynamic_pointer_cast<KeyGroupsStateHandle>(intersection);
if (keyGroupsStateHandle == nullptr) {
INFO_RELEASE("Error: restoreRawKeyedState Raw keyed timer restore only supports KeyGroupsStateHandle.");
THROW_LOGIC_EXCEPTION("Raw keyed timer restore only supports KeyGroupsStateHandle.")
}
restoreRawKeyGroupState(keyGroupsStateHandle);
}
INFO_RELEASE("TIMER_CP_RESTORE_SUMMARY rawHandleCount=" << rawKeyedStateHandles.size()
<< ", managerPtr=" << reinterpret_cast<uintptr_t>(this)
<< ", skippedHandleCount=" << restoreSkippedHandleCount
<< ", restoredKeyGroupCount=" << restoreReadKeyGroupCount
<< ", nonEmptyKeyGroupCount=" << restoreNonEmptyKeyGroupCount
<< ", nonEmptyServiceKeyGroupCount=" << restoreNonEmptyServiceKeyGroupCount
<< ", eventTimerCount=" << restoreEventTimerCount
<< ", processingTimerCount=" << restoreProcessingTimerCount
<< ", totalTimerCount=" << (restoreEventTimerCount + restoreProcessingTimerCount)
<< ", localKeyGroupRange=" << localKeyGroupRange->ToString());
if (TimerConsistencyCheckControl::timerConsistencyCheckEnabled) {
logTimerDataDigests("restore", "", -1, restoreTimerDataDigests);
}
}
template <typename K>
void InternalTimeServiceManager<K>::restoreRawKeyGroupState(
const std::shared_ptr<KeyGroupsStateHandle> &keyGroupsStateHandle)
{
KeyGroupRange keyGroupRange = keyGroupsStateHandle->GetKeyGroupRange();
RawKeyedStateInputStreamProxy input(omniTaskBridge, keyGroupsStateHandle);
for (int32_t keyGroupIdx = keyGroupRange.getStartKeyGroup();
keyGroupIdx <= keyGroupRange.getEndKeyGroup();
++keyGroupIdx) {
int64_t offset = keyGroupsStateHandle->getOffsetForKeyGroup(keyGroupIdx);
if (offset < 0) {
continue;
}
input.seek(offset);
restoreStateForKeyGroup(&input, keyGroupIdx);
}
}
template <typename K>
void InternalTimeServiceManager<K>::restoreStateForKeyGroup(
RawKeyedStateInputStreamProxy *in,
int32_t keyGroupIdx)
{
InternalTimerServiceSerializationProxy<K> proxy(this, keyGroupIdx);
proxy.read(in);
}
template <typename K>
typename InternalTimeServiceManager<K>::TimerNamespaceKind
InternalTimeServiceManager<K>::inferNamespaceKind(
const FlinkTimerSerializerSnapshots::SnapshotDescriptor &namespaceSerializerSnapshot)
{
const std::string &className = namespaceSerializerSnapshot.className;
if (className == FlinkTimerSerializerSnapshots::VOID_NAMESPACE_SERIALIZER_SNAPSHOT) {
return TimerNamespaceKind::VOID_NAMESPACE;
}
if (className == FlinkTimerSerializerSnapshots::TIME_WINDOW_SERIALIZER_SNAPSHOT) {
return TimerNamespaceKind::TIME_WINDOW;
}
if (className == FlinkTimerSerializerSnapshots::LONG_SERIALIZER_SNAPSHOT) {
return TimerNamespaceKind::INT64;
}
INFO_RELEASE("Error: inferNamespaceKind Unsupported timer namespace serializer snapshot class: " << className);
THROW_LOGIC_EXCEPTION("Unsupported timer namespace serializer snapshot class: " << className)
}
template <typename K>
void InternalTimeServiceManager<K>::readTimersForKeyGroup(
RawKeyedStateInputStreamProxy *in,
int32_t keyGroupIdx,
int32_t version)
{
int32_t serviceCount = in->readInt();
++restoreReadKeyGroupCount;
int64_t keyGroupEventTimerCountBefore = restoreEventTimerCount;
int64_t keyGroupProcessingTimerCountBefore = restoreProcessingTimerCount;
for (int32_t i = 0; i < serviceCount; ++i) {
std::string serviceName = in->readUTF();
auto keySerializerSnapshot = FlinkTimerSerializerSnapshots::readVersionedSnapshot(in);
auto namespaceSerializerSnapshot = FlinkTimerSerializerSnapshots::readVersionedSnapshot(in);
switch (inferNamespaceKind(namespaceSerializerSnapshot)) {
case TimerNamespaceKind::INT64:
readTimerServiceForNamespace<int64_t>(
in,
serviceName,
std::move(keySerializerSnapshot),
std::move(namespaceSerializerSnapshot),
keyGroupIdx,
version);
break;
case TimerNamespaceKind::TIME_WINDOW:
readTimerServiceForNamespace<TimeWindow>(
in,
serviceName,
std::move(keySerializerSnapshot),
std::move(namespaceSerializerSnapshot),
keyGroupIdx,
version);
break;
case TimerNamespaceKind::VOID_NAMESPACE:
readTimerServiceForNamespace<VoidNamespace>(
in,
serviceName,
std::move(keySerializerSnapshot),
std::move(namespaceSerializerSnapshot),
keyGroupIdx,
version);
break;
}
}
int64_t keyGroupEventTimerCount = restoreEventTimerCount - keyGroupEventTimerCountBefore;
int64_t keyGroupProcessingTimerCount = restoreProcessingTimerCount - keyGroupProcessingTimerCountBefore;
if (keyGroupEventTimerCount + keyGroupProcessingTimerCount > 0) {
++restoreNonEmptyKeyGroupCount;
}
}
template <typename K>
template <typename N>
void InternalTimeServiceManager<K>::readTimerServiceForNamespace(
RawKeyedStateInputStreamProxy *in,
const std::string &serviceName,
FlinkTimerSerializerSnapshots::SnapshotDescriptor keySerializerSnapshot,
FlinkTimerSerializerSnapshots::SnapshotDescriptor namespaceSerializerSnapshot,
int32_t keyGroupIdx,
int32_t version)
{
auto reader = InternalTimersSnapshotReaderWriters<K, N>::getReaderForVersion(
version,
std::move(keySerializerSnapshot),
std::move(namespaceSerializerSnapshot),
keySerializerForRestore);
InternalTimersSnapshot<K, N> timersSnapshot = reader->readTimersSnapshot(in);
int64_t eventTimerCount = static_cast<int64_t>(timersSnapshot.getEventTimeTimers().size());
int64_t processingTimerCount = static_cast<int64_t>(timersSnapshot.getProcessingTimeTimers().size());
restoreEventTimerCount += eventTimerCount;
restoreProcessingTimerCount += processingTimerCount;
if (eventTimerCount + processingTimerCount > 0) {
++restoreNonEmptyServiceKeyGroupCount;
}
if (TimerConsistencyCheckControl::timerConsistencyCheckEnabled) {
TimerNamespaceKind namespaceKind = []() {
if constexpr (std::is_same_v<N, int64_t>) {
return TimerNamespaceKind::INT64;
} else if constexpr (std::is_same_v<N, TimeWindow>) {
return TimerNamespaceKind::TIME_WINDOW;
} else {
return TimerNamespaceKind::VOID_NAMESPACE;
}
}();
TimerDataDigest keyGroupDigest = buildTimerDataDigest<N>(timersSnapshot);
mergeTimerDataDigest(restoreTimerDataDigests, serviceName, namespaceKind, keyGroupIdx, keyGroupDigest);
}
auto *timerService = registerOrGetTimerServiceForRestore<N>(
serviceName,
timersSnapshot.getKeySerializer(),
timersSnapshot.getNamespaceSerializer());
timerService->restoreTimersForKeyGroup(timersSnapshot, keyGroupIdx);
}
template <typename K>
inline void InternalTimeServiceManager<K>::snapshotToRawKeyedState(
KeyedStateCheckpointOutputStream *stateCheckpointOutputStream,
std::string operatorName,
int64_t checkpointId)
{
if (stateCheckpointOutputStream == nullptr) {
INFO_RELEASE("Error: snapshotToRawKeyedState Raw keyed state output stream is null for operator " << operatorName);
THROW_LOGIC_EXCEPTION("Raw keyed state output stream is null for operator " << operatorName)
}
try {
activeSnapshotCheckpointId = checkpointId;
snapshotEventTimerCount = 0;
snapshotProcessingTimerCount = 0;
snapshotWrittenKeyGroupCount = 0;
snapshotNonEmptyKeyGroupCount = 0;
snapshotNonEmptyServiceKeyGroupCount = 0;
snapshotTimerDataDigests.clear();
for (int32_t keyGroupIdx : stateCheckpointOutputStream->getKeyGroupList()) {
activeSnapshotKeyGroupId = keyGroupIdx;
stateCheckpointOutputStream->startNewKeyGroup(keyGroupIdx);
InternalTimerServiceSerializationProxy<K> proxy(this, keyGroupIdx);
proxy.write(stateCheckpointOutputStream);
}
} catch (const std::exception &e) {
INFO_RELEASE("TIMER_CP_SNAPSHOT_ABORT checkpointId=" << checkpointId
<< ", managerPtr=" << reinterpret_cast<uintptr_t>(this)
<< ", operatorName=" << operatorName
<< ", phase=writeKeyGroups"
<< ", activeKeyGroupId=" << activeSnapshotKeyGroupId
<< ", writtenKeyGroupCount=" << snapshotWrittenKeyGroupCount
<< ", registeredTimerServiceCount=" << getRegisteredTimerServiceCount()
<< ", int64TimerServiceCount=" << timerServicesInt64.size()
<< ", windowTimerServiceCount=" << timerServicesWindow.size()
<< ", voidTimerServiceCount=" << timerServicesVoidNameSpace.size()
<< ", error=" << e.what());
activeSnapshotCheckpointId = -1;
activeSnapshotKeyGroupId = -1;
INFO_RELEASE("Error: snapshotToRawKeyedState Could not write timer service of operator "
<< operatorName << " to raw keyed checkpoint state stream.");
THROW_LOGIC_EXCEPTION("Could not write timer service of operator " << operatorName
<< " to raw keyed checkpoint state stream. Root cause: " << e.what())
}
try {
stateCheckpointOutputStream->close();
} catch (const std::exception &e) {
INFO_RELEASE("TIMER_CP_SNAPSHOT_ABORT checkpointId=" << checkpointId
<< ", managerPtr=" << reinterpret_cast<uintptr_t>(this)
<< ", operatorName=" << operatorName
<< ", phase=closeRawKeyedStateStream"
<< ", activeKeyGroupId=" << activeSnapshotKeyGroupId
<< ", writtenKeyGroupCount=" << snapshotWrittenKeyGroupCount
<< ", registeredTimerServiceCount=" << getRegisteredTimerServiceCount()
<< ", int64TimerServiceCount=" << timerServicesInt64.size()
<< ", windowTimerServiceCount=" << timerServicesWindow.size()
<< ", voidTimerServiceCount=" << timerServicesVoidNameSpace.size()
<< ", error=" << e.what());
activeSnapshotCheckpointId = -1;
activeSnapshotKeyGroupId = -1;
throw;
}
INFO_RELEASE("TIMER_CP_SNAPSHOT_SUMMARY checkpointId=" << checkpointId
<< ", managerPtr=" << reinterpret_cast<uintptr_t>(this)
<< ", operatorName=" << operatorName
<< ", keyGroupCount=" << snapshotWrittenKeyGroupCount
<< ", nonEmptyKeyGroupCount=" << snapshotNonEmptyKeyGroupCount
<< ", registeredTimerServiceCount=" << getRegisteredTimerServiceCount()
<< ", int64TimerServiceCount=" << timerServicesInt64.size()
<< ", windowTimerServiceCount=" << timerServicesWindow.size()
<< ", voidTimerServiceCount=" << timerServicesVoidNameSpace.size()
<< ", nonEmptyServiceKeyGroupCount=" << snapshotNonEmptyServiceKeyGroupCount
<< ", eventTimerCount=" << snapshotEventTimerCount
<< ", processingTimerCount=" << snapshotProcessingTimerCount
<< ", totalTimerCount=" << (snapshotEventTimerCount + snapshotProcessingTimerCount));
if (TimerConsistencyCheckControl::timerConsistencyCheckEnabled) {
logTimerDataDigests("snapshot", operatorName, checkpointId, snapshotTimerDataDigests);
}
activeSnapshotCheckpointId = -1;
activeSnapshotKeyGroupId = -1;
}
template <typename K>
inline int32_t InternalTimeServiceManager<K>::getRegisteredTimerServiceCount() const
{
return static_cast<int32_t>(timerServicesInt64.size()
+ timerServicesWindow.size()
+ timerServicesVoidNameSpace.size());
}
template <typename K>
inline void InternalTimeServiceManager<K>::writeTimersForKeyGroup(
KeyedStateCheckpointOutputStream *out,
int32_t keyGroupIdx)
{
++snapshotWrittenKeyGroupCount;
int64_t keyGroupEventTimerCountBefore = snapshotEventTimerCount;
int64_t keyGroupProcessingTimerCountBefore = snapshotProcessingTimerCount;
out->writeInt(getRegisteredTimerServiceCount());
writeTimerServiceMap<int64_t>(out, timerServicesInt64, keyGroupIdx);
writeTimerServiceMap<TimeWindow>(out, timerServicesWindow, keyGroupIdx);
writeTimerServiceMap<VoidNamespace>(out, timerServicesVoidNameSpace, keyGroupIdx);
int64_t keyGroupEventTimerCount = snapshotEventTimerCount - keyGroupEventTimerCountBefore;
int64_t keyGroupProcessingTimerCount = snapshotProcessingTimerCount - keyGroupProcessingTimerCountBefore;
if (keyGroupEventTimerCount + keyGroupProcessingTimerCount > 0) {
++snapshotNonEmptyKeyGroupCount;
}
}
template <typename K>
template <typename N>
inline void InternalTimeServiceManager<K>::writeTimerServiceMap(
KeyedStateCheckpointOutputStream *out,
emhash7::HashMap<std::string, InternalTimerService<N>*> &services,
int32_t keyGroupIdx)
{
for (auto &entry : services) {
const std::string &serviceName = entry.first;
auto *timerService = static_cast<InternalTimerServiceImpl<K, N>*>(entry.second);
out->writeUTF(serviceName);
auto timersSnapshot = timerService->snapshotTimersForKeyGroup(keyGroupIdx);
int64_t eventTimerCount = static_cast<int64_t>(timersSnapshot.getEventTimeTimers().size());
int64_t processingTimerCount = static_cast<int64_t>(timersSnapshot.getProcessingTimeTimers().size());
snapshotEventTimerCount += eventTimerCount;
snapshotProcessingTimerCount += processingTimerCount;
if (eventTimerCount + processingTimerCount > 0) {
++snapshotNonEmptyServiceKeyGroupCount;
}
if (TimerConsistencyCheckControl::timerConsistencyCheckEnabled) {
TimerNamespaceKind namespaceKind = [&]() {
if constexpr (std::is_same_v<N, int64_t>) {
return TimerNamespaceKind::INT64;
} else if constexpr (std::is_same_v<N, TimeWindow>) {
return TimerNamespaceKind::TIME_WINDOW;
} else {
return TimerNamespaceKind::VOID_NAMESPACE;
}
}();
TimerDataDigest keyGroupDigest = buildTimerDataDigest<N>(timersSnapshot);
mergeTimerDataDigest(snapshotTimerDataDigests, serviceName, namespaceKind, keyGroupIdx, keyGroupDigest);
}
auto writer = InternalTimersSnapshotReaderWriters<K, N>::getWriterForVersion(
InternalTimerServiceSerializationProxy<K>::VERSION,
std::move(timersSnapshot),
timerService->getKeySerializer(),
timerService->getNamespaceSerializer());
writer->writeTimersSnapshot(out);
}
}
template <typename K>
template <typename N>
typename InternalTimeServiceManager<K>::TimerDataDigest InternalTimeServiceManager<K>::buildTimerDataDigest(
const InternalTimersSnapshot<K, N> &timersSnapshot)
{
TimerDataDigest digest;
addTimersToDigest<N>(
digest,
timersSnapshot.getEventTimeTimers(),
timersSnapshot.getKeySerializer(),
timersSnapshot.getNamespaceSerializer(),
true);
addTimersToDigest<N>(
digest,
timersSnapshot.getProcessingTimeTimers(),
timersSnapshot.getKeySerializer(),
timersSnapshot.getNamespaceSerializer(),
false);
return digest;
}
template <typename K>
template <typename N>
void InternalTimeServiceManager<K>::addTimersToDigest(
TimerDataDigest &digest,
const std::vector<typename InternalTimersSnapshot<K, N>::TimerPtr> &timers,
TypeSerializer *keySerializer,
TypeSerializer *namespaceSerializer,
bool eventTimer)
{
for (const auto &timer : timers) {
if (timer == nullptr) {
continue;
}
std::vector<uint8_t> keyBytes = InternalTimeServiceManager<K>::template serializeTimerValueForDigest<K>(
keySerializer, timer->getKey());
std::vector<uint8_t> namespaceBytes = InternalTimeServiceManager<K>::template serializeTimerValueForDigest<N>(
namespaceSerializer, timer->getNamespace());
uint64_t timerHash = hashTimerBytes(eventTimer, timer->getTimestamp(), keyBytes, namespaceBytes);
digest.addTimer(
eventTimer,
timer->getTimestamp(),
timerHash,
hashBytesForDigestLog(keyBytes),
hashBytesForDigestLog(namespaceBytes),
static_cast<int32_t>(keyBytes.size()),
static_cast<int32_t>(namespaceBytes.size()),
bytesPreviewHex(keyBytes, 32),
bytesPreviewHex(namespaceBytes, 32));
}
}
template <typename K>
template <typename T>
std::vector<uint8_t> InternalTimeServiceManager<K>::serializeTimerValueForDigest(TypeSerializer *serializer, T value)
{
if (serializer == nullptr) {
INFO_RELEASE("Error: serializeTimerValueForDigest Timer serializer is null.");
THROW_LOGIC_EXCEPTION("Timer serializer is null while building timer digest.")
}
DataOutputSerializer target(128);
if constexpr (std::is_same_v<T, int64_t>) {
Long boxed(value);
serializer->serialize(&boxed, target);
} else if constexpr (std::is_same_v<T, int32_t>) {
Integer boxed(value);
serializer->serialize(&boxed, target);
} else if constexpr (std::is_same_v<T, Object *>) {
serializer->serialize(value, target);
} else if constexpr (is_shared_ptr_v<T>) {
if (value == nullptr) {
INFO_RELEASE("Error: serializeTimerValueForDigest Timer shared_ptr value is null.");
THROW_LOGIC_EXCEPTION("Timer shared_ptr value is null while building timer digest.")
}
serializer->serialize(value.get(), target);
} else if constexpr (std::is_pointer_v<T>) {
serializer->serialize(static_cast<void *>(value), target);
} else {
T copy = value;
serializer->serialize(static_cast<void *>(©), target);
}
int32_t length = target.getPosition();
if (length <= 0 || target.getData() == nullptr) {
return {};
}
return std::vector<uint8_t>(target.getData(), target.getData() + length);
}
template <typename K>
uint64_t InternalTimeServiceManager<K>::hashTimerBytes(
bool eventTimer,
int64_t timestamp,
const std::vector<uint8_t> &keyBytes,
const std::vector<uint8_t> &namespaceBytes)
{
static constexpr uint64_t FNV_OFFSET_BASIS = 1469598103934665603ULL;
uint64_t hash = FNV_OFFSET_BASIS;
uint8_t queueType = eventTimer ? 1U : 2U;
hash = fnv1aUpdate(hash, &queueType, 1);
hash = fnv1aUpdateInt64(hash, timestamp);
hash = fnv1aUpdateInt32(hash, static_cast<int32_t>(keyBytes.size()));
hash = fnv1aUpdate(hash, keyBytes.data(), keyBytes.size());
hash = fnv1aUpdateInt32(hash, static_cast<int32_t>(namespaceBytes.size()));
hash = fnv1aUpdate(hash, namespaceBytes.data(), namespaceBytes.size());
return hash;
}
template <typename K>
uint64_t InternalTimeServiceManager<K>::hashBytesForDigestLog(const std::vector<uint8_t> &bytes)
{
static constexpr uint64_t FNV_OFFSET_BASIS = 1469598103934665603ULL;
uint64_t hash = FNV_OFFSET_BASIS;
hash = fnv1aUpdateInt32(hash, static_cast<int32_t>(bytes.size()));
return fnv1aUpdate(hash, bytes.data(), bytes.size());
}
template <typename K>
std::string InternalTimeServiceManager<K>::bytesPreviewHex(const std::vector<uint8_t> &bytes, size_t maxBytes)
{
static constexpr char HEX[] = "0123456789abcdef";
size_t previewLength = std::min(bytes.size(), maxBytes);
std::string result;
result.reserve(previewLength * 2);
for (size_t i = 0; i < previewLength; ++i) {
uint8_t value = bytes[i];
result.push_back(HEX[(value >> 4U) & 0x0fU]);
result.push_back(HEX[value & 0x0fU]);
}
return result;
}
template <typename K>
uint64_t InternalTimeServiceManager<K>::fnv1aUpdate(uint64_t hash, const uint8_t *data, size_t length)
{
static constexpr uint64_t FNV_PRIME = 1099511628211ULL;
if (data == nullptr || length == 0) {
return hash;
}
for (size_t i = 0; i < length; ++i) {
hash ^= static_cast<uint64_t>(data[i]);
hash *= FNV_PRIME;
}
return hash;
}
template <typename K>
uint64_t InternalTimeServiceManager<K>::fnv1aUpdateInt64(uint64_t hash, int64_t value)
{
uint64_t unsignedValue = static_cast<uint64_t>(value);
uint8_t bytes[8];
for (int32_t i = 0; i < 8; ++i) {
bytes[i] = static_cast<uint8_t>((unsignedValue >> ((7 - i) * 8)) & 0xffU);
}
return fnv1aUpdate(hash, bytes, sizeof(bytes));
}
template <typename K>
uint64_t InternalTimeServiceManager<K>::fnv1aUpdateInt32(uint64_t hash, int32_t value)
{
uint32_t unsignedValue = static_cast<uint32_t>(value);
uint8_t bytes[4];
for (int32_t i = 0; i < 4; ++i) {
bytes[i] = static_cast<uint8_t>((unsignedValue >> ((3 - i) * 8)) & 0xffU);
}
return fnv1aUpdate(hash, bytes, sizeof(bytes));
}
template <typename K>
std::string InternalTimeServiceManager<K>::namespaceKindName(TimerNamespaceKind namespaceKind)
{
switch (namespaceKind) {
case TimerNamespaceKind::INT64:
return "INT64";
case TimerNamespaceKind::TIME_WINDOW:
return "TIME_WINDOW";
case TimerNamespaceKind::VOID_NAMESPACE:
return "VOID_NAMESPACE";
default:
return "UNKNOWN";
}
}
template <typename K>
std::string InternalTimeServiceManager<K>::digestMapKey(
const std::string &serviceName,
TimerNamespaceKind namespaceKind)
{
return namespaceKindName(namespaceKind) + "|" + serviceName;
}
template <typename K>
int64_t InternalTimeServiceManager<K>::printableMinTimestamp(int64_t count, int64_t value)
{
return count > 0 ? value : -1;
}
template <typename K>
int64_t InternalTimeServiceManager<K>::printableMaxTimestamp(int64_t count, int64_t value)
{
return count > 0 ? value : -1;
}
template <typename K>
void InternalTimeServiceManager<K>::mergeTimerDataDigest(
std::unordered_map<std::string, TimerDataDigest> &target,
const std::string &serviceName,
TimerNamespaceKind namespaceKind,
int32_t keyGroupIdx,
const TimerDataDigest &keyGroupDigest)
{
target[digestMapKey(serviceName, namespaceKind)].mergeKeyGroupDigest(keyGroupIdx, keyGroupDigest);
}
template <typename K>
void InternalTimeServiceManager<K>::logTimerDataDigest(
const char *phase,
const std::string &operatorName,
int64_t checkpointId,
const std::string &serviceName,
TimerNamespaceKind namespaceKind,
const TimerDataDigest &digest)
{
INFO_RELEASE("TIMER_CP_HEAP_PQ_DATA_DIGEST phase=" << phase
<< ", checkpointId=" << checkpointId
<< ", backend=ROCKSDB_KEYED"
<< ", pqStorage=HEAP"
<< ", managerPtr=" << reinterpret_cast<uintptr_t>(this)
<< ", operatorName=" << operatorName
<< ", serviceName=" << serviceName
<< ", namespaceKind=" << namespaceKindName(namespaceKind)
<< ", keyGroupCount=" << digest.keyGroupCount
<< ", nonEmptyKeyGroupCount=" << digest.nonEmptyKeyGroupCount
<< ", eventTimerCount=" << digest.eventTimerCount
<< ", processingTimerCount=" << digest.processingTimerCount
<< ", totalTimerCount=" << digest.totalTimerCount()
<< ", eventXorHash=" << digest.eventXorHash
<< ", eventSumHash=" << digest.eventSumHash
<< ", processingXorHash=" << digest.processingXorHash
<< ", processingSumHash=" << digest.processingSumHash
<< ", keyGroupXorHash=" << digest.keyGroupXorHash
<< ", keyGroupSumHash=" << digest.keyGroupSumHash
<< ", eventMinTimestamp=" << printableMinTimestamp(digest.eventTimerCount, digest.eventMinTimestamp)
<< ", eventMaxTimestamp=" << printableMaxTimestamp(digest.eventTimerCount, digest.eventMaxTimestamp)
<< ", processingMinTimestamp="
<< printableMinTimestamp(digest.processingTimerCount, digest.processingMinTimestamp)
<< ", processingMaxTimestamp="
<< printableMaxTimestamp(digest.processingTimerCount, digest.processingMaxTimestamp));
logTimerDataSample(phase, operatorName, checkpointId, serviceName, namespaceKindName(namespaceKind), digest);
}
template <typename K>
void InternalTimeServiceManager<K>::logTimerDataDigests(
const char *phase,
const std::string &operatorName,
int64_t checkpointId,
const std::unordered_map<std::string, TimerDataDigest> &digests)
{
for (const auto &entry : digests) {
const std::string &key = entry.first;
size_t separator = key.find('|');
std::string namespaceKind = separator == std::string::npos ? "UNKNOWN" : key.substr(0, separator);
std::string serviceName = separator == std::string::npos ? key : key.substr(separator + 1);
const TimerDataDigest &digest = entry.second;
INFO_RELEASE("TIMER_CP_HEAP_PQ_DATA_DIGEST phase=" << phase
<< ", checkpointId=" << checkpointId
<< ", backend=ROCKSDB_KEYED"
<< ", pqStorage=HEAP"
<< ", managerPtr=" << reinterpret_cast<uintptr_t>(this)
<< ", operatorName=" << operatorName
<< ", serviceName=" << serviceName
<< ", namespaceKind=" << namespaceKind
<< ", keyGroupCount=" << digest.keyGroupCount
<< ", nonEmptyKeyGroupCount=" << digest.nonEmptyKeyGroupCount
<< ", eventTimerCount=" << digest.eventTimerCount
<< ", processingTimerCount=" << digest.processingTimerCount
<< ", totalTimerCount=" << digest.totalTimerCount()
<< ", eventXorHash=" << digest.eventXorHash
<< ", eventSumHash=" << digest.eventSumHash
<< ", processingXorHash=" << digest.processingXorHash
<< ", processingSumHash=" << digest.processingSumHash
<< ", keyGroupXorHash=" << digest.keyGroupXorHash
<< ", keyGroupSumHash=" << digest.keyGroupSumHash
<< ", eventMinTimestamp=" << printableMinTimestamp(digest.eventTimerCount, digest.eventMinTimestamp)
<< ", eventMaxTimestamp=" << printableMaxTimestamp(digest.eventTimerCount, digest.eventMaxTimestamp)
<< ", processingMinTimestamp="
<< printableMinTimestamp(digest.processingTimerCount, digest.processingMinTimestamp)
<< ", processingMaxTimestamp="
<< printableMaxTimestamp(digest.processingTimerCount, digest.processingMaxTimestamp));
logTimerDataSample(phase, operatorName, checkpointId, serviceName, namespaceKind, digest);
}
}
template <typename K>
void InternalTimeServiceManager<K>::logTimerDataSample(
const char *phase,
const std::string &operatorName,
int64_t checkpointId,
const std::string &serviceName,
const std::string &namespaceKind,
const TimerDataDigest &digest)
{
if (!digest.sample.present) {
return;
}
INFO_RELEASE("TIMER_CP_HEAP_PQ_DATA_SAMPLE phase=" << phase
<< ", checkpointId=" << checkpointId
<< ", backend=ROCKSDB_KEYED"
<< ", pqStorage=HEAP"
<< ", managerPtr=" << reinterpret_cast<uintptr_t>(this)
<< ", operatorName=" << operatorName
<< ", serviceName=" << serviceName
<< ", namespaceKind=" << namespaceKind
<< ", keyGroupId=" << digest.sample.keyGroupId
<< ", queue=" << (digest.sample.eventTimer ? "event" : "processing")
<< ", timestamp=" << digest.sample.timestamp
<< ", timerHash=" << digest.sample.timerHash
<< ", sampleRankHash=" << digest.sample.sampleRankHash
<< ", keyHash=" << digest.sample.keyHash
<< ", namespaceHash=" << digest.sample.namespaceHash
<< ", keyBytesLen=" << digest.sample.keyBytesLength
<< ", namespaceBytesLen=" << digest.sample.namespaceBytesLength
<< ", keyPreviewHex=" << digest.sample.keyPreviewHex
<< ", namespacePreviewHex=" << digest.sample.namespacePreviewHex);
}