* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*
* Modify this part of the code to implement native execution of Flink operators.
*/
#ifndef COPYONWRITESTATEMAP_H
#define COPYONWRITESTATEMAP_H
under random hashCodes, the frequency of nodes in bins follows a Poisson
distribution(http://en.wikipedia.org/wiki/Poisson_distribution) with a parameter of about 0.5
on average for the default resizing threshold of 0.75, although with a large variance because
of resizing granularity. Ignoring variance, the expected occurrences of list size k are
(exp(-0.5) * pow(0.5, k)/factorial(k)). The first values are:
0: 0.60653066
1: 0.30326533
2: 0.07581633
3: 0.01263606
4: 0.00157952
5: 0.00015795
6: 0.00001316
7: 0.00000094
8: 0.00000006
============== buckets size ration ========
1 1543981 0.36884964|0.36787944 36.885
2 768655 0.36725597|0.36787944 73.611
3 256236 0.18364065|0.18393972 91.975
4 64126 0.06127757|0.06131324 98.102
5 12907 0.01541710|0.01532831 99.644
6 2050 0.00293841|0.00306566 99.938
7 310 0.00051840|0.00051094 99.990
8 49 0.00009365|0.00007299 99.999
9 4 0.00000860|0.00000913 100.000
========== collision miss ration ===========
_num_filled aver_size k.v size_kv = 4185936, 1.58, x.x 24
collision,possion,cache_miss hit_find|hit_miss, load_factor = 36.73%,36.74%,31.31% 1.50|2.00, 1.00
============== buckets size ration ========
*******************************************************/
#pragma once
#include <cstring>
#include <string>
#include <cmath>
#include <cstdlib>
#include <type_traits>
#include <cassert>
#include <utility>
#include <cstdint>
#include <functional>
#include <iterator>
#include <algorithm>
#include <emhash7.hpp>
#include "StateMap.h"
#include "core/typeutils/TypeSerializer.h"
#include "../internal/InternalKvState.h"
#include "table/data/binary/BinaryRowData.h"
namespace cowstatemap_detail {
template<typename T> struct IsEmhashMapPtrType : std::false_type {};
template<typename UK, typename UV>
struct IsEmhashMapPtrType<emhash7::HashMap<UK, UV>*> : std::true_type {};
template<typename T> struct HasObjectKeyInMap : std::false_type {};
template<typename UV>
struct HasObjectKeyInMap<emhash7::HashMap<Object*, UV>*> : std::true_type {};
template<typename T> struct HasObjectValueInMap : std::false_type {};
template<typename UK>
struct HasObjectValueInMap<emhash7::HashMap<UK, Object*>*> : std::true_type {};
* Clean up Object* entries inside an inner emhash7::HashMap before deleting it.
* This handles the case where HeapMapState<K, N, Object*, Object*>'s inner
* HashMap is destroyed — the emhash7 destructor doesn't call putRefCount.
*/
template<typename V>
inline void cleanupInnerMapAndDelete(V mapPtr)
{
if (mapPtr == nullptr) return;
if constexpr (IsEmhashMapPtrType<V>::value) {
if constexpr (HasObjectKeyInMap<V>::value || HasObjectValueInMap<V>::value) {
for (auto &entry : *mapPtr) {
if constexpr (HasObjectKeyInMap<V>::value) {
Object *key = static_cast<Object*>(entry.first);
if (key != nullptr) {
key->putRefCount();
}
}
if constexpr (HasObjectValueInMap<V>::value) {
Object *value = static_cast<Object*>(entry.second);
if (value != nullptr) {
value->putRefCount();
}
}
}
mapPtr->clear();
}
}
delete mapPtr;
}
}
#if EMH_WY_HASH
#include "wyhash.h"
#endif
#ifdef EMH_NEW
#undef EMH_KEY
#undef EMH_VAL
#undef EMH_PKV
#undef EMH_NEW
#undef EMH_SET
#undef EMH_BUCKET
#undef EMH_EMPTY
#undef EMH_MASK
#endif
#if (__GNUC__ >= 4 || __clang__)
# define EMH_LIKELY(condition) __builtin_expect(condition, 1)
# define EMH_UNLIKELY(condition) __builtin_expect(condition, 0)
#else
# define EMH_LIKELY(condition) condition
# define EMH_UNLIKELY(condition) condition
#endif
#define EMH_KEY(p, n) p[n].first
#define EMH_VAL(p, n) p[n].second
#define EMH_NMSPACE(p, n) p[n].third
#define EMH_BUCKET(p, n) p[n].bucket
#define EMH_PKV(p, n) p[n]
#define EMH_NEW(key, val, nmspace, bucket, version) \
new(_pairs + (bucket)) PairT(key, val, nmspace, bucket, version, version); _num_filled ++; EMH_SET(bucket)
#ifndef EMH_MASK
#define EMH_MASK(n) uint8_t(1 << (n % MASK_BIT))
#endif
#ifndef EMH_SET
#define EMH_SET(n) _bitmask[n / MASK_BIT] &= ~(EMH_MASK(n))
#endif
#ifndef EMH_CLS
#define EMH_CLS(n) _bitmask[n / MASK_BIT] |= EMH_MASK(n)
#endif
#ifndef EMH_EMPTY
#define EMH_EMPTY(n) (_bitmask[n / MASK_BIT] & (EMH_MASK(n))) != 0
#endif
namespace omnistream {
#ifdef EMH_SIZE_TYPE_16BIT
static constexpr size_type INACTIVE = 0xFFFF;
#elif EMH_SIZE_TYPE_64BIT
static constexpr size_type INACTIVE = 0 - 0x1ull;
#else
static constexpr size_type INACTIVE = -1u;
#endif
#ifndef EMH_MALIGN
static constexpr uint32_t EMH_MALIGN = 16;
#endif
static_assert(EMH_MALIGN >= 16 && 0 == (EMH_MALIGN & (EMH_MALIGN - 1)));
#ifndef EMH_SIZE_TYPE_16BIT
static_assert((int)INACTIVE < 0, "INACTIVE must negative (to int)");
#endif
static inline size_type CTZ(size_t n)
{
#if defined(__x86_64__) || (__BYTE_ORDER__ && __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__)
#elif __BIG_ENDIAN__ || (__BYTE_ORDER__ && __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__)
n = __builtin_bswap64(n);
#else
static uint32_t endianness = 0x12345678;
const auto is_big = *(const char *)&endianness == 0x12;
if (is_big)
n = __builtin_bswap64(n);
#endif
#if defined (__LP64__) || (SIZE_MAX == UINT64_MAX) || defined (__x86_64__)
auto index = __builtin_ctzll(n);
#elif 1
auto index = __builtin_ctzl(n);
#else
#if defined (__LP64__) || (SIZE_MAX == UINT64_MAX) || defined (__x86_64__)
size_type index;
__asm__("bsfq %1, %0\n" : "=r" (index) : "rm" (n) : "cc");
#else
size_type index;
__asm__("bsf %1, %0\n" : "=r" (index) : "rm" (n) : "cc");
#endif
#endif
return (size_type)index;
}
template <typename K, typename N, typename S>
struct CopyOnWriteMapEntry {
CopyOnWriteMapEntry(const CopyOnWriteMapEntry& rhs, int entryV)
: second(rhs.second), first(rhs.first), third(rhs.third)
{
bucket = rhs.bucket;
stateVersion = rhs.stateVersion;
entryVersion = entryV;
}
CopyOnWriteMapEntry(CopyOnWriteMapEntry&& rhs, int entryV) noexcept
: second(std::move(rhs.second)), first(std::move(rhs.first)), third(std::move(rhs.third))
{
bucket = rhs.bucket;
stateVersion = rhs.stateVersion;
entryVersion = entryV;
}
CopyOnWriteMapEntry(const K& key, const S& val, const N& nmspace, size_type ibucket, int stateV, int entryV)
: second(val), first(key), third(nmspace)
{
bucket = ibucket;
stateVersion = stateV;
entryVersion = entryV;
}
CopyOnWriteMapEntry(K&& key, S&& val, N&& nmspace, size_type ibucket, int stateV, int entryV)
: second(std::move(val)), first(std::move(key)), third(std::move(nmspace))
{
bucket = ibucket;
stateVersion = stateV;
entryVersion = entryV;
}
void setState(S state, int mapVersion)
{
if (state != second) {
second = state;
stateVersion = mapVersion;
}
}
bool equals(const CopyOnWriteMapEntry<K, N, S>& rhs)
{
return second == rhs.second && first == rhs.first && third == rhs.third;
}
void swap(CopyOnWriteMapEntry<K, N, S> &o)
{
std::swap(second, o.second);
std::swap(first, o.first);
std::swap(third, o.third);
}
#if EMH_ORDER_KV || EMH_SIZE_TYPE_64BIT
K first;
size_type bucket;
S second;
#else
S second;
size_type bucket;
K first;
#endif
N third;
int entryVersion;
int stateVersion;
};
struct CombineHash {
template<typename K, typename N>
size_t operator()(const K &_key, const N &_nmspace) const
{
return std::hash<K>()(_key) ^ std::hash<N>()(_nmspace);
}
};
struct CombineEqual {
template<typename K, typename N>
size_t operator()(const K &_key1, const K &_key2, const N &_nmspace1, const N &_nmspace2) const
{
if constexpr (std::is_pointer_v<K>) {
if (_key1 == nullptr && _key2 == nullptr) {
return std::equal_to<N>{}(_nmspace1, _nmspace2);
}else if (_key1 == nullptr || _key2 == nullptr){
return false;
}
}
return std::equal_to<K>{}(_key1, _key2) && std::equal_to<N>{}(_nmspace1, _nmspace2);
}
};
template<typename T>
struct NeedClone {
static constexpr bool value = false;
};
template<>
struct NeedClone<Object*> {
static constexpr bool value = true;
};
template<>
struct NeedClone<BinaryRowData*> {
static constexpr bool value = true;
};
template<>
struct NeedClone<RowData*> {
static constexpr bool value = true;
};
template <typename KeyT, typename N, typename ValueT, typename HashT = CombineHash, typename EqT = CombineEqual>
class CopyOnWriteStateMap : public StateMap<KeyT, N, ValueT> {
#ifndef EMH_DEFAULT_LOAD_FACTOR
constexpr static float EMH_DEFAULT_LOAD_FACTOR = 0.80f;
#endif
constexpr static float EMH_MIN_LOAD_FACTOR = 0.25f;
public:
typedef CopyOnWriteStateMap<KeyT, N, ValueT> htype;
typedef CopyOnWriteMapEntry<KeyT, N, ValueT> value_pair;
typedef CopyOnWriteMapEntry<KeyT, N, ValueT> PairT;
CopyOnWriteStateMap(TypeSerializer* serializer, size_type bucket = 2, float mlf = EMH_DEFAULT_LOAD_FACTOR) noexcept
{
stateSerializer = serializer;
init(bucket, mlf);
}
class iterator : public InternalKvState<KeyT, N, ValueT>::StateIncrementalVisitor {
public:
typedef std::forward_iterator_tag iterator_category;
typedef std::ptrdiff_t difference_type;
typedef value_pair value_type;
typedef value_pair* pointer;
typedef value_pair& reference;
iterator() = default;
iterator(const iterator &it) : _map(it._map), _bucket(it._bucket), _from(it._from), _bmask(it._bmask) {}
#if EMH_ITER_SAFE
iterator(const htype* hash_map, size_type bucket) : _map(hash_map), _bucket(bucket) { init(); }
#else
iterator(const htype* hash_map, size_type bucket) : _map(hash_map), _bucket(bucket), _bmask(0) { _from = size_type(-1); }
#endif
bool hasNext() override
{
return _bucket != _map->_num_buckets;
}
ValueT nextEntries() override
{
auto& val = _map->EMH_VAL(_pairs, _bucket);
this->operator++();
return val;
}
void init()
{
_from = (_bucket / SIZE_BIT) * SIZE_BIT;
if (_bucket < _map->bucket_count()) {
_bmask = *(size_t*)((size_t*)_map->_bitmask + _from / SIZE_BIT);
_bmask |= (1ull << _bucket % SIZE_BIT) - 1;
_bmask = ~_bmask;
} else {
_bmask = 0;
}
}
size_type bucket() const
{
return _bucket;
}
void clear(size_type bucket)
{
if (_bucket / SIZE_BIT == bucket / SIZE_BIT)
_bmask &= ~(1ull << (bucket % SIZE_BIT));
}
iterator& next()
{
goto_next_element();
return *this;
}
iterator& operator++()
{
#ifndef EMH_ITER_SAFE
if (_from == (size_type)-1) init();
#endif
_bmask &= _bmask - 1;
goto_next_element();
return *this;
}
iterator operator++(int)
{
#ifndef EMH_ITER_SAFE
if (_from == (size_type)-1) init();
#endif
iterator old = *this;
_bmask &= _bmask - 1;
goto_next_element();
return old;
}
reference operator*() const
{
return _map->EMH_PKV(_pairs, _bucket);
}
pointer operator->() const
{
return &(_map->EMH_PKV(_pairs, _bucket));
}
bool operator==(const iterator& rhs) const { return _bucket == rhs._bucket; }
bool operator!=(const iterator& rhs) const { return _bucket != rhs._bucket; }
public:
const htype* _map;
size_type _bucket;
size_type _from;
size_t _bmask;
private:
void goto_next_element()
{
if (_bmask != 0) {
_bucket = _from + CTZ(_bmask);
return;
}
do {
_bmask = ~*(size_t*)((size_t*)_map->_bitmask + (_from += SIZE_BIT) / SIZE_BIT);
} while (_bmask == 0);
_bucket = _from + CTZ(_bmask);
}
};
iterator* getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords) override
{
iterator* it = new iterator(this->begin());
return it;
}
inline size_type size() const { return _num_filled; }
inline bool empty() const { return _num_filled == 0; }
ValueT get(const KeyT& key, const N& nmspace) noexcept override
{
const auto bucket = find_filled_bucket(key, nmspace);
if constexpr(std::is_pointer<ValueT>::value) {
return bucket == _num_buckets ? nullptr : EMH_VAL(_pairs, bucket);
} else {
return bucket == _num_buckets ? std::numeric_limits<ValueT>::max() : EMH_VAL(_pairs, bucket);
}
}
ValueT get(const KeyT& key, const N& nmspace) const noexcept override
{
const auto bucket = find_filled_bucket(key, nmspace);
if constexpr(std::is_pointer<ValueT>::value) {
return bucket == _num_buckets ? nullptr : EMH_VAL(_pairs, bucket);
} else {
return bucket == _num_buckets ? std::numeric_limits<ValueT>::max() : EMH_VAL(_pairs, bucket);
}
}
void put(const KeyT& key, const N& nmspace, const ValueT& value) noexcept override
{
check_expand_need();
bool isempty;
const auto bucket = find_or_allocate(key, nmspace, isempty);
if (isempty) {
if constexpr (NeedClone<KeyT>::value) {
if constexpr (std::is_same<KeyT, Object*>::value) {
Object *cloned = (key != nullptr) ? reinterpret_cast<Object *>(key)->clone() : nullptr;
EMH_NEW(cloned, std::move(ValueT()), nmspace, bucket, stateMapVersion);
}
if constexpr (std::is_same<KeyT, BinaryRowData*>::value || std::is_same<KeyT, RowData*>::value) {
BinaryRowData *cloned = (key != nullptr) ? reinterpret_cast<BinaryRowData*>
(reinterpret_cast<BinaryRowData *>(key)->copy()) : nullptr;
EMH_NEW(cloned, std::move(ValueT()), nmspace, bucket, stateMapVersion);
}
} else {
EMH_NEW(key, std::move(ValueT()), nmspace, bucket, stateMapVersion);
}
}
if constexpr (std::is_same_v<ValueT, Object*>) {
Object *oldValue = static_cast<Object*>(EMH_VAL(_pairs, bucket));
Object *newValue = static_cast<Object*>(value);
if (newValue != nullptr) {
newValue->getRefCount();
}
EMH_VAL(_pairs, bucket) = value;
if (oldValue != nullptr) {
oldValue->putRefCount();
}
} else {
EMH_VAL(_pairs, bucket) = value;
}
}
void put(KeyT&& key, N&& nmspace, ValueT&& value) noexcept override
{
check_expand_need();
bool isempty;
const auto bucket = find_or_allocate(key, nmspace, isempty);
if (isempty) {
if constexpr (NeedClone<KeyT>::value) {
if constexpr (std::is_same<KeyT, Object*>::value) {
Object *cloned = (key != nullptr) ? reinterpret_cast<Object *>(key)->clone() : nullptr;
EMH_NEW(cloned, std::move(ValueT()), std::move(nmspace), bucket, stateMapVersion);
}
if constexpr (std::is_same<KeyT, BinaryRowData*>::value || std::is_same<KeyT, RowData*>::value) {
BinaryRowData *cloned = (key != nullptr) ? reinterpret_cast<BinaryRowData*>
(reinterpret_cast<BinaryRowData *>(key)->copy()) : nullptr;
EMH_NEW(cloned, std::move(ValueT()), std::move(nmspace), bucket, stateMapVersion);
}
} else {
EMH_NEW(std::move(key), std::move(ValueT()), std::move(nmspace), bucket, stateMapVersion);
}
}
if constexpr (std::is_same_v<ValueT, Object*>) {
Object *oldValue = static_cast<Object*>(EMH_VAL(_pairs, bucket));
Object *newValue = static_cast<Object*>(value);
if (newValue != nullptr) {
newValue->getRefCount();
}
EMH_VAL(_pairs, bucket) = std::move(value);
if (oldValue != nullptr) {
oldValue->putRefCount();
}
} else {
EMH_VAL(_pairs, bucket) = std::move(value);
}
}
ValueT putAndGetOld(const KeyT &key, const N &nameSpace, const ValueT &state) override
{
const auto bucket = find_filled_bucket(key, nameSpace);
if (bucket == _num_buckets) {
put(key, nameSpace, state);
if constexpr (std::is_pointer_v<ValueT>) {
return nullptr;
} else {
return {};
}
}
ValueT oldValue = EMH_VAL(_pairs, bucket);
put(key, nameSpace, state);
return oldValue;
}
ValueT removeAndGetOld(const KeyT &key, const N &nameSpace) override
{
const auto bucket = find_filled_bucket(key, nameSpace);
if (bucket == _num_buckets) {
if constexpr (std::is_pointer_v<ValueT>) {
return nullptr;
} else {
return {};
}
}
KeyT savedKey = EMH_KEY(_pairs, bucket);
ValueT savedValue = EMH_VAL(_pairs, bucket);
const auto erased = erase_key(key, nameSpace);
clear_bucket(erased);
if constexpr (std::is_same_v<KeyT, Object*>) {
if (savedKey != nullptr) {
static_cast<Object*>(savedKey)->putRefCount();
}
} else if constexpr (NeedClone<KeyT>::value && std::is_pointer_v<KeyT>) {
delete savedKey;
}
return savedValue;
}
std::pair<iterator, bool> do_insert(KeyT&& key, N&& nmspace, ValueT&& defaultV)
{
bool isempty;
const auto bucket = find_or_allocate(key, nmspace, isempty);
if (isempty) {
EMH_NEW(std::forward<KeyT>(key), std::forward<ValueT>(defaultV), std::forward<N>(nmspace), bucket, stateMapVersion);
}
return { {this, bucket}, isempty };
}
void remove(const KeyT& key, const N& nmspace) override
{
const auto bucket = find_filled_bucket(key, nmspace);
if (bucket == _num_buckets) {
return;
}
KeyT savedKey = EMH_KEY(_pairs, bucket);
ValueT savedValue = EMH_VAL(_pairs, bucket);
const auto erased = erase_key(key, nmspace);
clear_bucket(erased);
cleanupSavedRefs(savedKey, savedValue);
}
size_type erase(const KeyT& key, const N& nmspace)
{
const auto bucket = find_filled_bucket(key, nmspace);
if (bucket == _num_buckets)
return 0;
KeyT savedKey = EMH_KEY(_pairs, bucket);
ValueT savedValue = EMH_VAL(_pairs, bucket);
const auto erased = erase_key(key, nmspace);
if (erased == INACTIVE)
return 0;
clear_bucket(erased);
cleanupSavedRefs(savedKey, savedValue);
return 1;
}
inline bool containsKey(const KeyT& key, const N& nmspace) const noexcept override
{
return find_filled_bucket(key, nmspace) != _num_buckets;
}
inline size_type count(const KeyT& key, const N& nmspace) const noexcept
{
return find_filled_bucket(key, nmspace) != _num_buckets ? 1u : 0u;
}
inline iterator find(const KeyT& key, const N& nmspace, size_t key_hash) noexcept
{
return {this, find_filled_hash(key, nmspace, key_hash)};
}
inline iterator find(const KeyT& key, const N& nmspace) noexcept
{
return {this, find_filled_bucket(key, nmspace)};
}
~CopyOnWriteStateMap() noexcept
{
if (is_triviall_destructable() && _num_filled) {
for (auto it = begin(); _num_filled; ++it) {
_num_filled --;
it->~value_pair();
}
} else {
for (auto it = begin(); _num_filled; ++it) {
_num_filled --;
if constexpr (std::is_same_v<KeyT, Object*>) {
static_cast<Object*>(it->first)->putRefCount();
} else if constexpr (std::is_pointer_v<KeyT>) {
delete it->first;
}
if constexpr (std::is_same_v<ValueT, Object*>) {
static_cast<Object*>(it->second)->putRefCount();
} else if constexpr (cowstatemap_detail::IsEmhashMapPtrType<ValueT>::value) {
cowstatemap_detail::cleanupInnerMapAndDelete(it->second);
} else if constexpr (std::is_pointer_v<ValueT>) {
delete it->second;
}
if constexpr (std::is_pointer_v<N>) {
delete it->third;
}
}
}
free(_pairs);
_pairs = nullptr;
}
iterator begin() noexcept
{
#ifdef EMH_ZERO_MOVE
if (0 == _num_filled)
return {this, _num_buckets};
#endif
const auto bmask = ~(*(size_t*)_bitmask);
if (bmask != 0)
return {this, (size_type)CTZ(bmask)};
iterator it(this, sizeof(bmask) * 8 - 1);
it.init();
return it.next();
}
iterator last() const
{
if (_num_filled == 0)
return end();
auto bucket = _num_buckets - 1;
while (EMH_EMPTY(bucket)) bucket--;
return {this, bucket};
}
inline iterator end() noexcept { return {this, _num_buckets}; }
inline size_type bucket_count() const { return _num_buckets; }
private:
void init(size_type bucket, float mlf = EMH_DEFAULT_LOAD_FACTOR)
{
_pairs = nullptr;
_bitmask = nullptr;
_num_buckets = _num_filled = 0;
_mlf = (uint32_t)((1 << 27) / EMH_DEFAULT_LOAD_FACTOR);
max_load_factor(mlf);
rehash(bucket);
}
static size_t AllocSize(uint64_t num_buckets)
{
return (num_buckets + EPACK_SIZE) * sizeof(PairT) + (num_buckets + 7) / 8 + BIT_PACK;
}
static PairT* alloc_bucket(size_type num_buckets)
{
#ifdef EMH_ALLOC
auto* new_pairs = (PairT*)aligned_alloc(EMH_MALIGN, AllocSize(num_buckets));
#else
auto* new_pairs = (PairT*)malloc(AllocSize(num_buckets));
#endif
return new_pairs;
}
inline void max_load_factor(float mlf)
{
if (mlf <= 0.999f && mlf > EMH_MIN_LOAD_FACTOR)
_mlf = (uint32_t)((1 << 28) / mlf);
}
inline constexpr float max_load_factor() const { return (1 << 27) / (float)_mlf; }
inline constexpr size_type max_size() const { return 1ull << (sizeof(size_type) * 8 - 1); }
size_type bucket_main() const
{
auto main_size = 0;
for (size_type bucket = 0; bucket < _num_buckets; ++bucket) {
if (EMH_BUCKET(_pairs, bucket) == bucket)
main_size ++;
}
return main_size;
}
static constexpr bool is_triviall_destructable()
{
#if __cplusplus >= 201402L || _MSC_VER > 1600
return !(std::is_trivially_destructible<KeyT>::value && std::is_trivially_destructible<ValueT>::value);
#else
return !(std::is_pod<KeyT>::value && std::is_pod<ValueT>::value);
#endif
}
static constexpr bool is_copy_trivially()
{
#if __cplusplus >= 201402L || _MSC_VER > 1600
return (std::is_trivially_copyable<KeyT>::value && std::is_trivially_copyable<ValueT>::value);
#else
return (std::is_pod<KeyT>::value && std::is_pod<ValueT>::value);
#endif
}
size_type erase_key(const KeyT &key, const N &nmspace)
{
const auto bucket = hash_key(key, nmspace) & _mask;
if (EMH_EMPTY(bucket))
return INACTIVE;
auto next_bucket = EMH_BUCKET(_pairs, bucket);
const auto eqkey = _eq(key, EMH_KEY(_pairs, bucket), nmspace, EMH_NMSPACE(_pairs, bucket));
if (eqkey) {
if (next_bucket == bucket)
return bucket;
const auto nbucket = EMH_BUCKET(_pairs, next_bucket);
if (is_copy_trivially())
EMH_PKV(_pairs, bucket) = EMH_PKV(_pairs, next_bucket);
else
EMH_PKV(_pairs, bucket).swap(EMH_PKV(_pairs, next_bucket));
EMH_BUCKET(_pairs, bucket) = (nbucket == next_bucket) ? bucket : nbucket;
return next_bucket;
} else if (next_bucket == bucket) {
return INACTIVE;
}
auto prev_bucket = bucket;
while (true) {
const auto nbucket = EMH_BUCKET(_pairs, next_bucket);
if (_eq(key, EMH_KEY(_pairs, next_bucket), nmspace, EMH_NMSPACE(_pairs, next_bucket))) {
EMH_BUCKET(_pairs, prev_bucket) = (nbucket == next_bucket) ? prev_bucket : nbucket;
return next_bucket;
}
if (nbucket == next_bucket) {
break;
}
prev_bucket = next_bucket;
next_bucket = nbucket;
}
return INACTIVE;
}
void clearkv()
{
if (is_triviall_destructable()) {
auto it = begin();
it.init();
for (; _num_filled; ++it)
clear_bucket(it.bucket());
}
}
void clear()
{
if (!is_triviall_destructable() && _num_filled) {
memset_s(_bitmask, (_num_buckets + 7) / 8, (int) 0xFFFFFFFF, (_num_buckets + 7) / 8);
if (_num_buckets < 8) _bitmask[0] = uint8_t((1 << _num_buckets) - 1);
} else if (_num_filled) {
clearkv();
}
_num_filled = 0;
}
void shrink_to_fit()
{
rehash(_num_filled + 1);
}
bool reserve(uint64_t num_elems)
{
const auto required_buckets = (num_elems * _mlf >> 28);
if (EMH_LIKELY(required_buckets < _num_buckets))
return false;
#if EMH_HIGH_LOAD
if (required_buckets < 64 && _num_filled < _num_buckets)
return false;
#endif
#if EMH_STATIS
if (_num_filled > EMH_STATIS) dump_statics(true);
#endif
rehash(required_buckets + 2);
return true;
}
void rehash(uint64_t required_buckets)
{
if (required_buckets < _num_filled)
return;
uint64_t buckets = _num_filled > (1u << 16) ? (1u << 16) : 2u;
while (buckets < required_buckets) { buckets *= 2; }
if (buckets >= max_size() || buckets <= _num_filled) {
throw std::runtime_error("CopyOnWriteStateMap too big");
}
auto num_buckets = (size_type)buckets;
auto old_num_filled = _num_filled;
auto old_mask = _num_buckets - 1;
auto old_pairs = _pairs;
auto* obmask = _bitmask;
_num_filled = 0;
_num_buckets = num_buckets;
_mask = num_buckets - 1;
_pairs = alloc_bucket(_num_buckets);
memset_s((char*)(_pairs + _num_buckets), sizeof(PairT) * EPACK_SIZE, 0, sizeof(PairT) * EPACK_SIZE);
_bitmask = decltype(_bitmask)(_pairs + EPACK_SIZE + num_buckets);
const auto mask_byte = (num_buckets + 7) / 8;
memset_s(_bitmask, mask_byte, (int)0xFFFFFFFF, mask_byte);
memset_s(((char*)_bitmask) + mask_byte, BIT_PACK, 0, BIT_PACK);
if (num_buckets < 8)
_bitmask[0] = (uint8_t)((1 << num_buckets) - 1);
for (size_type src_bucket = old_mask; _num_filled < old_num_filled; src_bucket --) {
if (obmask[src_bucket / MASK_BIT] & (EMH_MASK(src_bucket)))
continue;
auto& key = EMH_KEY(old_pairs, src_bucket);
auto& nmspace = EMH_NMSPACE(old_pairs, src_bucket);
const auto bucket = find_unique_bucket(key, nmspace);
EMH_NEW(std::move(key), std::move(EMH_VAL(old_pairs, src_bucket)),
std::move(EMH_NMSPACE(old_pairs, src_bucket)), bucket, stateMapVersion);
if (is_triviall_destructable())
old_pairs[src_bucket].~PairT();
}
#if EMH_REHASH_LOG
if (_num_filled > EMH_REHASH_LOG) {
auto mbucket = bucket_main();
char buff[255] = {0};
sprintf_s(buff, " _num_filled/collision/main/K.V/pack/ = %u/%.2lf%%(%.2lf%%)/%s.%s/%zd",
_num_filled, 200.0f * (_num_filled - mbucket) / _mask, 100.0f * mbucket / _mask,
typeid(KeyT).name(), typeid(ValueT).name(), sizeof(_pairs[0]));
#ifdef EMH_LOG
static size_t ihashs = 0;
EMH_LOG << "rhash_nums = " << ihashs ++ << "|" <<__FUNCTION__ << "|" << buff << endl;
#else
puts(buff);
#endif
}
#endif
free(old_pairs);
if (old_num_filled != _num_filled) {
throw std::runtime_error("CopyOnWriteStateMap rehash failed");
}
}
private:
inline bool check_expand_need()
{
return reserve(_num_filled);
}
void clear_bucket(size_type bucket)
{
EMH_CLS(bucket);
_num_filled--;
if (is_triviall_destructable())
_pairs[bucket].~PairT();
}
* Clean up saved key/value refs AFTER the entry has been erased from the table.
* This handles refcounting for Object* and ownership for cloned pointer keys.
* For inner HashMap values (e.g. emhash7::HashMap<Object*, Object*>*), only
* deletes the HashMap structure — inner entry refcounting is HeapMapState's
* responsibility (its clear()/remove() already handle that before calling
* stateTable->remove()). cleanupInnerMapAndDelete is only used in the
* destructor for final shutdown cleanup.
*/
void cleanupSavedRefs(const KeyT &savedKey, const ValueT &savedValue)
{
if constexpr (std::is_same_v<KeyT, Object*>) {
if (savedKey != nullptr) {
static_cast<Object*>(savedKey)->putRefCount();
}
} else if constexpr (NeedClone<KeyT>::value && std::is_pointer_v<KeyT>) {
delete savedKey;
}
if constexpr (std::is_same_v<ValueT, Object*>) {
if (savedValue != nullptr) {
static_cast<Object*>(savedValue)->putRefCount();
}
} else if constexpr (std::is_pointer_v<ValueT>) {
delete savedValue;
}
}
size_type find_filled_hash(const KeyT& key, const N& nmspace, const size_t key_hash) const
{
const auto bucket = key_hash & _mask;
if (EMH_EMPTY(bucket))
return _num_buckets;
auto next_bucket = bucket;
while (true) {
if (_eq(key, EMH_KEY(_pairs, next_bucket), nmspace, EMH_NMSPACE(_pairs, next_bucket)))
return next_bucket;
const auto nbucket = EMH_BUCKET(_pairs, next_bucket);
if (nbucket == next_bucket)
break;
next_bucket = nbucket;
}
return _num_buckets;
}
size_type find_filled_bucket(const KeyT& key, const N& nmspace) const
{
const auto bucket = hash_key(key, nmspace) & _mask;
if (EMH_EMPTY(bucket))
return _num_buckets;
auto next_bucket = bucket;
while (true) {
if (_eq(key, EMH_KEY(_pairs, next_bucket), nmspace, EMH_NMSPACE(_pairs, next_bucket)))
return next_bucket;
const auto nbucket = EMH_BUCKET(_pairs, next_bucket);
if (nbucket == next_bucket)
return _num_buckets;
next_bucket = nbucket;
}
return 0;
}
size_type kickout_bucket(const size_type kmain, const size_type kbucket)
{
const auto next_bucket = EMH_BUCKET(_pairs, kbucket);
const auto new_bucket = find_empty_bucket(next_bucket, kbucket);
const auto prev_bucket = find_prev_bucket(kmain, kbucket);
new(_pairs + new_bucket) PairT(std::move(_pairs[kbucket]));
if (is_triviall_destructable())
_pairs[kbucket].~PairT();
if (next_bucket == kbucket)
EMH_BUCKET(_pairs, new_bucket) = new_bucket;
EMH_BUCKET(_pairs, prev_bucket) = new_bucket;
EMH_SET(new_bucket);
return kbucket;
}
** inserts a new key into a hash table; first check whether key's main
** bucket/position is free. If not, check whether colliding node/bucket is in its main
** position or not: if it is not, move colliding bucket to an empty place and
** put new key in its main position; otherwise (colliding bucket is in its main
** position), new key goes to an empty position. ***/
size_type find_or_allocate(const KeyT& key, const N& nmspace, bool& isempty)
{
const auto bucket = hash_key(key, nmspace) & _mask;
const auto& bucket_key = EMH_KEY(_pairs, bucket);
const auto& bucket_nmspace = EMH_NMSPACE(_pairs, bucket);
if (EMH_EMPTY(bucket)) {
isempty = true;
return bucket;
} else if (_eq(key, bucket_key, nmspace, bucket_nmspace)) {
isempty = false;
return bucket;
}
isempty = true;
auto next_bucket = EMH_BUCKET(_pairs, bucket);
const auto kmain_bucket = hash_key(bucket_key, bucket_nmspace) & _mask;
if (kmain_bucket != bucket)
return kickout_bucket(kmain_bucket, bucket);
else if (next_bucket == bucket)
return EMH_BUCKET(_pairs, next_bucket) = find_empty_bucket(next_bucket, bucket);
#if EMH_LRU_SET
auto prev_bucket = bucket;
#endif
while (true) {
if (EMH_UNLIKELY(_eq(key, EMH_KEY(_pairs, next_bucket), nmspace, EMH_NMSPACE(_pairs, next_bucket)))) {
isempty = false;
#if EMH_LRU_SET
EMH_PKV(_pairs, next_bucket).swap(EMH_PKV(_pairs, prev_bucket));
return prev_bucket;
#else
return next_bucket;
#endif
}
#if EMH_LRU_SET
prev_bucket = next_bucket;
#endif
const auto nbucket = EMH_BUCKET(_pairs, next_bucket);
if (nbucket == next_bucket)
break;
next_bucket = nbucket;
}
const auto new_bucket = find_empty_bucket(next_bucket, bucket);
return EMH_BUCKET(_pairs, next_bucket) = new_bucket;
}
size_type find_empty_bucket(const size_type bucket_from, const size_type main_bucket)
{
#if EMH_ITER_SAFE
const auto boset = bucket_from % 8;
auto* const align = (uint8_t*)_bitmask + bucket_from / 8;
(void)main_bucket;
size_t bmask;
memcpy_s(&bmask, align + 0, sizeof(bmask));
bmask >>= boset;
if (EMH_LIKELY(bmask != 0))
return bucket_from + CTZ(bmask);
#else
const auto boset = bucket_from % 8;
auto* const align = (uint8_t*)_bitmask + bucket_from / 8;
(void)bucket_from;
const size_t bmask = (*(size_t*)(align) >> boset);
if (EMH_LIKELY(bmask != 0))
return bucket_from + CTZ(bmask);
#endif
const auto qmask = _mask / SIZE_BIT;
auto& last = EMH_BUCKET(_pairs, _num_buckets);
for (; ;) {
last &= qmask;
const auto bmask2 = *((size_t*)_bitmask + last);
if (bmask2 != 0)
return last * SIZE_BIT + CTZ(bmask2);
#if 1
const auto next1 = (qmask / 2 + last) & qmask;
const auto bmask1 = *((size_t*)_bitmask + next1);
if (bmask1 != 0) {
last = next1;
return next1 * SIZE_BIT + CTZ(bmask1);
}
last += 1;
#endif
}
}
size_type find_last_bucket(size_type main_bucket) const
{
auto next_bucket = EMH_BUCKET(_pairs, main_bucket);
if (next_bucket == main_bucket)
return main_bucket;
while (true) {
const auto nbucket = EMH_BUCKET(_pairs, next_bucket);
if (nbucket == next_bucket)
return next_bucket;
next_bucket = nbucket;
}
}
size_type find_prev_bucket(size_type main_bucket, const size_type bucket) const
{
auto next_bucket = EMH_BUCKET(_pairs, main_bucket);
if (next_bucket == bucket)
return main_bucket;
while (true) {
const auto nbucket = EMH_BUCKET(_pairs, next_bucket);
if (nbucket == bucket)
return next_bucket;
next_bucket = nbucket;
}
}
size_type find_unique_bucket(const KeyT& key, const N& nmspace)
{
const size_type bucket = hash_key(key, nmspace) & _mask;
if (EMH_EMPTY(bucket))
return bucket;
const auto kmain_bucket = hash_key(EMH_KEY(_pairs, bucket), EMH_NMSPACE(_pairs, bucket)) & _mask;
if (EMH_UNLIKELY(kmain_bucket != bucket))
return kickout_bucket(kmain_bucket, bucket);
auto next_bucket = EMH_BUCKET(_pairs, bucket);
if (next_bucket != bucket)
next_bucket = find_last_bucket(next_bucket);
return EMH_BUCKET(_pairs, next_bucket) = find_empty_bucket(next_bucket, bucket);
}
template<typename UType, typename std::enable_if<std::is_integral<UType>::value, size_type>::type = 0>
inline size_type hash_key(const UType key, const N nmspace) const
{
#if EMH_INT_HASH
return hash64(key);
#elif EMH_IDENTITY_HASH
return key + (key >> 24);
#else
return (size_type)_hasher(key, nmspace);
#endif
}
template<typename UType, typename std::enable_if<std::is_same<UType, std::string>::value, size_type>::type = 0>
inline size_type hash_key(const UType& key, const N nmspace) const
{
#if EMH_WY_HASH
return wyhash(key.data(), key.size(), 0);
#else
return (size_type)_hasher(key, nmspace);
#endif
}
template<typename UType, typename std::enable_if<!std::is_integral<UType>::value && !std::is_same<UType, std::string>::value, size_type>::type = 0>
inline size_type hash_key(const UType& key, const N& nmspace) const
{
return (size_type)_hasher(key, nmspace);
}
private:
uint8_t* _bitmask;
PairT* _pairs;
HashT _hasher;
EqT _eq;
size_type _mask;
size_type _num_buckets;
size_type _num_filled;
uint32_t _mlf;
private:
int stateMapVersion = 0;
TypeSerializer* stateSerializer;
private:
static constexpr uint32_t BIT_PACK = sizeof(uint64_t);
static constexpr uint32_t MASK_BIT = sizeof(_bitmask[0]) * 8;
static constexpr uint32_t SIZE_BIT = sizeof(size_t) * 8;
static constexpr uint32_t EPACK_SIZE = sizeof(PairT) >= sizeof(size_t) == 0 ? 1 : 2;
};
}
#if __cplusplus >= 201103L
#endif
#endif