* Copyright (c) 2022 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* executor_lease_expire.c
* executor lease expire
*
* IDENTIFICATION
* src/executor/executor_lease_expire.c
*
* -------------------------------------------------------------------------
*/
#include "cm_hash_pool.h"
#include "cm_hash.h"
#include "cm_text.h"
#include "util_defs.h"
#include "dcf_interface.h"
#include "executor_lease_expire.h"
#ifdef __cplusplus
extern "C" {
#endif
#define EXC_PQUE_MIN_ELE_IDX 1
#define EXC_PQUE_2_FIXED 2
static inline uint32 exc_pque_child(uint32 i)
{
return i * EXC_PQUE_2_FIXED;
}
static inline uint32 exc_pque_parent(uint32 i)
{
return i / EXC_PQUE_2_FIXED;
}
status_t exc_pque_init(lease_expire_pque_t *pque, uint32 maxnum)
{
CM_CHECK_NULL_PTR(pque);
uint32 size = (maxnum + 1) * (uint32)sizeof(lease_expire_ele_t *);
pque->eles = (lease_expire_ele_t **)exc_alloc(size);
MEMS_RETURN_IFERR(memset_s(pque->eles, size, 0, size));
pque->size = 0;
pque->capacity = maxnum;
return CM_SUCCESS;
}
void exc_pque_deinit(lease_expire_pque_t *pque)
{
if (pque == NULL) {
return;
}
for (uint32 i = EXC_PQUE_MIN_ELE_IDX; i <= pque->size; i++) {
exc_free(pque->eles[i]);
}
exc_free(pque->eles);
return;
}
static inline bool32 exc_pque_is_full(const lease_expire_pque_t *pque)
{
if (pque == NULL) {
return CM_FALSE;
}
return (pque->capacity == pque->size);
}
static inline bool32 exc_pque_is_empty(const lease_expire_pque_t *pque)
{
if (pque == NULL) {
return CM_FALSE;
}
return (pque->size == 0);
}
static inline void exc_pque_exch(lease_expire_pque_t *pque, uint32 i, uint32 j)
{
lease_expire_ele_t *tmp = pque->eles[j];
pque->eles[j] = pque->eles[i];
pque->eles[i] = tmp;
pque->eles[j]->idx = j;
pque->eles[i]->idx = i;
}
static bool32 exc_pque_sink(lease_expire_pque_t *pque, uint32 idx)
{
uint32 tmpidx = idx;
uint32 size = pque->size;
do {
uint32 lchild = 2 * tmpidx;
if (lchild == 0 || lchild > size) {
break;
}
uint32 minchild = lchild;
uint32 rchild = lchild + 1;
if (rchild <= size && pque->eles[rchild]->expire_time < pque->eles[lchild]->expire_time) {
minchild = rchild;
}
if (pque->eles[minchild]->expire_time >= pque->eles[tmpidx]->expire_time) {
break;
}
exc_pque_exch(pque, tmpidx, minchild);
tmpidx = minchild;
} while (CM_TRUE);
return (tmpidx != idx);
}
static bool32 exc_pque_swim(lease_expire_pque_t *pque, uint32 idx)
{
uint32 tmpidx = idx;
do {
uint32 parent = tmpidx / 2 ;
if (parent == 0 || pque->eles[parent]->expire_time <= pque->eles[tmpidx]->expire_time) {
break;
}
exc_pque_exch(pque, parent, tmpidx);
tmpidx = parent;
} while (CM_TRUE);
return (tmpidx != idx);
}
status_t exc_pque_adjust(lease_expire_pque_t *pque, uint32 idx)
{
CM_CHECK_NULL_PTR(pque);
if (!exc_pque_sink(pque, idx)) {
(void)exc_pque_swim(pque, idx);
}
LOG_DEBUG_INF("[EXC LEASE] pque adjust success for idx:%u", idx);
return CM_SUCCESS;
}
status_t exc_pque_insert(lease_expire_pque_t *pque, lease_expire_ele_t *ele)
{
CM_CHECK_NULL_PTR(pque);
CM_CHECK_NULL_PTR(ele);
if (exc_pque_is_full(pque)) {
LOG_DEBUG_ERR("[EXC LEASE] pque is full");
return CM_ERROR;
}
uint32 i = pque->size + 1;
pque->eles[i] = ele;
pque->eles[i]->idx = i;
for (; i > EXC_PQUE_MIN_ELE_IDX && pque->eles[exc_pque_parent(i)]->expire_time > ele->expire_time;) {
exc_pque_exch(pque, i, exc_pque_parent(i));
i = exc_pque_parent(i);
}
pque->size++;
LOG_DEBUG_INF("[EXC LEASE] pque insert success for ele, lease:%s expire_time:%llu ", ele->name, ele->expire_time);
return CM_SUCCESS;
}
status_t exc_pque_delete(lease_expire_pque_t *pque, uint32 idx)
{
CM_CHECK_NULL_PTR(pque);
uint32 last_idx = pque->size;
if (idx < EXC_PQUE_MIN_ELE_IDX || idx > last_idx) {
LOG_DEBUG_ERR("[EXC LEASE] pque delete with invalid idx:%u last_idx:%u", idx, last_idx);
return CM_ERROR;
}
lease_expire_ele_t *ele = pque->eles[idx];
LOG_DEBUG_INF("[EXC LEASE] pque delete with lease name:%s expire_time:%llu", ele->name, ele->expire_time);
if (idx == last_idx) {
ele->idx = 0;
pque->size--;
return CM_SUCCESS;
}
exc_pque_exch(pque, idx, last_idx);
pque->size--;
ele->idx = 0;
return exc_pque_adjust(pque, idx);
}
void exc_pque_get_min(lease_expire_pque_t *pque, lease_expire_ele_t **min)
{
if (exc_pque_is_empty(pque)) {
LOG_DEBUG_INF("[EXC LEASE] pque is empty when get min");
return;
}
uint32 idx = 1;
*min = pque->eles[idx];
return;
}
status_t exc_pque_delete_min(lease_expire_pque_t *pque, lease_expire_ele_t **min)
{
CM_CHECK_NULL_PTR(pque);
uint32 i;
uint32 minChild;
if (exc_pque_is_empty(pque)) {
LOG_DEBUG_ERR("[EXC LEASE] priority queue is empty\n");
return CM_ERROR;
}
if (pque->eles == NULL || EXC_PQUE_MIN_ELE_IDX > pque->size) {
LOG_DEBUG_ERR("[EXC LEASE] pque->eles is NULL or pque->size is invalid\n");
return CM_ERROR;
}
*min = pque->eles[EXC_PQUE_MIN_ELE_IDX];
lease_expire_ele_t *last = pque->eles[pque->size];
pque->size--;
if (pque->size == 0) {
LOG_DEBUG_INF("[EXC LEASE] pque deleted last ele, name:%s expire_time:%llu", (*min)->name, (*min)->expire_time);
(*min)->idx = 0;
pque->eles[EXC_PQUE_MIN_ELE_IDX] = NULL;
return CM_SUCCESS;
}
for (i = EXC_PQUE_MIN_ELE_IDX; exc_pque_child(i) <= pque->size; i = minChild) {
minChild = exc_pque_child(i);
if (minChild != pque->size && pque->eles[minChild + 1]->expire_time < pque->eles[minChild]->expire_time) {
minChild += 1;
}
if (pque->eles[minChild]->expire_time >= last->expire_time) {
break;
}
pque->eles[i] = pque->eles[minChild];
pque->eles[i]->idx = i;
}
pque->eles[i] = last;
pque->eles[i]->idx = i;
if (*min != NULL) {
(*min)->idx = 0;
LOG_DEBUG_INF("[EXC LEASE] pque deleted min ele, name:%s expire_time:%llu", (*min)->name, (*min)->expire_time);
}
return CM_SUCCESS;
}
void exc_proc_lease_expire(lease_expire_pque_t *pque, spinlock_t *lock)
{
lease_expire_ele_t *min = NULL;
timespec_t now = cm_clock_now_ms();
cm_spin_lock(lock, NULL);
exc_pque_get_min(pque, &min);
cm_spin_unlock(lock);
while (min != NULL && min->expire_time <= now) {
LOG_DEBUG_INF("[EXC LEASE] get min pque ele expire, name:%s expire_time:%llu", min->name, min->expire_time);
uint32 offset = 0;
uint32 size = (uint32)(2 * sizeof(uint32) + strlen(min->name));
size = CM_ALIGN4(size);
char *buff = (char *)exc_alloc(size);
text_t leaseid = {
.str = min->name, .len = strlen(min->name) };
uint32 cmd = DCC_CMD_LEASE_EXPIRE;
exc_put_uint32(buff, cmd, &offset);
if (exc_put_text(buff, size, &leaseid, &offset) != CM_SUCCESS) {
exc_free(buff);
return;
}
uint64 index;
if (dcf_universal_write(EXC_STREAM_ID_DEFAULT, buff, size, 0, &index) != CM_SUCCESS) {
exc_free(buff);
return;
}
LOG_DEBUG_INF("[EXC LEASE] ele expire proposed, ele's name:%s", min->name);
exc_free(buff);
cm_spin_lock(lock, NULL);
status_t ret = exc_pque_delete_min(pque, &min);
cm_spin_unlock(lock);
if (ret != CM_SUCCESS) {
return;
}
min = NULL;
cm_spin_lock(lock, NULL);
exc_pque_get_min(pque, &min);
cm_spin_unlock(lock);
}
}
#ifdef __cplusplus
}
#endif