/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 * http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */
#pragma once
#include <unordered_map>
#include "core/typeutils/TypeSerializer.h"
#include "../AbstractKeyedStateBackend.h"
#include "RocksdbStateTable.h"
#include "state/RocksDbKvStateInfo.h"
#include "runtime/state/rocksdb/falcon/ValueStateFalconKey.h"
#include "runtime/state/rocksdb/falcon/FalconValue.h"
#include "runtime/state/rocksdb/falcon/ValueStateCache.h"
#include "runtime/state/internal/InternalValueState.h"
// Upper-limit factor for the cache size. It is not referenced in the visible part of this header;
// presumably consumers compare the actual cache size against limit * factor -- TODO confirm.
constexpr float CACHE_SIZE_UPPER_LIMIT = 1.2F;
// Forward declaration: RocksdbValueState holds a ValueStateLRUCache pointer, while the cache class itself is
// defined further down in this header (and in turn refers back to RocksdbValueState).
template <typename K, typename N, typename V>
class ValueStateLRUCache;
/**
 * Value state stored in a RocksdbStateTable, with a ValueStateLRUCache placed in front of the table.
 *
 * value(), update() and clear() go straight to the table while the cache size limit is 0 and through
 * stateCache otherwise. getValue(), writeValue() and deleteValue() are the raw table accessors that the
 * cache itself calls back into.
 *
 * @tparam K Type of the state key
 * @tparam N Type of the namespace
 * @tparam V Type of the stored value
 */
template <typename K, typename N, typename V>
class RocksdbValueState : public InternalValueState<K, N, V> {
public:
    /** Returns the key serializer passed to the constructor (not owned: the destructor does not free it). */
    TypeSerializer* getKeySerializer()
    {
        return keySerializer;
    }

    /** Returns the namespace serializer currently in use. */
    TypeSerializer* getNamespaceSerializer()
    {
        return namespaceSerializer;
    }

    /** Returns the value serializer currently in use. */
    TypeSerializer* getValueSerializer()
    {
        return valueSerializer;
    }

    /** Replaces the namespace serializer; the previous pointer is not freed here. */
    void setNamespaceSerializer(TypeSerializer* serializer)
    {
        namespaceSerializer = serializer;
    }

    /** Replaces the value serializer; the previous pointer is not freed here. */
    void setValueSerializer(TypeSerializer* serializer)
    {
        valueSerializer = serializer;
    }

    /** Selects the namespace used by subsequent reads and writes. */
    void setCurrentNamespace(N nameSpace) override
    {
        currentNamespace = nameSpace;
    }

    /** Reads the value for the current key/namespace (through the cache when it is enabled). */
    V value() override;

    /** Stores `value` for the current key/namespace (through the cache when it is enabled). */
    void update(const V& value, bool copyKey = false) override;

    /** Sets the value that getValue() returns when the table has no entry. */
    void setDefaultValue(V value)
    {
        defaultValue = value;
    }

    /** Factory: builds a state on top of `stateTable`, using a value-initialized V as default value. */
    static RocksdbValueState<K, N, V>* create(
        StateDescriptor* stateDesc, RocksdbStateTable<K, N, V>* stateTable, TypeSerializer* keySerializer);

    /** Refreshes the serializers of `existingState` from `stateTable` and returns `existingState`. */
    static RocksdbValueState<K, N, V>* update(
        StateDescriptor* stateDesc, RocksdbStateTable<K, N, V>* stateTable, RocksdbValueState<K, N, V>* existingState);

    RocksdbValueState(
        RocksdbStateTable<K, N, V>* stateTable,
        TypeSerializer* keySerializer,
        TypeSerializer* valueSerializer,
        TypeSerializer* namespaceSerializer,
        V defaultValue);

    /**
     * Frees the cache created by the constructor.
     * NOTE(review): the cache destructor only clears its entries; dirty entries are not flushed to the
     * table here -- confirm that callers flush before destroying the state.
     */
    ~RocksdbValueState()
    {
        delete stateCache;
        stateCache = nullptr;
    }

    // This object owns stateCache through a raw pointer, and the cache keeps a back-pointer to this object.
    // A member-wise copy would delete the same cache twice, so copying is disabled.
    RocksdbValueState(const RocksdbValueState&) = delete;
    RocksdbValueState& operator=(const RocksdbValueState&) = delete;

    /** Forwards table creation to the underlying state table. */
    void createTable(
        ROCKSDB_NAMESPACE::DB* db,
        std::string cfName,
        std::unordered_map<std::string, std::shared_ptr<RocksDbKvStateInfo>>* kvStateInformation);

    /** Removes the value for the current key/namespace (through the cache when it is enabled). */
    void clear() override;

    // Vector-batch operations: all of them forward to the state table.
    void addVectorBatch(omnistream::VectorBatch* vectorBatch) override;
    omnistream::VectorBatch* getVectorBatch(int batchId) override;
    long getVectorBatchesSize() override;

    /** Writes a batch of key/value pairs under the current namespace, bypassing the cache. */
    void updateByBatch(std::unordered_map<K, V>& pendingUpdates);

    void clearVectors(int64_t currentTimestamp) override;
    void clearVectors(std::vector<size_t>& indicesToDelete) override;

    /** Returns the configured default value. */
    V getDefaultValue()
    {
        return defaultValue;
    }

    /** Returns the key currently selected on the state table. */
    K getCurrentKey()
    {
        return stateTable->getCurrentKey();
    }

    /** Returns the namespace selected by the last setCurrentNamespace() call (value-initialized before that). */
    N getNamespace()
    {
        return currentNamespace;
    }

    /** Selects both the table's current key and this state's current namespace. */
    void setKeyAndNamespace(K key, N ns)
    {
        stateTable->setCurrentKey(key);
        setCurrentNamespace(ns);
    }

    // Raw table accessors (no cache involved); also used by ValueStateLRUCache.
    V getValue();
    void writeValue(const V& value);
    void deleteValue();

    /** True when a cache exists and its size limit is non-zero. */
    bool isFalconEnabled() const;

    // Owned cache, created in the constructor and freed in the destructor. Declared before the private
    // members on purpose: the declaration order is also the member initialization order.
    ValueStateLRUCache<K, N, V>* stateCache = nullptr;

private:
    RocksdbStateTable<K, N, V>* stateTable;
    TypeSerializer* keySerializer;
    TypeSerializer* valueSerializer;
    TypeSerializer* namespaceSerializer;
    V defaultValue;
    // Value-initialized so that getNamespace() never returns an indeterminate value (for pointer or
    // arithmetic N) when it is read before the first setCurrentNamespace() call, e.g. by the cache's flush().
    N currentNamespace{};
};
/**
 * ValueStateCache implementation using the LRU method, bound to one specific ColumnFamily.
 *
 * <p> The states (FalconKey-FalconValue pairs) belonging to this ColumnFamily are meant to be kept in a
 * LinkedHashMap-like structure: hot states are moved to the head of the list, while cold states are moved to
 * the tail. With such a structure, read, write, hot-state insertion and cold-state elimination all have O(1)
 * time complexity.
 *
 * <p> Note: the implementation below stores the entries in a std::unordered_map and, once the size limit is
 * exceeded, flushes the dirty entries and clears the whole cache instead of evicting only the coldest entry.
 *
 * @param [K] Type of ValueState's key
 * @param [N] Type of ValueState's namespace
 * @param [V] Type of ValueState's value
 */
// Implementation notes:
//  - The cache owns every KeyType / ValType object stored in `cache` and deletes them itself.
//  - A put() on an already cached key only marks the new entry dirty; the write to the state table happens
//    in flush(). A put() on an uncached key writes to the state table immediately.
//  - When the number of entries exceeds the limit, ALL entries are flushed and dropped.
//  - NOTE(review): the map is keyed by KeyType*. Lookups with a freshly allocated probe key can only hit if
//    std::hash / std::equal_to for KeyType* compare by key content; presumably that is provided next to
//    ValueStateFalconKey -- confirm.
template <typename K, typename N, typename V>
class ValueStateLRUCache : public ValueStateCache<K, N, V> {
public:
    using KeyType = ValueStateFalconKey<K, N>;
    using ValType = FalconValue<V>;

    /**
     * @param dbAccessor state used to read from / write to the state table; stored, not owned.
     * The size limit starts at 0 until updateSizeLimit() is called.
     */
    explicit ValueStateLRUCache(RocksdbValueState<K, N, V>* dbAccessor) : cacheSizeLimit(0), valueState(dbAccessor)
    {
    }

    /** Frees all cached entries. Dirty entries are NOT flushed here. */
    ~ValueStateLRUCache()
    {
        clearAll();
    }

    // The cache owns raw pointers; a member-wise copy would delete every entry twice.
    ValueStateLRUCache(const ValueStateLRUCache&) = delete;
    ValueStateLRUCache& operator=(const ValueStateLRUCache&) = delete;

    /**
     * Returns the value for (key, ns).
     *
     * On a hit the cached value is returned. On a miss the value is read through valueState->getValue(),
     * which uses the key/namespace currently selected on the state rather than the arguments, and it is
     * cached unless it is null or equal to the state's default value.
     */
    V get(const K& key, const N& ns) override
    {
        auto* falconKey = new KeyType(key, ns);
        if (auto it = cache.find(falconKey); it != cache.end()) {
            V value = it->second->getValue();
            if constexpr (std::is_same_v<V, Object*>) {
                if (value != nullptr) {
                    // Result intentionally unused: getRefCount() appears to act as "take a reference"
                    // for the pointer handed to the caller -- confirm against Object.
                    value->getRefCount();
                }
            }
            delete falconKey;
            return value;
        }

        V value = valueState->getValue();
        if (isAbsentOrDefault(value, valueState->getDefaultValue())) {
            // Nothing worth caching; the probe key was never inserted, so free it.
            delete falconKey;
            return value;
        }

        // From here on the map owns falconKey and the new ValType.
        cache.emplace(falconKey, new ValType(value, false));
        if (exceedsSizeLimit()) {
            removeEldestState();
        }
        return value;
    }

    /**
     * Stores `value` for (key, ns).
     *
     * Cached key: the old entry is replaced by a dirty one, and the table write is deferred to flush().
     * Uncached key: the value is written to the table right away and cached as clean.
     */
    void put(const K& key, const N& ns, const V& value) override
    {
        auto* falconKey = new KeyType(key, ns);
        auto* falconValue = new ValType(value, false);
        if (auto it = cache.find(falconKey); it != cache.end()) {
            eraseEntry(it);
            falconValue->markAsDirty();
            cache.emplace(falconKey, falconValue);
            return;
        }

        valueState->writeValue(value);
        cache.emplace(falconKey, falconValue);
        if (exceedsSizeLimit()) {
            removeEldestState();
        }
    }

    /** Drops the cached entry for (key, ns), if any, and deletes the state's current value from the table. */
    void remove(const K& key, const N& ns) override
    {
        auto* probe = new KeyType(key, ns);
        if (auto it = cache.find(probe); it != cache.end()) {
            eraseEntry(it);
        }
        delete probe;
        valueState->deleteValue();
    }

    /** Eviction step: despite the name, flushes and drops every entry. */
    void removeEldestState() override
    {
        flush();
        clearAll();
    }

    /**
     * Writes every dirty entry to the state table and marks it clean.
     *
     * Writing an entry requires selecting its key/namespace on the state, so the key/namespace that were
     * selected on entry are restored afterwards -- unless one of them is a null pointer.
     */
    void flush() override
    {
        K currentKey = valueState->getCurrentKey();
        N currentNamespace = valueState->getNamespace();
        if constexpr (std::is_same_v<K, Object*>) {
            if (currentKey != nullptr) {
                // Keep the key alive while other keys are selected below; released at the end.
                currentKey->getRefCount();
            }
        }

        for (auto& [falconKey, falconValue] : cache) {
            if (falconValue->isDirty()) {
                valueState->setKeyAndNamespace(falconKey->getKey(), falconKey->getNamespace());
                valueState->writeValue(falconValue->getValue());
                falconValue->markAsClean();
            }
        }

        bool canRestore = true;
        if constexpr (std::is_pointer_v<K>) {
            if (currentKey == nullptr) {
                canRestore = false;
            }
        }
        if constexpr (std::is_pointer_v<N>) {
            if (currentNamespace == nullptr) {
                canRestore = false;
            }
        }
        if (canRestore) {
            valueState->setKeyAndNamespace(currentKey, currentNamespace);
        }

        if constexpr (std::is_same_v<K, Object*>) {
            // Release the reference taken above on every path, including the "namespace is null" one
            // that previously returned without releasing it.
            if (currentKey != nullptr) {
                currentKey->putRefCount();
            }
        }
    }

    /** Deletes every cached key/value object and empties the map. Nothing is written to the table. */
    void clearAll() override
    {
        for (auto& [falconKey, falconValue] : cache) {
            delete falconKey;
            delete falconValue;
        }
        cache.clear();
    }

    /** Returns the configured maximum number of entries. */
    [[nodiscard]] int getSizeLimit() const override
    {
        return cacheSizeLimit;
    }

    /** Sets a new limit; if the cache already holds more entries, everything is flushed and dropped. */
    void updateSizeLimit(int newSizeLimit) override
    {
        cacheSizeLimit = newSizeLimit;
        if (exceedsSizeLimit()) {
            flush();
            clearAll();
        }
    }

private:
    using CacheMap = std::unordered_map<KeyType*, ValType*>;

    /**
     * True when `value` must not be cached: it is a null pointer, or it equals the default value
     * (Object::equals for Object*, element-wise for std::vector<long>*, operator== otherwise).
     * A null default pointer never matches a non-null value.
     */
    static bool isAbsentOrDefault(const V& value, const V& defaultValue)
    {
        if constexpr (std::is_pointer_v<V>) {
            if (value == nullptr) {
                return true;
            }
            if (defaultValue == nullptr) {
                return false;
            }
            if constexpr (std::is_same_v<V, Object*>) {
                return value->equals(defaultValue);
            } else if constexpr (std::is_same_v<V, std::vector<long>*>) {
                return *value == *defaultValue;
            } else {
                return value == defaultValue;
            }
        } else {
            return value == defaultValue;
        }
    }

    /**
     * True when the map holds more entries than the limit. The limit is converted to the unsigned size
     * type exactly as the implicit comparison would do, so a negative limit behaves like a huge one.
     * NOTE(review): confirm whether negative limits are meant to mean "unbounded".
     */
    [[nodiscard]] bool exceedsSizeLimit() const
    {
        return cache.size() > static_cast<typename CacheMap::size_type>(cacheSizeLimit);
    }

    /**
     * Removes the entry from the map and then frees its key and value. The node is erased first so the
     * container never has to hash or compare a key that has already been deleted.
     */
    void eraseEntry(typename CacheMap::iterator it)
    {
        KeyType* storedKey = it->first;
        ValType* storedValue = it->second;
        cache.erase(it);
        delete storedKey;
        delete storedValue;
    }

    int cacheSizeLimit{};
    RocksdbValueState<K, N, V>* valueState = {};
    CacheMap cache = {};
};
/** Reports whether reads and writes go through the cache: a cache must exist and have a non-zero size limit. */
template <typename K, typename N, typename V>
bool RocksdbValueState<K, N, V>::isFalconEnabled() const
{
    if (stateCache == nullptr) {
        return false;
    }
    return stateCache->getSizeLimit() != 0;
}
/**
 * Reads the value for the current namespace straight from the state table (no cache involved).
 *
 * Pointer V: a null result yields the default value. A RowData-derived result that is a BinaryRowData
 * is returned as a copy; any other result is returned as is.
 * Non-pointer V: std::numeric_limits<V>::max() is treated as "no entry" and yields the default value
 * -- presumably the marker the state table returns for a missing key; confirm against RocksdbStateTable.
 */
template <typename K, typename N, typename V>
V RocksdbValueState<K, N, V>::getValue()
{
    auto stored = stateTable->get(currentNamespace);
    if constexpr (!std::is_pointer_v<V>) {
        return stored == std::numeric_limits<V>::max() ? defaultValue : stored;
    } else {
        if (stored == nullptr) {
            return defaultValue;
        }
        if constexpr (std::is_base_of_v<RowData, std::remove_pointer_t<V>>) {
            if (auto* binaryRow = dynamic_cast<BinaryRowData*>(stored); binaryRow != nullptr) {
                auto* duplicate = binaryRow->copy();
                return static_cast<V>(duplicate);
            }
        }
        return stored;
    }
}
/**
 * Writes `value` for the current namespace straight to the state table (no cache involved).
 * A null pointer value deletes the table entry instead.
 */
template <typename K, typename N, typename V>
void RocksdbValueState<K, N, V>::writeValue(const V& value)
{
    if constexpr (std::is_pointer_v<V>) {
        if (value == nullptr) {
            // Delete in the table directly rather than via clear(): clear() goes back through
            // stateCache->remove() when the cache is enabled, and the cache calls writeValue() while
            // iterating its own map in flush(), so re-entering it here would erase from that map mid-loop.
            deleteValue();
            return;
        }
    }
    stateTable->put(currentNamespace, value);
}
/** Removes the entry for the current namespace straight from the state table (no cache involved). */
template <typename K, typename N, typename V>
void RocksdbValueState<K, N, V>::deleteValue()
{
    this->stateTable->clear(this->currentNamespace);
}
/**
 * Factory for a new state on top of `stateTable`.
 *
 * The value and namespace serializers are taken from the table, and the default value is a
 * value-initialized V. `stateDesc` is not used. The caller receives a heap-allocated object.
 */
template <typename K, typename N, typename V>
RocksdbValueState<K, N, V>* RocksdbValueState<K, N, V>::create(
    StateDescriptor* stateDesc, RocksdbStateTable<K, N, V>* stateTable, TypeSerializer* keySerializer)
{
    using State = RocksdbValueState<K, N, V>;
    return new State(
        stateTable,
        keySerializer,
        stateTable->getStateSerializer(),
        stateTable->getNamespaceSerializer(),
        V());
}
/**
 * Stores the table, the serializers and the default value, then creates the cache that points back to this
 * state. None of the pointer arguments is freed by this class.
 */
template <typename K, typename N, typename V>
RocksdbValueState<K, N, V>::RocksdbValueState(
    RocksdbStateTable<K, N, V>* stateTable,
    TypeSerializer* keySerializer,
    TypeSerializer* valueSerializer,
    TypeSerializer* namespaceSerializer,
    V defaultValue)
    : stateTable(stateTable),
      keySerializer(keySerializer),
      valueSerializer(valueSerializer),
      namespaceSerializer(namespaceSerializer),
      defaultValue(defaultValue)
{
    // Created last, in the body, so every other member is already set when the cache receives `this`.
    this->stateCache = new ValueStateLRUCache<K, N, V>(this);
}
/** Forwards table creation (database handle, column family name, state info map) to the state table. */
template <typename K, typename N, typename V>
void RocksdbValueState<K, N, V>::createTable(
    ROCKSDB_NAMESPACE::DB* db,
    std::string cfName,
    std::unordered_map<std::string, std::shared_ptr<RocksDbKvStateInfo>>* kvStateInformation)
{
    this->stateTable->createTable(db, cfName, kvStateInformation);
}
/**
 * Returns the value for the current key and namespace.
 *
 * Reads straight from the state table when the cache is disabled (no cache, or size limit 0) and through
 * the cache otherwise.
 */
template <typename K, typename N, typename V>
V RocksdbValueState<K, N, V>::value()
{
    // isFalconEnabled() is the single definition of "cache in use" and also covers a null stateCache.
    if (!isFalconEnabled()) {
        return getValue();
    }
    return stateCache->get(getCurrentKey(), getNamespace());
}
/**
 * Stores `value` for the current key and namespace.
 *
 * With the cache disabled (no cache, or size limit 0) the value is written straight to the state table.
 * With the cache enabled, a null pointer value removes the entry and any other value is put into the cache.
 *
 * @param value   value to store
 * @param copyKey not used by this implementation
 */
template <typename K, typename N, typename V>
void RocksdbValueState<K, N, V>::update(const V& value, bool copyKey)
{
    // isFalconEnabled() is the single definition of "cache in use" and also covers a null stateCache.
    if (!isFalconEnabled()) {
        writeValue(value);
        return;
    }
    if constexpr (std::is_pointer_v<V>) {
        if (value == nullptr) {
            stateCache->remove(getCurrentKey(), getNamespace());
            return;
        }
    }
    stateCache->put(getCurrentKey(), getNamespace(), value);
}
/**
 * Re-binds an already created state to the serializers of `stateTable`.
 *
 * Only the namespace and value serializers are refreshed; `stateDesc` is not used.
 * @return `existingState`, the same pointer that was passed in
 */
template <typename K, typename N, typename V>
RocksdbValueState<K, N, V>* RocksdbValueState<K, N, V>::update(
    StateDescriptor* stateDesc, RocksdbStateTable<K, N, V>* stateTable, RocksdbValueState<K, N, V>* existingState)
{
    TypeSerializer* refreshedNamespaceSerializer = stateTable->getNamespaceSerializer();
    existingState->setNamespaceSerializer(refreshedNamespaceSerializer);

    TypeSerializer* refreshedValueSerializer = stateTable->getStateSerializer();
    existingState->setValueSerializer(refreshedValueSerializer);

    return existingState;
}
/** Hands `vectorBatch` over to the state table. */
template <typename K, typename N, typename V>
void RocksdbValueState<K, N, V>::addVectorBatch(omnistream::VectorBatch* vectorBatch)
{
    this->stateTable->addVectorBatch(vectorBatch);
}
/** Returns the vector batch that the state table holds for `batchId`. */
template <typename K, typename N, typename V>
omnistream::VectorBatch* RocksdbValueState<K, N, V>::getVectorBatch(int batchId)
{
    omnistream::VectorBatch* batch = this->stateTable->getVectorBatch(batchId);
    return batch;
}
/** Returns the vector-batch size reported by the state table. */
template <typename K, typename N, typename V>
long RocksdbValueState<K, N, V>::getVectorBatchesSize()
{
    return this->stateTable->getVectorBatchesSize();
}
/**
 * Removes the value for the current key and namespace.
 *
 * With the cache disabled (no cache, or size limit 0) the entry is deleted straight from the state table;
 * otherwise the removal goes through the cache, which also deletes it from the table.
 */
template <typename K, typename N, typename V>
void RocksdbValueState<K, N, V>::clear()
{
    // isFalconEnabled() is the single definition of "cache in use" and also covers a null stateCache.
    if (!isFalconEnabled()) {
        deleteValue();
        return;
    }
    stateCache->remove(getCurrentKey(), getNamespace());
}
/** Writes all `pendingUpdates` under the current namespace via the state table; the cache is not consulted. */
template <typename K, typename N, typename V>
void RocksdbValueState<K, N, V>::updateByBatch(std::unordered_map<K, V>& pendingUpdates)
{
    this->stateTable->putByBatch(this->currentNamespace, pendingUpdates);
}
/** Forwards the timestamp-based vector cleanup to the state table. */
template <typename K, typename N, typename V>
void RocksdbValueState<K, N, V>::clearVectors(int64_t currentTimestamp)
{
    this->stateTable->clearVectors(currentTimestamp);
}
/** Forwards the index-based vector cleanup to the state table. */
template <typename K, typename N, typename V>
void RocksdbValueState<K, N, V>::clearVectors(std::vector<size_t>& indicesToDelete)
{
    this->stateTable->clearVectors(indicesToDelete);
}