* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef FLINK_TNEL_ABSTRACTKEYEDSTATEBACKEND_H
#define FLINK_TNEL_ABSTRACTKEYEDSTATEBACKEND_H
#include <emhash7.hpp>
#include <emhash7.hpp>
#include "InternalKeyContext.h"
#include "core/typeutils/TypeSerializer.h"
#include "core/api/common/state/StateDescriptor.h"
#include "heap/StateTable.h"
#include "KeyedStateBackend.h"
#include "state/ttl/TtlStateFactory.h"
#include "CheckpointableKeyedStateBackend.h"
#include "runtime/checkpoint/SnapshotType.h"
#include "runtime/checkpoint/CheckpointListener.h"
template <typename K>
class AbstractKeyedStateBackend : public CheckpointableKeyedStateBackend<K>, public CheckpointListener {
public:
~AbstractKeyedStateBackend()
{
LOG("AbstractKeyedStateBackend");
delete keySerializer;
keySerializer = nullptr;
}
AbstractKeyedStateBackend(TypeSerializer *keySerializer, InternalKeyContext<K> *context) :context(context), keySerializer(keySerializer) {};
void setCurrentKey(K newKey);
K getCurrentKey();
* N: The data type that the serializer serializes.
* S: The type of the State objects created from this {@code StateDescriptor}.
* V: The type of the value of the state object described by this state descriptor.
* */
template<typename N, typename S, typename V>
S *getOrCreateKeyedState(TypeSerializer *namespaceSerializer, StateDescriptor *stateDescriptor);
TypeSerializer *getKeySerializer()
{
return keySerializer;
};
template<typename N, typename S, typename V>
S *getPartitionedState(N nameSpace, TypeSerializer *namespaceSerializer, StateDescriptor *stateDescriptor);
KeyGroupRange *getKeyGroupRange() override;
void dispose() override;
virtual bool requiresLegacySynchronousTimerSnapshots(SnapshotType *checkpointType)
{
return false;
}
protected:
InternalKeyContext<K> *context;
TypeSerializer * keySerializer = nullptr;
std::string lastName;
uintptr_t lastState;
emhash7::HashMap<std::string, uintptr_t> keyValueStatesByName;
};
template <typename K>
void AbstractKeyedStateBackend<K>::setCurrentKey(K newKey)
{
context->setCurrentKey(newKey);
}
template <typename K>
inline K AbstractKeyedStateBackend<K>::getCurrentKey()
{
return this->context->getCurrentKey();
}
template <typename K>
inline KeyGroupRange *AbstractKeyedStateBackend<K>::getKeyGroupRange()
{
return context->getKeyGroupRange();
}
template <typename K>
inline void AbstractKeyedStateBackend<K>::dispose()
{
lastName = "";
lastState = 0;
keyValueStatesByName.clear();
}
template <typename K>
template<typename N, typename S, typename V>
S *AbstractKeyedStateBackend<K>::getOrCreateKeyedState(TypeSerializer *namespaceSerializer, StateDescriptor *stateDescriptor)
{
S *kvState;
auto it = keyValueStatesByName.find(stateDescriptor->getName());
if (it == keyValueStatesByName.end()) {
kvState = TtlStateFactory<K, N, S, V>::createStateAndWrapWithTtlIfEnabled(
namespaceSerializer, stateDescriptor, this);
keyValueStatesByName[stateDescriptor->getName()] = reinterpret_cast<uintptr_t>(kvState);
} else {
kvState = reinterpret_cast<S *>(it->second);
}
return kvState;
}
template <typename K>
template<typename N, typename S, typename V>
S *AbstractKeyedStateBackend<K>::getPartitionedState(N nameSpace, TypeSerializer *namespaceSerializer, StateDescriptor *stateDescriptor)
{
std::string name = stateDescriptor->getName();
if (!lastName.empty() && lastName == name) {
S *lastStateAsS = reinterpret_cast<S *>(lastState);
lastStateAsS->setCurrentNamespace(nameSpace);
return lastStateAsS;
}
uintptr_t previous;
auto it = keyValueStatesByName.find(name);
if (it != keyValueStatesByName.end()) {
previous = it->second;
lastState = previous;
reinterpret_cast<S *>(lastState)->setCurrentNamespace(nameSpace);
lastName = name;
return reinterpret_cast<S *>(previous);
}
S *state = getOrCreateKeyedState<N, S, V>(namespaceSerializer, stateDescriptor);
S *kvState = reinterpret_cast<S *>(state);
lastName = name;
lastState = reinterpret_cast<uintptr_t>(state);
kvState->setCurrentNamespace(nameSpace);
return state;
}
#endif