* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* ogsql_vm_hash_table.c
*
*
* IDENTIFICATION
* src/ogsql/executor/ogsql_vm_hash_table.c
*
* -------------------------------------------------------------------------
*/
#include "ogsql_vm_hash_table.h"
#define MAX_HASH_BUCKET_SIZE (uint32)16777216
#define MIN_HASH_BUCKET_SIZE (uint32)16384
#define HASH_FILL_FACTOR (float)0.75
#define HASH_BUCKET_NODE_SIZE (uint32)(sizeof(hash_entry_t))
#define HASH_SEGMENT (seg)
#define HASH_SESSION(seg) ((seg)->sess)
#define HASH_POOL(seg) ((seg)->pool)
#define HASH_PM_POOL(seg) ((seg)->pm_pool)
#define HASH_PAGE_LIST(seg) (&(seg)->vm_list)
#define HASH_LAST_PAGE(seg) ((seg)->last_page)
#define HASH_LAST_PAGE_USED(seg) ((seg)->last_page_used)
#define HASH_BUCKETS_PER_PAGE (OG_VMEM_PAGE_SIZE / HASH_BUCKET_NODE_SIZE)
#define HASH_BUCKETS_LEFT ((OG_VMEM_PAGE_SIZE - HASH_LAST_PAGE_USED(seg)) / HASH_BUCKET_NODE_SIZE)
#define HASH_TABLE_ENTRY_VMID (&hash_table->self.page)
#define HASH_KEY_SCAN_BUF(scan_assit) ((scan_assit)->buf)
#define HASH_KEY_SCAN_SIZE(scan_assit) ((scan_assit)->size)
#define HASH_CALLBACK_CTX(hash_table) ((hash_table)->callback_ctx)
static const page_entry_t g_invalid_page_entry = {
.page_id = OG_INVALID_INT32,
.pm_flag = OG_TRUE
};
#define IS_INVALID_PAGE_ENTRY(entry) ((entry)->page_id == OG_INVALID_INT32)
static inline status_t vm_open_page(hash_segment_t *seg, uint32 page_id, char **buf)
{
vm_page_t *page = NULL;
if (vm_open(HASH_SESSION(seg), HASH_POOL(seg), page_id, &page) != OG_SUCCESS) {
return OG_ERROR;
}
(*buf) = page->data;
return OG_SUCCESS;
}
static inline void vm_close_page(hash_segment_t *seg, uint32 page_id)
{
vm_close(HASH_SESSION(seg), HASH_POOL(seg), page_id, VM_ENQUE_TAIL);
}
static inline status_t pm_open_page(hash_segment_t *seg, uint32 page_id, char **buf)
{
return pm_open(HASH_PM_POOL(seg), page_id, buf);
}
static inline void pm_close_page(hash_segment_t *seg, uint32 page_id) {}
typedef status_t (*page_open_t)(hash_segment_t *seg, uint32 page_id, char **buf);
typedef void (*page_close_t)(hash_segment_t *seg, uint32 page_id);
typedef struct st_page_func {
page_open_t open;
page_close_t close;
} page_func_t;
static page_func_t g_page_funcs[] = {
{ vm_open_page, vm_close_page },
{ pm_open_page, pm_close_page },
};
#define VM_OPEN(seg, entry, buf) (&g_page_funcs[(entry)->pm_flag])->open(seg, (entry)->page_id, buf)
#define VM_CLOSE(seg, entry) (&g_page_funcs[(entry)->pm_flag])->close(seg, (entry)->page_id)
#define VM_CLOSE_TWO_PAGES(first, second) \
do { \
VM_CLOSE(HASH_SEGMENT, (first)); \
VM_CLOSE(HASH_SEGMENT, (second)); \
} while (0)
#define VM_CLOSE_THREE_PAGES(first, second, third) \
do { \
VM_CLOSE(HASH_SEGMENT, (first)); \
VM_CLOSE(HASH_SEGMENT, (second)); \
VM_CLOSE(HASH_SEGMENT, (third)); \
} while (0)
status_t vm_hash_open_page(hash_segment_t *seg, page_entry_t *entry, char **page_buf)
{
return VM_OPEN(seg, entry, page_buf);
}
void vm_hash_close_page(hash_segment_t *seg, page_entry_t *entry)
{
VM_CLOSE(seg, entry);
}
static inline status_t vm_hash_table_get_head(hash_table_t **hash_table, hash_segment_t *seg, hash_table_entry_t *table)
{
char *page_buf = NULL;
OG_RETURN_IFERR(VM_OPEN(HASH_SEGMENT, &table->page, &page_buf));
*hash_table = (hash_table_t *)(page_buf + table->offset);
return OG_SUCCESS;
}
static inline status_t vm_hash_table_get_node(hash_node_t **hash_node, hash_entry_t *node_entry, hash_segment_t *seg)
{
char *page_buf = NULL;
if (VM_OPEN(HASH_SEGMENT, &node_entry->page, &page_buf) != OG_SUCCESS) {
return OG_ERROR;
}
*hash_node = (hash_node_t *)(page_buf + node_entry->offset);
return OG_SUCCESS;
}
static inline status_t vm_hash_get_hash_table(hash_table_t **hash_table, hash_segment_t *seg, hash_table_entry_t *table,
hash_table_iter_t *iter)
{
if (iter->hash_table == NULL) {
OG_RETURN_IFERR(vm_hash_table_get_head(&(iter->hash_table), seg, table));
}
*hash_table = iter->hash_table;
return OG_SUCCESS;
}
static inline status_t vm_hash_alloc_page(hash_segment_t *seg, page_entry_t *page_entry)
{
vm_page_t *page = NULL;
uint32 page_id;
if (HASH_PM_POOL(seg) != NULL && pm_alloc(HASH_PM_POOL(seg), &page_id) == OG_SUCCESS) {
page_entry->page_id = page_id;
page_entry->pm_flag = OG_TRUE;
return OG_SUCCESS;
}
OG_RETURN_IFERR(vm_alloc_and_append(HASH_SESSION(seg), HASH_POOL(seg), HASH_PAGE_LIST(seg)));
page_entry->page_id = HASH_PAGE_LIST(seg)->last;
page_entry->pm_flag = 0;
if (HASH_PAGE_LIST(seg)->count <= seg->pages_hold) {
return vm_open(HASH_SESSION(seg), HASH_POOL(seg), HASH_PAGE_LIST(seg)->last, &page);
}
return OG_SUCCESS;
}
void vm_hash_segment_init(handle_t sess, vm_pool_t *pool, hash_segment_t *segment, pma_t *pma, uint32 pages_hold,
uint64 max_size)
{
segment->sess = sess;
segment->pool = pool;
segment->pages_hold = pages_hold;
segment->last_page_used = OG_VMEM_PAGE_SIZE;
segment->last_page = g_invalid_page_entry;
segment->vm_list.count = 0;
if (pm_create_pool(pma, max_size, &segment->pm_pool) != OG_SUCCESS) {
cm_reset_error();
segment->pm_pool = NULL;
}
}
void vm_hash_segment_deinit(hash_segment_t *segment)
{
uint32 loop;
vm_ctrl_t *ctrl = NULL;
uint32 curr_id;
uint32 next_id;
curr_id = segment->vm_list.first;
for (loop = 0; loop < segment->vm_list.count; ++loop) {
ctrl = vm_get_ctrl(segment->pool, curr_id);
next_id = ctrl->next;
vm_free(segment->sess, segment->pool, curr_id);
curr_id = next_id;
}
segment->last_page_used = OG_VMEM_PAGE_SIZE;
segment->last_page = g_invalid_page_entry;
segment->vm_list.count = 0;
pm_release_pool(segment->pm_pool);
segment->pm_pool = NULL;
}
static inline uint32 vm_hash_table_get_bucket_size(uint32 bucket_num)
{
uint32 bucket_size;
if (bucket_num >= MAX_HASH_BUCKET_SIZE) {
return MAX_HASH_BUCKET_SIZE;
}
bucket_size = MIN_HASH_BUCKET_SIZE;
while (bucket_size < bucket_num) {
bucket_size <<= 1;
}
return bucket_size;
}
static inline uint32 vm_hash_table_calc_bucket(hash_table_t *hash_table, uint32 hash_val)
{
uint32 bucket_size = hash_val & hash_table->high_mask;
if (bucket_size > hash_table->max_bucket) {
bucket_size &= hash_table->low_mask;
}
return bucket_size;
}
static status_t vm_hash_table_set_page_entries(hash_table_t *hash_table, hash_segment_t *seg)
{
errno_t ret_memset;
char *page_buf = NULL;
page_entry_t page_entry;
uint32 loop;
uint32 bucket_pages;
uint32 entry_num = hash_table->nentries;
uint32 buckets_size = HASH_BUCKET_NODE_SIZE * hash_table->bucket_num;
bucket_pages = CM_ALIGN_CEIL(buckets_size, OG_VMEM_PAGE_SIZE);
for (loop = 0; loop < bucket_pages; ++loop) {
if (vm_hash_alloc_page(seg, &page_entry) != OG_SUCCESS) {
return OG_ERROR;
}
hash_table->page_entries[entry_num++] = page_entry;
if (VM_OPEN(HASH_SEGMENT, &page_entry, &page_buf) != OG_SUCCESS) {
return OG_ERROR;
}
ret_memset = memset_sp(page_buf, OG_VMEM_PAGE_SIZE, 0xFF, OG_VMEM_PAGE_SIZE);
if (ret_memset != EOK) {
VM_CLOSE(HASH_SEGMENT, &page_entry);
OG_THROW_ERROR(ERR_SYSTEM_CALL, ret_memset);
return OG_ERROR;
}
VM_CLOSE(HASH_SEGMENT, &page_entry);
}
hash_table->nentries = entry_num;
return OG_SUCCESS;
}
status_t vm_hash_table_alloc(hash_table_entry_t *table, hash_segment_t *seg, uint32 temp_bucket_num)
{
status_t ret;
id_list_t vm_list;
char *page_buffer = NULL;
page_entry_t page_entry;
hash_table_t *hash_table = NULL;
uint32 buckets_size;
uint32 hash_table_head_size;
uint32 bucket_num = temp_bucket_num;
bucket_num = vm_hash_table_get_bucket_size(bucket_num);
buckets_size = HASH_BUCKET_NODE_SIZE * MAX_HASH_BUCKET_SIZE;
hash_table_head_size = (CM_ALIGN_CEIL(buckets_size, OG_VMEM_PAGE_SIZE) + 1) * sizeof(uint32) + sizeof(hash_table_t);
hash_table_head_size = CM_ALIGN_ANY(hash_table_head_size, HASH_BUCKET_NODE_SIZE);
if (hash_table_head_size > OG_VMEM_PAGE_SIZE) {
OG_THROW_ERROR(ERR_HASH_TABLE_TOO_LARGE, bucket_num);
return OG_ERROR;
}
OG_RETURN_IFERR(vm_hash_alloc_page(seg, &page_entry));
HASH_LAST_PAGE(seg) = page_entry;
HASH_LAST_PAGE_USED(seg) = 0;
OG_RETURN_IFERR(VM_OPEN(HASH_SEGMENT, &page_entry, &page_buffer));
hash_table = (hash_table_t *)page_buffer;
MEMS_RETURN_IFERR(memset_sp(hash_table, sizeof(hash_table_t), 0, sizeof(hash_table_t)));
hash_table->rnums = 0;
hash_table->nentries = 0;
hash_table->seg = seg;
hash_table->ffact = HASH_FILL_FACTOR;
hash_table->hash = sql_hash_func;
hash_table->equal = sql_hash_equal_func;
hash_table->bucket_num = bucket_num;
hash_table->max_bucket = bucket_num - 1;
hash_table->high_mask = bucket_num - 1;
hash_table->low_mask = bucket_num - 1;
hash_table->self.page = page_entry;
hash_table->self.offset = HASH_LAST_PAGE_USED(seg);
HASH_LAST_PAGE_USED(seg) += hash_table_head_size;
*table = hash_table->self;
hash_table->has_null_key = OG_FALSE;
hash_table->is_empty = OG_TRUE;
vm_list = seg->vm_list;
seg->vm_list.count = 0;
ret = vm_hash_table_set_page_entries(hash_table, seg);
vm_append_list(HASH_POOL(seg), HASH_PAGE_LIST(seg), &vm_list);
VM_CLOSE(HASH_SEGMENT, HASH_TABLE_ENTRY_VMID);
return ret;
}
status_t vm_hash_table_init(hash_segment_t *seg, hash_table_entry_t *table, oper_func_t i_oper, oper_func_t q_oper,
void *oper_ctx)
{
char *page_buffer = NULL;
hash_table_t *hash_table = NULL;
OG_RETURN_IFERR(VM_OPEN(HASH_SEGMENT, &table->page, &page_buffer));
hash_table = (hash_table_t *)(page_buffer + table->offset);
hash_table->i_oper = i_oper;
hash_table->q_oper = q_oper;
hash_table->callback_ctx = oper_ctx;
VM_CLOSE(HASH_SEGMENT, &table->page);
return OG_SUCCESS;
}
status_t vm_hash_table_set_func(hash_segment_t *seg, hash_table_entry_t *table, hash_func_t hash, equal_func_t equal)
{
hash_table_t *hash_table = NULL;
OG_RETURN_IFERR(vm_hash_table_get_head(&hash_table, seg, table));
hash_table->hash = hash;
hash_table->equal = equal;
VM_CLOSE(HASH_SEGMENT, &table->page);
return OG_SUCCESS;
}
static status_t vm_hash_table_get_bucket(hash_table_t *hash_table, page_entry_t *self, hash_entry_t **next,
hash_segment_t *seg, uint32 idx)
{
char *page_buf = NULL;
uint32 bucket = idx;
*self = hash_table->page_entries[bucket / HASH_BUCKETS_PER_PAGE];
if (VM_OPEN(HASH_SEGMENT, self, &page_buf) != OG_SUCCESS) {
return OG_ERROR;
}
bucket = bucket % HASH_BUCKETS_PER_PAGE;
*next = (hash_entry_t *)(page_buf + bucket * HASH_BUCKET_NODE_SIZE);
return OG_SUCCESS;
}
static status_t vm_hash_table_get_bucket_entry(hash_table_t *hash_table, page_entry_t *self, hash_entry_t **next,
hash_segment_t *seg, uint32 idx)
{
if (vm_hash_table_get_bucket(hash_table, self, next, seg, idx) != OG_SUCCESS) {
VM_CLOSE(HASH_SEGMENT, HASH_TABLE_ENTRY_VMID);
return OG_ERROR;
}
return OG_SUCCESS;
}
static status_t vm_hash_table_move_bucket_node(hash_table_t *hash_table, hash_segment_t *seg, uint32 old_bucket,
uint32 new_bucket)
{
uint32 bucket_num;
hash_node_t *hash_node = NULL;
hash_node_t *l_node = NULL;
hash_node_t *h_node = NULL;
hash_entry_t *l_entry = NULL;
hash_entry_t *h_entry = NULL;
hash_entry_t *prev_next = NULL;
page_entry_t l_vmid;
page_entry_t h_vmid;
page_entry_t tmp_vmid;
page_entry_t l_node_vmid = g_invalid_page_entry;
page_entry_t h_node_vmid = g_invalid_page_entry;
status_t status = OG_SUCCESS;
if (vm_hash_table_get_bucket_entry(hash_table, &l_vmid, &l_entry, seg, old_bucket) != OG_SUCCESS) {
return OG_ERROR;
}
if (vm_hash_table_get_bucket_entry(hash_table, &h_vmid, &h_entry, seg, new_bucket) != OG_SUCCESS) {
VM_CLOSE(HASH_SEGMENT, &l_vmid);
return OG_ERROR;
}
prev_next = l_entry;
while (prev_next->vmid != OG_INVALID_ID32) {
if (vm_hash_table_get_node(&hash_node, prev_next, seg) != OG_SUCCESS) {
status = OG_ERROR;
break;
}
tmp_vmid = prev_next->page;
* the node should be moved to the right bucket.
*/
bucket_num = vm_hash_table_calc_bucket(hash_table, hash_node->hash_value);
if (bucket_num != old_bucket) {
if (h_node != NULL) {
h_node->next = *prev_next;
VM_CLOSE(HASH_SEGMENT, &h_node_vmid);
} else {
*h_entry = *prev_next;
}
h_node = hash_node;
h_node_vmid = tmp_vmid;
} else {
if (l_node != NULL) {
l_node->next = *prev_next;
VM_CLOSE(HASH_SEGMENT, &l_node_vmid);
} else {
*l_entry = *prev_next;
}
l_node = hash_node;
l_node_vmid = tmp_vmid;
}
prev_next = &hash_node->next;
}
if (l_node == NULL) {
l_entry->page = g_invalid_page_entry;
}
if (h_node == NULL) {
h_entry->page = g_invalid_page_entry;
}
if (!IS_INVALID_PAGE_ENTRY(&l_node_vmid)) {
l_node->next.page = g_invalid_page_entry;
VM_CLOSE(HASH_SEGMENT, &l_node_vmid);
}
if (!IS_INVALID_PAGE_ENTRY(&h_node_vmid)) {
h_node->next.page = g_invalid_page_entry;
VM_CLOSE(HASH_SEGMENT, &h_node_vmid);
}
VM_CLOSE_TWO_PAGES(&l_vmid, &h_vmid);
return status;
}
static status_t vm_hash_table_extend_entries(hash_table_t *hash_table, hash_segment_t *seg)
{
id_list_t vm_list;
char *page_buf = NULL;
errno_t rc_memset;
page_entry_t page_entry;
vm_list = seg->vm_list;
seg->vm_list.count = 0;
if (vm_hash_alloc_page(seg, &page_entry) != OG_SUCCESS) {
vm_append_list(HASH_POOL(seg), &seg->vm_list, &vm_list);
return OG_ERROR;
}
hash_table->page_entries[hash_table->nentries++] = page_entry;
if (VM_OPEN(HASH_SEGMENT, &page_entry, &page_buf) != OG_SUCCESS) {
vm_append_list(HASH_POOL(seg), &seg->vm_list, &vm_list);
return OG_ERROR;
}
rc_memset = memset_sp(page_buf, OG_VMEM_PAGE_SIZE, 0xFF, OG_VMEM_PAGE_SIZE);
VM_CLOSE(HASH_SEGMENT, &page_entry);
vm_append_list(HASH_POOL(seg), &seg->vm_list, &vm_list);
if (rc_memset != EOK) {
OG_THROW_ERROR(ERR_SYSTEM_CALL, rc_memset);
return OG_ERROR;
}
return OG_SUCCESS;
}
static status_t vm_hash_table_extend(hash_table_t *hash_table, hash_segment_t *seg)
{
uint32 old_bucket;
uint32 new_bucket;
uint32 page_entry;
old_bucket = vm_hash_table_calc_bucket(hash_table, hash_table->max_bucket + 1);
page_entry = (hash_table->max_bucket + 1) / HASH_BUCKETS_PER_PAGE;
if (page_entry >= hash_table->nentries) {
OG_RETURN_IFERR(vm_hash_table_extend_entries(hash_table, seg));
}
++hash_table->max_bucket;
if (hash_table->max_bucket > hash_table->high_mask) {
hash_table->bucket_num <<= 1;
hash_table->low_mask = hash_table->high_mask;
hash_table->high_mask = hash_table->max_bucket | hash_table->low_mask;
}
new_bucket = vm_hash_table_calc_bucket(hash_table, hash_table->max_bucket);
if (vm_hash_table_move_bucket_node(hash_table, seg, old_bucket, new_bucket) != OG_SUCCESS) {
return OG_ERROR;
}
return OG_SUCCESS;
}
static inline bool32 vm_hash_table_need_extend(hash_table_t *hash_table)
{
if (hash_table->rnums < (uint32)(int32)(hash_table->ffact * (hash_table->max_bucket + 1))) {
return OG_FALSE;
}
if (hash_table->bucket_num == MAX_HASH_BUCKET_SIZE && hash_table->max_bucket == hash_table->high_mask) {
return OG_FALSE;
}
return OG_TRUE;
}
static status_t vm_hash_segment_insert(hash_node_t **node_out, hash_entry_t *prev_next, hash_segment_t *seg,
const char *buf, uint32 size, uint32 hash_value, bool32 is_new_key)
{
char *page_buffer = NULL;
page_entry_t page_entry;
hash_node_t *hash_node = NULL;
uint32 node_size;
uint32 remain_size;
int32 ret_memcpy;
remain_size = OG_VMEM_PAGE_SIZE - HASH_LAST_PAGE_USED(seg);
node_size = sizeof(hash_node_t) + size;
if (node_size > remain_size) {
OG_RETURN_IFERR(vm_hash_alloc_page(seg, &page_entry));
HASH_LAST_PAGE_USED(seg) = 0;
HASH_LAST_PAGE(seg) = page_entry;
}
OG_RETURN_IFERR(VM_OPEN(HASH_SEGMENT, &HASH_LAST_PAGE(seg), &page_buffer));
hash_node = (hash_node_t *)(page_buffer + HASH_LAST_PAGE_USED(seg));
hash_node->next.page = prev_next->page;
hash_node->next.offset = prev_next->offset;
prev_next->page = HASH_LAST_PAGE(seg);
prev_next->offset = HASH_LAST_PAGE_USED(seg);
hash_node->size = size;
hash_node->is_new_key = is_new_key;
hash_node->is_deleted = OG_FALSE;
hash_node->unused = 0;
hash_node->hash_value = hash_value;
ret_memcpy = memcpy_sp(hash_node->data, size, buf, size);
if (ret_memcpy != EOK) {
VM_CLOSE(HASH_SEGMENT, &HASH_LAST_PAGE(seg));
OG_THROW_ERROR(ERR_SYSTEM_CALL, ret_memcpy);
return OG_ERROR;
}
HASH_LAST_PAGE_USED(seg) += node_size;
if (node_out == NULL) {
VM_CLOSE(HASH_SEGMENT, &HASH_LAST_PAGE(seg));
return OG_SUCCESS;
}
*node_out = hash_node;
return OG_SUCCESS;
}
status_t vm_hash_table_insert(bool32 *found, hash_segment_t *seg, hash_table_entry_t *table, const char *buf,
uint32 size)
{
hash_table_t *hash_table = NULL;
hash_entry_t *prev_next = NULL;
hash_node_t *hash_node = NULL;
page_entry_t prev_vmid;
page_entry_t tmp_vmid;
uint32 bucket;
uint32 hash_value;
status_t ret;
OG_RETURN_IFERR(vm_hash_table_get_head(&hash_table, seg, table));
hash_table->is_empty = OG_FALSE;
if (buf == NULL) {
hash_table->has_null_key = OG_TRUE;
VM_CLOSE(HASH_SEGMENT, &table->page);
return OG_SUCCESS;
}
if (vm_hash_table_need_extend(hash_table)) {
if (vm_hash_table_extend(hash_table, seg) != OG_SUCCESS) {
VM_CLOSE(HASH_SEGMENT, &table->page);
return OG_ERROR;
}
}
hash_value = hash_table->hash(buf);
bucket = vm_hash_table_calc_bucket(hash_table, hash_value);
OG_RETURN_IFERR(vm_hash_table_get_bucket_entry(hash_table, &prev_vmid, &prev_next, seg, bucket));
*found = OG_FALSE;
while (!IS_INVALID_PAGE_ENTRY(&prev_next->page)) {
if (vm_hash_table_get_node(&hash_node, prev_next, seg) != OG_SUCCESS) {
VM_CLOSE_TWO_PAGES(&prev_vmid, &table->page);
return OG_ERROR;
}
if (hash_value == hash_node->hash_value &&
hash_table->equal(found, HASH_CALLBACK_CTX(hash_table), hash_node->data, hash_node->size, buf, size) !=
OG_SUCCESS) {
VM_CLOSE_THREE_PAGES(&prev_next->page, &prev_vmid, &table->page);
return OG_ERROR;
}
if (*found) {
++hash_table->rnums;
ret = vm_hash_segment_insert(NULL, &hash_node->next, seg, buf, size, hash_value, OG_FALSE);
VM_CLOSE_THREE_PAGES(&prev_next->page, &prev_vmid, &table->page);
return ret;
}
tmp_vmid = prev_next->page;
VM_CLOSE(HASH_SEGMENT, &prev_vmid);
prev_vmid = tmp_vmid;
prev_next = &hash_node->next;
}
*found = OG_FALSE;
++hash_table->rnums;
ret = vm_hash_segment_insert(NULL, prev_next, seg, buf, size, hash_value, OG_TRUE);
VM_CLOSE_TWO_PAGES(&prev_vmid, &table->page);
return ret;
}
status_t vm_hash_table_insert2(bool32 *found, hash_segment_t *seg, hash_table_entry_t *table, const char *buf,
uint32 size)
{
hash_table_t *hash_table = NULL;
hash_entry_t *prev_next = NULL;
hash_node_t *hash_node = NULL;
page_entry_t prev_vmid;
page_entry_t tmp_vmid;
uint32 bucket;
uint32 hash_value;
status_t ret;
OG_RETURN_IFERR(vm_hash_table_get_head(&hash_table, seg, table));
hash_table->is_empty = OG_FALSE;
if (buf == NULL) {
hash_table->has_null_key = OG_TRUE;
VM_CLOSE(HASH_SEGMENT, &table->page);
return OG_SUCCESS;
}
if (vm_hash_table_need_extend(hash_table)) {
if (vm_hash_table_extend(hash_table, seg) != OG_SUCCESS) {
VM_CLOSE(HASH_SEGMENT, &table->page);
return OG_ERROR;
}
}
hash_value = hash_table->hash(buf);
bucket = vm_hash_table_calc_bucket(hash_table, hash_value);
OG_RETURN_IFERR(vm_hash_table_get_bucket_entry(hash_table, &prev_vmid, &prev_next, seg, bucket));
*found = OG_FALSE;
while (!IS_INVALID_PAGE_ENTRY(&prev_next->page)) {
if (vm_hash_table_get_node(&hash_node, prev_next, seg) != OG_SUCCESS) {
VM_CLOSE_TWO_PAGES(&prev_vmid, &table->page);
return OG_ERROR;
}
if (hash_value == hash_node->hash_value &&
hash_table->equal(found, HASH_CALLBACK_CTX(hash_table), hash_node->data, hash_node->size, buf, size) !=
OG_SUCCESS) {
VM_CLOSE_THREE_PAGES(&prev_next->page, &prev_vmid, &table->page);
return OG_ERROR;
}
if (*found) {
ret = hash_table->i_oper == NULL ?
OG_SUCCESS :
hash_table->i_oper(HASH_CALLBACK_CTX(hash_table), buf, size, hash_node->data, hash_node->size, OG_TRUE);
VM_CLOSE_THREE_PAGES(&prev_next->page, &prev_vmid, &table->page);
return ret;
}
tmp_vmid = prev_next->page;
VM_CLOSE(HASH_SEGMENT, &prev_vmid);
prev_vmid = tmp_vmid;
prev_next = &hash_node->next;
}
*found = OG_FALSE;
++hash_table->rnums;
if (vm_hash_segment_insert(&hash_node, prev_next, seg, buf, size, hash_value, OG_TRUE) != OG_SUCCESS) {
VM_CLOSE_TWO_PAGES(&prev_vmid, &table->page);
return OG_ERROR;
}
ret = hash_table->i_oper == NULL ? OG_SUCCESS :
hash_table->i_oper(HASH_CALLBACK_CTX(hash_table), buf, size, hash_node->data, hash_node->size, OG_FALSE);
VM_CLOSE_THREE_PAGES(&prev_next->page, &prev_vmid, &table->page);
return ret;
}
#define IS_VALID_HASH_NODE(iter, node) \
(((iter)->flags == 0) || ((iter)->flags == ITER_IGNORE_DEL && !(node)->is_deleted) || \
((iter)->flags == ITER_FETCH_DEL && (node)->is_deleted))
static status_t vm_hash_match_first_value(bool32 *found, hash_segment_t *seg, hash_table_t *hash_table,
const char *key_buf, uint32 key_sz, hash_table_iter_t *iter)
{
hash_node_t *hash_node = NULL;
hash_entry_t *prev_next = NULL;
page_entry_t prev_vmid;
page_entry_t tmp_vmid;
uint32 bucket;
uint32 hash_value;
hash_value = hash_table->hash(key_buf);
bucket = vm_hash_table_calc_bucket(hash_table, hash_value);
if (vm_hash_table_get_bucket_entry(hash_table, &prev_vmid, &prev_next, seg, bucket) != OG_SUCCESS) {
return OG_ERROR;
}
while (!IS_INVALID_PAGE_ENTRY(&prev_next->page)) {
if (vm_hash_table_get_node(&hash_node, prev_next, seg) != OG_SUCCESS) {
VM_CLOSE(HASH_SEGMENT, &prev_vmid);
return OG_ERROR;
}
if (IS_VALID_HASH_NODE(iter, hash_node)) {
if (hash_table->equal(found, HASH_CALLBACK_CTX(hash_table),
hash_node->data, hash_node->size, key_buf, key_sz) != OG_SUCCESS) {
VM_CLOSE(HASH_SEGMENT, &prev_next->page);
VM_CLOSE(HASH_SEGMENT, &prev_vmid);
return OG_ERROR;
}
if (*found) {
iter->curr_match = *prev_next;
VM_CLOSE(HASH_SEGMENT, &prev_next->page);
VM_CLOSE(HASH_SEGMENT, &prev_vmid);
return OG_SUCCESS;
}
}
tmp_vmid = prev_next->page;
VM_CLOSE(HASH_SEGMENT, &prev_vmid);
prev_vmid = tmp_vmid;
prev_next = &hash_node->next;
}
VM_CLOSE(HASH_SEGMENT, &prev_vmid);
*found = OG_FALSE;
iter->curr_match.page = g_invalid_page_entry;
return OG_SUCCESS;
}
status_t vm_hash_table_probe(bool32 *eof, hash_segment_t *seg, hash_table_entry_t *table,
hash_scan_assist_t *scan_assit)
{
hash_table_t *hash_table = NULL;
bool32 found = OG_FALSE;
hash_table_iter_t iter;
sql_init_hash_iter(&iter, NULL);
OG_RETURN_IFERR(vm_hash_table_get_head(&hash_table, seg, table));
if (scan_assit->scan_mode == HASH_FULL_SCAN) {
*eof = hash_table->is_empty;
VM_CLOSE(HASH_SEGMENT, &table->page);
return OG_SUCCESS;
}
if (vm_hash_match_first_value(&found, seg, hash_table, HASH_KEY_SCAN_BUF(scan_assit),
HASH_KEY_SCAN_SIZE(scan_assit), &iter) != OG_SUCCESS) {
VM_CLOSE(HASH_SEGMENT, &table->page);
return OG_ERROR;
}
VM_CLOSE(HASH_SEGMENT, &table->page);
*eof = !found;
return OG_SUCCESS;
}
status_t vm_hash_table_open(hash_segment_t *seg, hash_table_entry_t *table, hash_scan_assist_t *scan_assit,
bool32 *found, hash_table_iter_t *iter)
{
hash_table_t *hash_table = NULL;
hash_entry_t *prev_next = NULL;
page_entry_t prev_vmid;
OG_RETURN_IFERR(vm_hash_get_hash_table(&hash_table, seg, table, iter));
iter->scan_mode = scan_assit->scan_mode;
if (scan_assit->scan_mode == HASH_FULL_SCAN) {
if (vm_hash_table_get_bucket_entry(hash_table, &prev_vmid, &prev_next, seg, 0) != OG_SUCCESS) {
VM_CLOSE(HASH_SEGMENT, &table->page);
iter->hash_table = NULL;
return OG_ERROR;
}
iter->curr_match = *prev_next;
iter->curr_bucket = 0;
VM_CLOSE(HASH_SEGMENT, &prev_vmid);
return OG_SUCCESS;
}
if (vm_hash_match_first_value(found, seg, hash_table, HASH_KEY_SCAN_BUF(scan_assit),
HASH_KEY_SCAN_SIZE(scan_assit), iter) != OG_SUCCESS) {
VM_CLOSE(HASH_SEGMENT, &table->page);
iter->hash_table = NULL;
return OG_ERROR;
}
return OG_SUCCESS;
}
static status_t vm_hash_table_full_fetch(bool32 *eof, hash_segment_t *seg, hash_table_t *hash_table,
hash_table_iter_t *table_iter)
{
hash_node_t *hash_node = NULL;
hash_entry_t *prev_next = NULL;
page_entry_t prev_vmid;
page_entry_t tmp_vmid;
status_t ret;
void *callback_ctx = (table_iter->callback_ctx == NULL) ? HASH_CALLBACK_CTX(hash_table) : table_iter->callback_ctx;
for (;;) {
while (IS_INVALID_PAGE_ENTRY(&table_iter->curr_match.page)) {
++table_iter->curr_bucket;
if (table_iter->curr_bucket > hash_table->max_bucket) {
*eof = OG_TRUE;
return OG_SUCCESS;
}
OG_RETURN_IFERR(vm_hash_table_get_bucket(hash_table, &prev_vmid, &prev_next, seg, table_iter->curr_bucket));
table_iter->curr_match = *prev_next;
VM_CLOSE(HASH_SEGMENT, &prev_vmid);
}
OG_RETURN_IFERR(vm_hash_table_get_node(&hash_node, &table_iter->curr_match, seg));
OG_BREAK_IF_TRUE(IS_VALID_HASH_NODE(table_iter, hash_node));
tmp_vmid = table_iter->curr_match.page;
table_iter->curr_match = hash_node->next;
VM_CLOSE(HASH_SEGMENT, &tmp_vmid);
}
ret = hash_table->q_oper == NULL ?
OG_SUCCESS :
hash_table->q_oper(callback_ctx, NULL, 0, hash_node->data, hash_node->size, OG_TRUE);
if (ret != OG_SUCCESS) {
VM_CLOSE(HASH_SEGMENT, &table_iter->curr_match.page);
return OG_ERROR;
}
*eof = OG_FALSE;
tmp_vmid = table_iter->curr_match.page;
table_iter->curr_match = hash_node->next;
VM_CLOSE(HASH_SEGMENT, &tmp_vmid);
return OG_SUCCESS;
}
static status_t vm_hash_table_key_prefetch(hash_node_t *hash_node, hash_segment_t *seg, hash_table_iter_t *table_iter)
{
page_entry_t tmp_vmid;
hash_entry_t next_entry = hash_node->next;
hash_node_t *next_node = NULL;
while (!IS_INVALID_PAGE_ENTRY(&next_entry.page)) {
if (vm_hash_table_get_node(&next_node, &next_entry, seg) != OG_SUCCESS) {
VM_CLOSE(HASH_SEGMENT, &table_iter->curr_match.page);
table_iter->curr_match.page = g_invalid_page_entry;
return OG_ERROR;
}
if (next_node->is_new_key) {
VM_CLOSE(HASH_SEGMENT, &next_entry.page);
VM_CLOSE(HASH_SEGMENT, &table_iter->curr_match.page);
table_iter->curr_match.page = g_invalid_page_entry;
return OG_SUCCESS;
}
if (IS_VALID_HASH_NODE(table_iter, next_node)) {
VM_CLOSE(HASH_SEGMENT, &next_entry.page);
break;
}
tmp_vmid = next_entry.page;
next_entry = next_node->next;
VM_CLOSE(HASH_SEGMENT, &tmp_vmid);
}
tmp_vmid = table_iter->curr_match.page;
table_iter->curr_match = next_entry;
VM_CLOSE(HASH_SEGMENT, &tmp_vmid);
return OG_SUCCESS;
}
static status_t vm_hash_table_key_fetch(bool32 *eof, hash_segment_t *seg, hash_table_t *hash_table,
hash_table_iter_t *table_iter)
{
hash_node_t *hash_node = NULL;
void *callbak_ctx = (table_iter->callback_ctx == NULL) ? HASH_CALLBACK_CTX(hash_table) : table_iter->callback_ctx;
if (table_iter->curr_match.vmid == OG_INVALID_ID32) {
*eof = OG_TRUE;
return OG_SUCCESS;
}
*eof = OG_FALSE;
if (vm_hash_table_get_node(&hash_node, &table_iter->curr_match, seg) != OG_SUCCESS) {
return OG_ERROR;
}
callback operation function is used to deal with the record matched
e.g., in this callback, hash-join can copy rowid(s) of materialized record(s) out
*/
if (hash_table->q_oper != NULL &&
hash_table->q_oper(callbak_ctx, NULL, 0, hash_node->data, hash_node->size, OG_TRUE) != OG_SUCCESS) {
VM_CLOSE(HASH_SEGMENT, &table_iter->curr_match.page);
return OG_ERROR;
}
return vm_hash_table_key_prefetch(hash_node, seg, table_iter);
}
status_t vm_hash_table_fetch(bool32 *eof, hash_segment_t *seg, hash_table_entry_t *table, hash_table_iter_t *table_iter)
{
hash_table_t *hash_table = NULL;
OG_RETURN_IFERR(vm_hash_get_hash_table(&hash_table, seg, table, table_iter));
if (table_iter->scan_mode == HASH_FULL_SCAN) {
return vm_hash_table_full_fetch(eof, seg, hash_table, table_iter);
} else {
return vm_hash_table_key_fetch(eof, seg, hash_table, table_iter);
}
}
status_t vm_hash_table_has_null_key(bool32 *has_null_key, hash_segment_t *seg, hash_table_entry_t *table)
{
hash_table_t *hash_table = NULL;
OG_RETURN_IFERR(vm_hash_table_get_head(&hash_table, seg, table));
*has_null_key = hash_table->has_null_key;
VM_CLOSE(HASH_SEGMENT, &table->page);
return OG_SUCCESS;
}
status_t vm_hash_table_empty(bool32 *empty, hash_segment_t *seg, hash_table_entry_t *table)
{
hash_table_t *hash_table = NULL;
OG_RETURN_IFERR(vm_hash_table_get_head(&hash_table, seg, table));
*empty = hash_table->is_empty;
VM_CLOSE(HASH_SEGMENT, &table->page);
return OG_SUCCESS;
}
status_t vm_hash_table_get_rows(uint32 *rnums, hash_segment_t *seg, hash_table_entry_t *table)
{
hash_table_t *hash_table = NULL;
OG_RETURN_IFERR(vm_hash_table_get_head(&hash_table, seg, table));
*rnums = hash_table->rnums;
VM_CLOSE(HASH_SEGMENT, &table->page);
return OG_SUCCESS;
}
status_t vm_hash_table_delete(hash_segment_t *seg, hash_table_entry_t *table, hash_table_iter_t *table_iter)
{
hash_node_t *hash_node = NULL;
if (IS_INVALID_PAGE_ENTRY(&table_iter->curr_match.page)) {
return OG_SUCCESS;
}
OG_RETURN_IFERR(vm_hash_table_get_node(&hash_node, &table_iter->curr_match, seg));
hash_node->is_deleted = OG_TRUE;
VM_CLOSE(HASH_SEGMENT, &table_iter->curr_match.page);
return OG_SUCCESS;
}