* Copyright (c) 2022 Huawei Technologies Co.,Ltd.
*
* DSS is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* dss_session.c
*
*
* IDENTIFICATION
* src/common/dss_session.c
*
* -------------------------------------------------------------------------
*/
#include "dss_session.h"
#include "cm_utils.h"
#include "dss_diskgroup.h"
#include "dss_malloc.h"
#include "dss_file.h"
#include "dss_redo.h"
#include "cm_system.h"
#include "dss_thv.h"
#include "dss_hp_interface.h"
#ifdef __cplusplus
extern "C" {
#endif
/* Process-wide session control block; zero-initialized here and filled in by dss_init_session_pool(). */
dss_session_ctrl_t g_dss_session_ctrl = {0};
/*
 * Grow the set of allocated sessions by extend_num entries.
 *
 * Every new slot in [alloc_sessions, alloc_sessions + extend_num) receives an object from
 * GA_SESSION_POOL and is initialised as unused and closed. alloc_sessions is incremented per
 * slot, so slots that were set up before a mid-way failure stay accounted for.
 *
 * Returns CM_SUCCESS, or CM_ERROR (with ERR_DSS_SESSION_EXTEND thrown) when the request would
 * exceed g_dss_session_ctrl.total or the pool cannot supply an object.
 */
status_t dss_extend_session(uint32 extend_num)
{
    uint32 first_new = g_dss_session_ctrl.alloc_sessions;
    /*
     * Compute the target in 64 bits: a 32-bit sum could wrap around for a huge extend_num and
     * slip past the limit check below.
     */
    uint64 expect_sessions = (uint64)first_new + (uint64)extend_num;
    if (expect_sessions > (uint64)g_dss_session_ctrl.total) {
        LOG_RUN_ERR("Failed to extend session, expect new alloc sessions %llu, but max is %u.", expect_sessions,
            g_dss_session_ctrl.total);
        DSS_THROW_ERROR(ERR_DSS_SESSION_EXTEND, "expect new alloc sessions %llu, but max is %u.", expect_sessions,
            g_dss_session_ctrl.total);
        return CM_ERROR;
    }
    /* The check above guarantees the target fits in 32 bits. */
    uint32 new_alloc_sessions = (uint32)expect_sessions;
    for (uint32 i = first_new; i < new_alloc_sessions; i++) {
        uint32 objectid = ga_alloc_object(GA_SESSION_POOL, DSS_INVALID_ID32);
        if (objectid == DSS_INVALID_ID32) {
            LOG_RUN_ERR("Failed to alloc object for session %u.", i);
            DSS_THROW_ERROR(ERR_DSS_SESSION_EXTEND, "Failed to alloc object for session %u.", i);
            return CM_ERROR;
        }
        LOG_DEBUG_INF("Alloc object %u for session %u.", objectid, i);
        g_dss_session_ctrl.sessions[i] = (dss_session_t *)ga_object_addr(GA_SESSION_POOL, objectid);
        g_dss_session_ctrl.sessions[i]->id = i;
        g_dss_session_ctrl.sessions[i]->is_used = CM_FALSE;
        g_dss_session_ctrl.sessions[i]->is_closed = CM_TRUE;
        g_dss_session_ctrl.sessions[i]->put_log = CM_FALSE;
        g_dss_session_ctrl.sessions[i]->objectid = objectid;
        g_dss_session_ctrl.sessions[i]->is_holding_hotpatch_latch = CM_FALSE;
        g_dss_session_ctrl.alloc_sessions++;
    }
    LOG_RUN_INF("Succeed to extend sessions to %u.", g_dss_session_ctrl.alloc_sessions);
    return CM_SUCCESS;
}
/*
 * Allocate and zero the session pointer table for max_session_num sessions, then extend the
 * pool by the first group (at most DSS_SESSION_NUM_PER_GROUP sessions).
 *
 * Returns ERR_DSS_GA_INIT if the table cannot be allocated, the status of dss_extend_session()
 * if extension fails, and CM_SUCCESS otherwise (is_inited is set only on success).
 */
status_t dss_init_session_pool(uint32 max_session_num)
{
    uint32 table_bytes = (uint32)(max_session_num * sizeof(dss_session_t *));
    g_dss_session_ctrl.sessions = cm_malloc(table_bytes);
    if (g_dss_session_ctrl.sessions == NULL) {
        return ERR_DSS_GA_INIT;
    }
    errno_t ret = memset_s(g_dss_session_ctrl.sessions, table_bytes, 0, table_bytes);
    securec_check_ret(ret);

    g_dss_session_ctrl.alloc_sessions = 0;
    /* The first batch is one group, or everything when fewer sessions than a group are configured. */
    uint32 first_group = max_session_num;
    if (max_session_num >= DSS_SESSION_NUM_PER_GROUP) {
        first_group = DSS_SESSION_NUM_PER_GROUP;
    }
    g_dss_session_ctrl.total = max_session_num;

    status_t status = dss_extend_session(first_group);
    if (status == CM_SUCCESS) {
        g_dss_session_ctrl.is_inited = CM_TRUE;
    }
    return status;
}
/* Accessor for the process-wide session control block. */
dss_session_ctrl_t *dss_get_session_ctrl(void)
{
    dss_session_ctrl_t *ctrl = &g_dss_session_ctrl;
    return ctrl;
}
/*
 * Return the first session id used by dss_create_session() for its slot search.
 *
 * The background-task slots always come first; when more than one instance is configured,
 * channel_num + work_thread_cnt further slots are skipped as well.
 */
uint32 dss_get_udssession_startid(void)
{
    dss_config_t *cfg = dss_get_inst_cfg();
    uint32 first_sid = (uint32)DSS_BACKGROUND_TASK_NUM;
    if (cfg->params.nodes_list.inst_cnt > 1) {
        first_sid += cfg->params.channel_num + cfg->params.work_thread_cnt;
    }
    return first_sid;
}
/* Upper bound (exclusive) of session ids: the start id from dss_get_udssession_startid() plus cfg_session_num. */
uint32 dss_get_max_total_session_cnt(void)
{
    dss_config_t *cfg = dss_get_inst_cfg();
    uint32 first_sid = dss_get_udssession_startid();
    return first_sid + cfg->params.cfg_session_num;
}
uint32 dss_get_recover_task_idx(void)
{
return (dss_get_udssession_startid() - (uint32)DSS_BACKGROUND_TASK_NUM);
}
uint32 dss_get_delay_clean_task_idx(void)
{
return (dss_get_udssession_startid() - (uint32)DSS_BACKGROUND_TASK_NUM) + DSS_DELAY_CLEAN_BACKGROUND_TASK;
}
uint32 dss_get_hashmap_dynamic_extend_task_idx(void)
{
return (dss_get_udssession_startid() - (uint32)DSS_BACKGROUND_TASK_NUM) + DSS_HASHMAP_DYNAMIC_EXTEND_TASK;
}
/*
 * Session index of member idx within a set of background tasks whose first member sits at
 * task_id_base relative to the start of the background-task slots.
 */
uint32 dss_get_bg_task_set_idx(uint32 task_id_base, uint32 idx)
{
    uint32 bg_base = dss_get_udssession_startid() - (uint32)DSS_BACKGROUND_TASK_NUM;
    return bg_base + task_id_base + idx;
}
/* Session index of the idx-th meta-syn background task. */
uint32 dss_get_meta_syn_task_idx(uint32 idx)
{
    uint32 task_idx = dss_get_bg_task_set_idx(DSS_META_SYN_BG_TASK_BASE, idx);
    return task_idx;
}
/* Session index of the idx-th recycle-meta background task. */
uint32 dss_get_recycle_meta_task_idx(uint32 idx)
{
    uint32 task_idx = dss_get_bg_task_set_idx(DSS_RECYCLE_META_TASK_BASE, idx);
    return task_idx;
}
uint32 dss_get_alarm_check_task_idx(void)
{
return (dss_get_udssession_startid() - (uint32)DSS_BACKGROUND_TASK_NUM) + DSS_ALARM_CHECK_TASK;
}
uint32 dss_get_dyn_log_task_idx(void)
{
return (dss_get_udssession_startid() - (uint32)DSS_BACKGROUND_TASK_NUM) + DSS_DYN_LOG_TASK;
}
/*
 * Reset a session slot for a new owner.
 *
 * Clears the latch stack and the per-session statistics, stores a copy of the pipe (when one is
 * given) and marks the session connected, resets protocol/version fields, and makes sure the
 * aligned read buffer exists and is zeroed. The buffer is only allocated when the session does
 * not already hold one.
 *
 * Returns CM_SUCCESS, or an error status when a secure-memset call or the buffer allocation fails.
 */
static status_t dss_init_session(dss_session_t *session, const cs_pipe_t *pipe)
{
    errno_t ret = memset_s(&session->latch_stack, sizeof(dss_latch_stack_t), 0, sizeof(dss_latch_stack_t));
    securec_check_ret(ret);

    session->connected = CM_FALSE;
    session->is_direct = CM_TRUE;
    if (pipe != NULL) {
        session->pipe = *pipe;
        session->connected = CM_TRUE;
    }
    session->status = DSS_SESSION_STATUS_IDLE;
    session->proto_type = PROTO_TYPE_UNKNOWN;
    session->is_closed = CM_FALSE;
    session->proto_version = DSS_PROTO_VERSION;
    session->client_version = DSS_PROTO_VERSION;

    ret = memset_s(
        session->dss_session_stat, DSS_EVT_COUNT * sizeof(dss_stat_item_t), 0, DSS_EVT_COUNT * sizeof(dss_stat_item_t));
    securec_check_ret(ret);
    session->is_holding_hotpatch_latch = CM_FALSE;

    /* Allocate the read buffer lazily; a buffer left on the slot is reused. */
    if (session->thv_read_buf == NULL) {
        session->thv_read_buf = (char *)cm_malloc_align(DSS_DISK_UNIT_SIZE, DSS_LOADDISK_BUFFER_SIZE);
        if (session->thv_read_buf == NULL) {
            DSS_RETURN_IFERR2(
                DSS_READ4STANDBY_ERR, DSS_THROW_ERROR(ERR_ALLOC_MEMORY, DSS_LOADDISK_BUFFER_SIZE, "g_thv_read_buf"));
        }
    }
    (void)memset_s(session->thv_read_buf, DSS_LOADDISK_BUFFER_SIZE, 0, DSS_LOADDISK_BUFFER_SIZE);
    return CM_SUCCESS;
}
/* Return the session stored at slot idx of the session table. The index is not range-checked. */
dss_session_t *dss_get_reserv_session(uint32 idx)
{
    return dss_get_session_ctrl()->sessions[idx];
}
/*
 * Claim a free session slot and initialise it for the given pipe.
 *
 * The slot search runs under g_dss_session_ctrl.lock over the id range
 * [dss_get_udssession_startid(), dss_get_max_total_session_cnt()), extending the pool group by
 * group when the search reaches the end of the already-allocated sessions.
 *
 * pipe:    connection to copy into the session; may be NULL (session is then not connected).
 * session: receives the claimed session on success and is set to NULL on every failure.
 *
 * Returns CM_SUCCESS, the status of dss_extend_session() if extension fails,
 * ERR_DSS_SESSION_CREATE when no slot is free, or the status of dss_init_session() if
 * initialisation fails. In the last case the slot is released again; the pipe is left
 * untouched and remains the caller's to close.
 */
status_t dss_create_session(const cs_pipe_t *pipe, dss_session_t **session)
{
    uint32 id = DSS_INVALID_ID32;
    status_t status;
    *session = NULL;
    cm_spin_lock(&g_dss_session_ctrl.lock, NULL);
    uint32 start_sid = dss_get_udssession_startid();
    uint32 end_sid = dss_get_max_total_session_cnt();
    for (uint32 i = start_sid; i < end_sid; i++) {
        if (i >= g_dss_session_ctrl.alloc_sessions) {
            uint32 remain = g_dss_session_ctrl.total - g_dss_session_ctrl.alloc_sessions;
            uint32 extend_num = remain >= DSS_SESSION_NUM_PER_GROUP ? DSS_SESSION_NUM_PER_GROUP : remain;
            status = dss_extend_session(extend_num);
            if (status != CM_SUCCESS) {
                cm_spin_unlock(&g_dss_session_ctrl.lock);
                return status;
            }
            /*
             * The pool is already at its limit and nothing was added: slot i does not exist,
             * so stop instead of dereferencing an unallocated entry.
             */
            if (i >= g_dss_session_ctrl.alloc_sessions) {
                break;
            }
        }
        if (g_dss_session_ctrl.sessions[i]->is_used == CM_FALSE) {
            id = i;
            break;
        }
    }
    if (id == DSS_INVALID_ID32) {
        LOG_DEBUG_INF("No sessions are available.");
        cm_spin_unlock(&g_dss_session_ctrl.lock);
        return ERR_DSS_SESSION_CREATE;
    }
    dss_session_t *claimed = g_dss_session_ctrl.sessions[id];
    LOG_DEBUG_INF("Session[%u] is available.", id);
    cm_spin_lock(&claimed->lock, NULL);
    g_dss_session_ctrl.used_count++;
    claimed->is_used = CM_TRUE;
    cm_spin_unlock(&claimed->lock);
    cm_spin_unlock(&g_dss_session_ctrl.lock);

    status = dss_init_session(claimed, pipe);
    if (status != CM_SUCCESS) {
        /* Give the slot back, otherwise it would stay marked as used forever. */
        cm_spin_lock(&g_dss_session_ctrl.lock, NULL);
        cm_spin_lock(&claimed->lock, NULL);
        claimed->connected = CM_FALSE;
        claimed->is_closed = CM_TRUE;
        claimed->is_used = CM_FALSE;
        g_dss_session_ctrl.used_count--;
        cm_spin_unlock(&claimed->lock);
        cm_spin_unlock(&g_dss_session_ctrl.lock);
        return status;
    }
    *session = claimed;
    return CM_SUCCESS;
}
/*
 * Release a session slot: disconnect its pipe if connected, decrement the used-session counter,
 * mark the slot closed and unused, wipe the client info, restore default versions/flags and
 * free the read buffer. No locks are taken here; dss_destroy_session() is the locking wrapper.
 */
void dss_destroy_session_inner(dss_session_t *session)
{
    if (session->connected == CM_TRUE) {
        cs_disconnect(&session->pipe);
        session->connected = CM_FALSE;
    }

    g_dss_session_ctrl.used_count--;
    session->is_closed = CM_TRUE;
    session->is_used = CM_FALSE;

    errno_t rc = memset_sp(&session->cli_info, sizeof(session->cli_info), 0, sizeof(session->cli_info));
    securec_check_panic(rc);

    session->proto_version = DSS_PROTO_VERSION;
    session->client_version = DSS_PROTO_VERSION;
    session->is_holding_hotpatch_latch = CM_FALSE;
    session->put_log = CM_FALSE;

    if (session->thv_read_buf != NULL) {
        free(session->thv_read_buf);
        session->thv_read_buf = NULL;
    }
}
/*
 * Release a session slot while holding the global session-control lock and the session's
 * shm_lock (taken in that order, released in reverse).
 */
void dss_destroy_session(dss_session_t *session)
{
    cm_spin_lock(&g_dss_session_ctrl.lock, NULL);

    cm_spin_lock(&session->shm_lock, NULL);
    LOG_DEBUG_INF("Succeed to lock session %u shm lock", session->id);

    dss_destroy_session_inner(session);

    cm_spin_unlock(&session->shm_lock);
    LOG_DEBUG_INF("Succeed to unlock session %u shm lock", session->id);

    cm_spin_unlock(&g_dss_session_ctrl.lock);
}
/* Look up a session by id; returns NULL when sid is outside the allocated range or the configured total. */
dss_session_t *dss_get_session(uint32 sid)
{
    if (sid < g_dss_session_ctrl.alloc_sessions && sid < g_dss_session_ctrl.total) {
        return g_dss_session_ctrl.sessions[sid];
    }
    return NULL;
}
/*
 * Decide whether a wait has exceeded its timeout.
 *
 * timeout:     wait budget; SPIN_WAIT_FOREVER disables the timeout.
 *              NOTE(review): the scaling by 1000 suggests milliseconds against a sleep length
 *              in microseconds - confirm against SPIN_SLEEP_TIME.
 * sleep_times: number of sleeps taken so far.
 * sleeps:      length of one sleep; 0 disables the timeout (and avoids a division by zero).
 *
 * Returns CM_TRUE once sleep_times exceeds (timeout * 1000) / sleeps, CM_FALSE otherwise.
 */
static bool32 dss_is_timeout(int32 timeout, int32 sleep_times, int32 sleeps)
{
    if ((timeout == SPIN_WAIT_FOREVER) || (sleeps == 0)) {
        return CM_FALSE;
    }
    /* Scale in 64 bits: timeout * 1000 overflows a signed 32-bit int for large timeouts. */
    int64 max_sleeps = ((int64)timeout * 1000) / (int64)sleeps;
    return (bool32)(max_sleeps < (int64)sleep_times);
}
/*
 * Acquire shared_latch in shared (S) mode for a server-side session, without recording the
 * acquisition in the session's latch stack.
 *
 * is_force: when true, the caller may also join a latch that is currently in IX state.
 * timeout:  SPIN_WAIT_FOREVER waits indefinitely; any other value is checked through
 *           dss_is_timeout() against the number of sleeps taken so far.
 *
 * Returns CM_SUCCESS once the latch is held, CM_ERROR if the wait timed out.
 */
status_t dss_lock_shm_meta_s_without_stack(
dss_session_t *session, dss_shared_latch_t *shared_latch, bool32 is_force, int32 timeout)
{
cm_panic_log(dss_is_server(), "can not op shared latch without session latch stack in client");
int32 sleep_times = 0;
/* stat is never assigned in this function, so every statistics hook below receives NULL. */
latch_statis_t *stat = NULL;
uint32 count = 0;
uint32 sid = DSS_SESSIONID_IN_LOCK(session->id);
do {
cm_spin_lock_by_sid(sid, &shared_latch->latch.lock, (stat != NULL) ? &stat->s_spin : NULL);
/* Latch is idle: become its first shared holder. */
if (shared_latch->latch.stat == LATCH_STATUS_IDLE) {
shared_latch->latch.stat = LATCH_STATUS_S;
shared_latch->latch.shared_count = 1;
shared_latch->latch.sid = (uint16)sid;
/* shared_sid_count is raised by sid on every shared acquisition and lowered by sid on release. */
shared_latch->latch_extent.shared_sid_count += sid;
cm_spin_unlock(&shared_latch->latch.lock);
cm_latch_stat_inc(stat, count);
return CM_SUCCESS;
}
/* Latch is already shared (or IX and the caller forces): join the existing holders. */
if ((shared_latch->latch.stat == LATCH_STATUS_S) || (shared_latch->latch.stat == LATCH_STATUS_IX && is_force)) {
shared_latch->latch.shared_count++;
shared_latch->latch_extent.shared_sid_count += sid;
cm_spin_unlock(&shared_latch->latch.lock);
cm_latch_stat_inc(stat, count);
return CM_SUCCESS;
}
cm_spin_unlock(&shared_latch->latch.lock);
if (stat != NULL) {
stat->misses++;
}
/* Wait outside the spin lock until the latch looks idle or shared, then retry from the top. */
while (shared_latch->latch.stat != LATCH_STATUS_IDLE && shared_latch->latch.stat != LATCH_STATUS_S) {
count++;
/* Busy-spin up to GS_SPIN_COUNT iterations before each sleep. */
if (count < GS_SPIN_COUNT) {
continue;
}
SPIN_STAT_INC(stat, s_sleeps);
cm_usleep(SPIN_SLEEP_TIME);
sleep_times++;
if (dss_is_timeout(timeout, sleep_times, SPIN_SLEEP_TIME)) {
return CM_ERROR;
}
count = 0;
}
} while (1);
/* Not reached: the loop above only exits through a return. */
return CM_SUCCESS;
}
/*
 * Acquire shared_latch in shared (S) mode for a client-side session and record it on the
 * session's latch stack.
 *
 * Before touching the latch the function saves stack_top into stack_top_bak, writes *offset
 * into the slot at stack_top and tags the progress in latch_stack.op (LATCH_S -> LATCH_S_BEG ->
 * LATCH_S_END). Those op markers are what dss_clean_last_op_with_lock() and
 * dss_clean_last_op_without_lock() dispatch on when cleaning up a session.
 *
 * timeout: SPIN_WAIT_FOREVER waits indefinitely; any other value is checked through
 *          dss_is_timeout() against the number of sleeps taken so far.
 *
 * Returns CM_SUCCESS once the latch is held; on timeout the pending stack slot is invalidated,
 * op is reset to LATCH_SHARED_OP_NONE and CM_ERROR is returned.
 */
status_t dss_lock_shm_meta_s_with_stack(
dss_session_t *session, dss_latch_offset_t *offset, dss_shared_latch_t *shared_latch, int32 timeout)
{
cm_panic_log(!(dss_is_server()), "can not op shared latch with session latch stack in server");
DSS_ASSERT_LOG(session != NULL, "session ptr is NULL");
DSS_ASSERT_LOG(session->latch_stack.stack_top < DSS_MAX_LATCH_STACK_DEPTH, "latch_stack overflow");
session->latch_stack.stack_top_bak = session->latch_stack.stack_top;
session->latch_stack.op = LATCH_SHARED_OP_LATCH_S;
session->latch_stack.latch_offset_stack[session->latch_stack.stack_top] = *offset;
int32 sleep_times = 0;
/* stat is never assigned in this function, so every statistics hook below receives NULL. */
latch_statis_t *stat = NULL;
uint32 count = 0;
uint32 sid = DSS_SESSIONID_IN_LOCK(session->id);
/* is_force is constant false here, so the IX alternative in the join condition below never applies. */
bool32 is_force = CM_FALSE;
do {
cm_spin_lock_by_sid(sid, &shared_latch->latch.lock, (stat != NULL) ? &stat->s_spin : NULL);
/* NOTE(review): presumably snapshots stat/shared_count into the *_bak fields read by the clean-up routines - confirm. */
dss_set_latch_extent(&shared_latch->latch_extent, shared_latch->latch.stat, shared_latch->latch.shared_count);
/* Latch is idle: become its first shared holder and push the stack entry. */
if (shared_latch->latch.stat == LATCH_STATUS_IDLE) {
session->latch_stack.op = LATCH_SHARED_OP_LATCH_S_BEG;
shared_latch->latch.stat = LATCH_STATUS_S;
shared_latch->latch.shared_count = 1;
shared_latch->latch.sid = (uint16)sid;
shared_latch->latch_extent.shared_sid_count += sid;
session->latch_stack.stack_top++;
session->latch_stack.op = LATCH_SHARED_OP_LATCH_S_END;
cm_spin_unlock(&shared_latch->latch.lock);
cm_latch_stat_inc(stat, count);
return CM_SUCCESS;
}
/* Latch is already shared: join the existing holders and push the stack entry. */
if ((shared_latch->latch.stat == LATCH_STATUS_S) || (shared_latch->latch.stat == LATCH_STATUS_IX && is_force)) {
session->latch_stack.op = LATCH_SHARED_OP_LATCH_S_BEG;
shared_latch->latch.shared_count++;
shared_latch->latch_extent.shared_sid_count += sid;
session->latch_stack.stack_top++;
session->latch_stack.op = LATCH_SHARED_OP_LATCH_S_END;
cm_spin_unlock(&shared_latch->latch.lock);
cm_latch_stat_inc(stat, count);
return CM_SUCCESS;
}
cm_spin_unlock(&shared_latch->latch.lock);
if (stat != NULL) {
stat->misses++;
}
/* Wait outside the spin lock until the latch looks idle or shared, then retry from the top. */
while (shared_latch->latch.stat != LATCH_STATUS_IDLE && shared_latch->latch.stat != LATCH_STATUS_S) {
count++;
/* Busy-spin up to GS_SPIN_COUNT iterations before each sleep. */
if (count < GS_SPIN_COUNT) {
continue;
}
SPIN_STAT_INC(stat, s_sleeps);
cm_usleep(SPIN_SLEEP_TIME);
sleep_times++;
if (dss_is_timeout(timeout, sleep_times, SPIN_SLEEP_TIME)) {
/* session was already dereferenced above, so this NULL check cannot fail at this point. */
if (session != NULL) {
/* Withdraw the not-yet-pushed stack entry written at the top of the function. */
session->latch_stack.latch_offset_stack[session->latch_stack.stack_top].type =
DSS_LATCH_OFFSET_INVALID;
session->latch_stack.op = LATCH_SHARED_OP_NONE;
}
return CM_ERROR;
}
count = 0;
}
} while (1);
/* Not reached: the loop above only exits through a return. */
return CM_SUCCESS;
}
/*
 * Take a bucket latch in shared mode, waiting forever.
 *
 * On the server the latch is taken without the session latch stack. Otherwise the latch's
 * shared-memory offset (derived from the GA_SEGMENT_POOL object key for id) is recorded on the
 * session latch stack as part of the acquisition.
 */
status_t dss_lock_shm_meta_bucket_s(dss_session_t *session, uint32 id, dss_shared_latch_t *shared_latch)
{
    CM_ASSERT(session != NULL);
    if (!dss_is_server()) {
        dss_latch_offset_t latch_offset;
        latch_offset.type = DSS_LATCH_OFFSET_SHMOFFSET;
        cm_shm_key_t segment_key = ga_object_key(GA_SEGMENT_POOL, id);
        latch_offset.offset.shm_offset = cm_trans_shm_offset(segment_key, &shared_latch->latch);
        return dss_lock_shm_meta_s_with_stack(session, &latch_offset, shared_latch, SPIN_WAIT_FOREVER);
    }
    return dss_lock_shm_meta_s_without_stack(session, shared_latch, CM_FALSE, SPIN_WAIT_FOREVER);
}
/*
 * Client-side shared latch acquisition with up to DSS_CLIENT_TIMEOUT_COUNT attempts.
 *
 * Before each attempt the session is checked; if it is closed the process logs an abort
 * message, flushes the log file and exits. After a failed attempt the optional should_exit
 * callback lets the caller give up early.
 *
 * Returns CM_SUCCESS when the latch is held, ERR_DSS_LOCK_TIMEOUT when the caller asked to exit
 * or all attempts failed.
 *
 * NOTE(review): each attempt passes SPIN_WAIT_FOREVER, for which dss_is_timeout() always
 * reports "no timeout", so the attempt can only return CM_SUCCESS. The retry loop and the
 * should_exit callback therefore look unreachable after the first attempt - confirm whether a
 * finite per-attempt timeout was intended.
 */
status_t dss_cli_lock_shm_meta_s(
    dss_session_t *session, dss_latch_offset_t *offset, dss_shared_latch_t *shared_latch, latch_should_exit should_exit)
{
    int attempt = 0;
    while (attempt < DSS_CLIENT_TIMEOUT_COUNT) {
        if (session->is_closed) {
            DSS_THROW_ERROR(ERR_DSS_SHM_LOCK, "uds connection is closed.");
            LOG_RUN_ERR("[DSS] ABORT INFO: Failed to lock vg share memery because uds connection is closed.");
            cm_fync_logfile();
            dss_exit(1);
        }
        status_t lock_status = dss_lock_shm_meta_s_with_stack(session, offset, shared_latch, SPIN_WAIT_FOREVER);
        if (lock_status == CM_SUCCESS) {
            return CM_SUCCESS;
        }
        if (should_exit != NULL && should_exit()) {
            LOG_RUN_ERR("Caller want to exit when waiting for shared_latch!!");
            return ERR_DSS_LOCK_TIMEOUT;
        }
        attempt++;
    }
    LOG_RUN_ERR("The client want to lock meta timeout.");
    return ERR_DSS_LOCK_TIMEOUT;
}
/*
 * Acquire shared_latch in exclusive (X) mode for a server-side session, waiting without a
 * time limit.
 *
 * An idle latch is taken directly. A latch in S state is first moved to IX and the rest of the
 * transition is delegated to cm_latch_ix2x(). In any other state the function waits until the
 * latch looks idle or shared and retries.
 */
void dss_lock_shm_meta_x(const dss_session_t *session, dss_shared_latch_t *shared_latch)
{
CM_ASSERT(session != NULL);
cm_panic_log(dss_is_server(), "can not op x latch in client");
/* stat is never assigned in this function, so every statistics hook below receives NULL. */
latch_statis_t *stat = NULL;
uint32 count = 0;
uint32 sid = DSS_SESSIONID_IN_LOCK(session->id);
do {
cm_spin_lock_by_sid(sid, &shared_latch->latch.lock, (stat != NULL) ? &stat->x_spin : NULL);
/* Latch is idle: take it exclusively right away. */
if (shared_latch->latch.stat == LATCH_STATUS_IDLE) {
shared_latch->latch.sid = (uint16)sid;
shared_latch->latch.stat = LATCH_STATUS_X;
cm_spin_unlock(&shared_latch->latch.lock);
cm_latch_stat_inc(stat, count);
return;
}
/* Latch is shared: mark it IX, then let cm_latch_ix2x() finish the upgrade. */
/* NOTE(review): cm_latch_ix2x() presumably waits for the shared holders to drain - confirm in cm_latch. */
if (shared_latch->latch.stat == LATCH_STATUS_S) {
shared_latch->latch.stat = LATCH_STATUS_IX;
cm_spin_unlock(&shared_latch->latch.lock);
cm_latch_ix2x(&shared_latch->latch, sid, stat);
return;
}
cm_spin_unlock(&shared_latch->latch.lock);
if (stat != NULL) {
stat->misses++;
}
/* Wait outside the spin lock, sleeping after every GS_SPIN_COUNT spins, then retry from the top. */
while (shared_latch->latch.stat != LATCH_STATUS_IDLE && shared_latch->latch.stat != LATCH_STATUS_S) {
count++;
if (count >= GS_SPIN_COUNT) {
SPIN_STAT_INC(stat, x_sleeps);
cm_spin_sleep();
count = 0;
}
}
} while (CM_TRUE);
}
/*
 * Try to acquire shared_latch in exclusive (X) mode for a server-side session within a bounded
 * number of wait ticks.
 *
 * wait_ticks bounds the number of sleeps taken in the wait loop below and is also handed to
 * cm_latch_timed_ix2x() for the S -> IX -> X upgrade path.
 *
 * Returns CM_TRUE when the latch is held exclusively, CM_FALSE on timeout.
 */
bool32 dss_lock_shm_meta_timed_x(const dss_session_t *session, dss_shared_latch_t *shared_latch, uint32 wait_ticks)
{
CM_ASSERT(session != NULL);
cm_panic_log(dss_is_server(), "can not op x latch in client");
/* stat is never assigned in this function, so every statistics hook below receives NULL. */
latch_statis_t *stat = NULL;
uint32 count = 0;
uint32 sid = DSS_SESSIONID_IN_LOCK(session->id);
/* Number of sleeps taken in the wait loop; it is not reset between retries of the outer loop. */
uint32 actual_ticks = 0;
do {
cm_spin_lock_by_sid(sid, &shared_latch->latch.lock, (stat != NULL) ? &stat->x_spin : NULL);
/* Latch is idle: take it exclusively right away. */
if (shared_latch->latch.stat == LATCH_STATUS_IDLE) {
shared_latch->latch.sid = (uint16)sid;
shared_latch->latch.stat = LATCH_STATUS_X;
cm_spin_unlock(&shared_latch->latch.lock);
cm_latch_stat_inc(stat, count);
return CM_TRUE;
}
/* Latch is shared: mark it IX and attempt the timed upgrade. */
if (shared_latch->latch.stat == LATCH_STATUS_S) {
shared_latch->latch.stat = LATCH_STATUS_IX;
cm_spin_unlock(&shared_latch->latch.lock);
if (!cm_latch_timed_ix2x(&shared_latch->latch, sid, wait_ticks, stat)) {
/* Upgrade timed out: undo the IX marker, back to S if holders remain, otherwise to IDLE. */
cm_spin_lock_by_sid(sid, &shared_latch->latch.lock, (stat != NULL) ? &stat->x_spin : NULL);
shared_latch->latch.stat = shared_latch->latch.shared_count > 0 ? LATCH_STATUS_S : LATCH_STATUS_IDLE;
cm_spin_unlock(&shared_latch->latch.lock);
return CM_FALSE;
}
return CM_TRUE;
}
cm_spin_unlock(&shared_latch->latch.lock);
if (stat != NULL) {
stat->misses++;
}
/* Wait outside the spin lock until the latch looks idle or shared, giving up after wait_ticks sleeps. */
while (shared_latch->latch.stat != LATCH_STATUS_IDLE && shared_latch->latch.stat != LATCH_STATUS_S) {
if (actual_ticks >= wait_ticks) {
return CM_FALSE;
}
count++;
if (count >= GS_SPIN_COUNT) {
SPIN_STAT_INC(stat, x_sleeps);
cm_spin_sleep();
count = 0;
actual_ticks++;
}
}
} while (CM_TRUE);
/* Not reached: the loop above only exits through a return. */
return CM_FALSE;
}
/* Server-side X -> IX transition of a latch that is asserted to be in X state; delegates to dss_latch_x2ix(). */
void dss_lock_shm_meta_x2ix(dss_session_t *session, dss_shared_latch_t *shared_latch)
{
    CM_ASSERT(session != NULL);
    cm_panic_log(dss_is_server(), "can not op x latch in client");
    CM_ASSERT(shared_latch->latch.stat == LATCH_STATUS_X);

    uint32 lock_sid = DSS_SESSIONID_IN_LOCK(session->id);
    /* No latch statistics are collected here. */
    dss_latch_x2ix(&shared_latch->latch, lock_sid, NULL);
}
/* Server-side IX -> X transition of a latch that is asserted to be in IX state; delegates to dss_latch_ix2x(). */
void dss_lock_shm_meta_ix2x(dss_session_t *session, dss_shared_latch_t *shared_latch)
{
    CM_ASSERT(session != NULL);
    cm_panic_log(dss_is_server(), "can not op x latch in client");
    CM_ASSERT(shared_latch->latch.stat == LATCH_STATUS_IX);

    uint32 lock_sid = DSS_SESSIONID_IN_LOCK(session->id);
    /* No latch statistics are collected here. */
    dss_latch_ix2x(&shared_latch->latch, lock_sid, NULL);
}
/*
 * Degrade an exclusively held latch to shared mode (server side only).
 *
 * The caller must be the session recorded on the latch and the latch must be in X state,
 * otherwise the process panics. Afterwards the latch is in S state with a shared count of 1.
 */
void dss_lock_shm_meta_degrade(dss_session_t *session, dss_shared_latch_t *shared_latch)
{
    CM_ASSERT(session != NULL);
    cm_panic_log(dss_is_server(), "can not op x latch degradation in client.");

    uint32 sid;
    if (session == NULL) {
        sid = DSS_DEFAULT_SESSIONID;
    } else {
        sid = DSS_SESSIONID_IN_LOCK(session->id);
    }
    cm_panic_log(sid == shared_latch->latch.sid && shared_latch->latch.stat == LATCH_STATUS_X,
        "Invalid degradation: sid:%u, sid on latch:%u, latch status:%u.", sid, shared_latch->latch.sid,
        shared_latch->latch.stat);

    cm_spin_lock_by_sid(sid, &shared_latch->latch.lock, NULL);
    shared_latch->latch.stat = LATCH_STATUS_S;
    shared_latch->latch.shared_count = 1;
    shared_latch->latch_extent.shared_sid_count += sid;
    cm_spin_unlock(&shared_latch->latch.lock);
}
/* Take a bucket latch in exclusive mode; a thin wrapper over dss_lock_shm_meta_x(). */
void dss_lock_shm_meta_bucket_x(dss_session_t *session, dss_shared_latch_t *shared_latch)
{
    CM_ASSERT(session != NULL);
    const dss_session_t *owner = session;
    dss_lock_shm_meta_x(owner, shared_latch);
}
/*
 * Release a latch held by a server-side session (no session latch stack involved).
 *
 * For a latch in S or IX state the shared count and shared_sid_count are reduced; when the last
 * shared holder leaves, an S latch becomes idle and the recorded sid is cleared. For a latch in
 * X state the latch simply becomes idle. All changes happen under the latch spin lock.
 */
void dss_unlock_shm_meta_without_stack(dss_session_t *session, dss_shared_latch_t *shared_latch)
{
    CM_ASSERT(session != NULL);
    cm_panic_log(dss_is_server(), "can not op shared latch without session latch stack in client");
    CM_ASSERT(shared_latch->latch.stat != LATCH_STATUS_IDLE);

    uint32 sid = DSS_SESSIONID_IN_LOCK(session->id);
    /* No spin statistics are collected here. */
    cm_spin_lock_by_sid(sid, &shared_latch->latch.lock, NULL);
    if (shared_latch->latch.stat == LATCH_STATUS_S || shared_latch->latch.stat == LATCH_STATUS_IX) {
        CM_ASSERT(shared_latch->latch.shared_count > 0);
        shared_latch->latch.shared_count--;
        if (shared_latch->latch.shared_count == 0) {
            /* Last shared holder: an IX latch keeps its state, an S latch goes idle. */
            if (shared_latch->latch.stat == LATCH_STATUS_S) {
                shared_latch->latch.stat = LATCH_STATUS_IDLE;
            }
            shared_latch->latch.sid = 0;
        }
        shared_latch->latch_extent.shared_sid_count -= sid;
    } else {
        if (shared_latch->latch.stat == LATCH_STATUS_X) {
            CM_ASSERT(shared_latch->latch.shared_count == 0);
            shared_latch->latch.stat = LATCH_STATUS_IDLE;
            shared_latch->latch.sid = 0;
        }
    }
    cm_spin_unlock(&shared_latch->latch.lock);
}
/*
 * Release a shared latch that was recorded on the session's latch stack.
 *
 * Progress is tagged in latch_stack.op (UNLATCH -> UNLATCH_BEG -> UNLATCH_END) and stack_top is
 * saved into stack_top_bak first; the clean-up routines in this file dispatch on those markers.
 *
 * is_try_lock: when true the latch spin lock is only tried once and the function gives up if
 *              it is busy; when false the spin lock is acquired unconditionally.
 *
 * Returns CM_TRUE when the latch was released and the top stack entry popped, CM_FALSE when
 * the try-lock failed (op is then left at LATCH_SHARED_OP_UNLATCH).
 */
bool32 dss_unlock_shm_meta_s_with_stack(dss_session_t *session, dss_shared_latch_t *shared_latch, bool32 is_try_lock)
{
CM_ASSERT(session != NULL);
CM_ASSERT(shared_latch->latch.stat != LATCH_STATUS_IDLE);
session->latch_stack.stack_top_bak = session->latch_stack.stack_top;
session->latch_stack.op = LATCH_SHARED_OP_UNLATCH;
/* stat_spin is never assigned, so no spin statistics are collected. */
spin_statis_t *stat_spin = NULL;
uint32 sid = DSS_SESSIONID_IN_LOCK(session->id);
if (!is_try_lock) {
cm_spin_lock_by_sid(sid, &shared_latch->latch.lock, stat_spin);
} else {
bool32 is_locked = cm_spin_try_lock(&shared_latch->latch.lock);
if (!is_locked) {
return CM_FALSE;
}
}
/* NOTE(review): presumably snapshots stat/shared_count into the *_bak fields read by the clean-up routines - confirm. */
dss_set_latch_extent(&shared_latch->latch_extent, shared_latch->latch.stat, shared_latch->latch.shared_count);
session->latch_stack.op = LATCH_SHARED_OP_UNLATCH_BEG;
CM_ASSERT(shared_latch->latch.shared_count > 0);
shared_latch->latch.shared_count--;
/* Last shared holder: an S latch goes idle (any other state is kept) and the recorded sid is cleared. */
if (shared_latch->latch.shared_count == 0) {
if (shared_latch->latch.stat == LATCH_STATUS_S) {
shared_latch->latch.stat = LATCH_STATUS_IDLE;
}
shared_latch->latch.sid = 0;
}
shared_latch->latch_extent.shared_sid_count -= sid;
cm_spin_unlock(&shared_latch->latch.lock);
/* Pop the top entry of the session latch stack. */
CM_ASSERT(session->latch_stack.stack_top);
session->latch_stack.latch_offset_stack[session->latch_stack.stack_top - 1].type = DSS_LATCH_OFFSET_INVALID;
session->latch_stack.stack_top--;
session->latch_stack.op = LATCH_SHARED_OP_UNLATCH_END;
return CM_TRUE;
}
/*
 * Release a bucket latch: through the session latch stack when not running as server
 * (blocking on the latch spin lock), directly on the latch when running as server.
 */
void dss_unlock_shm_meta_bucket(dss_session_t *session, dss_shared_latch_t *shared_latch)
{
    CM_ASSERT(session != NULL);
    if (!dss_is_server()) {
        (void)dss_unlock_shm_meta_s_with_stack(session, shared_latch, CM_FALSE);
        return;
    }
    dss_unlock_shm_meta_without_stack(session, shared_latch);
}
/*
 * Clean-up for an S-latch attempt that had only written its pending stack entry: invalidate
 * the slot at stack_top. The latch itself is not touched and stack_top is unchanged.
 */
static void dss_clean_latch_s_without_bak(dss_session_t *session, dss_shared_latch_t *shared_latch)
{
    dss_latch_stack_t *stack = &session->latch_stack;
    uint32 sid = DSS_SESSIONID_IN_LOCK(session->id);

    LOG_DEBUG_INF("Clean sid:%u latch_stack old stack_top:%u.", sid, stack->stack_top);
    stack->latch_offset_stack[stack->stack_top].type = DSS_LATCH_OFFSET_INVALID;
    LOG_DEBUG_INF("Clean sid:%u latch_stack new stack_top:%u.", sid, stack->stack_top);
}
/*
 * Clean-up for an S-latch acquisition that had started (or finished) modifying the latch:
 * restore the latch count/state/sid-sum from the backup fields in latch_extent, normalise an
 * empty latch to idle, restore stack_top from stack_top_bak and invalidate the affected slot.
 */
static void dss_clean_latch_s_with_bak(dss_session_t *session, dss_shared_latch_t *shared_latch)
{
    dss_latch_stack_t *stack = &session->latch_stack;
    uint32 sid = DSS_SESSIONID_IN_LOCK(session->id);

    LOG_DEBUG_INF("Clean sid:%u shared_latch old count:%hu, old stat:%u.", sid, shared_latch->latch.shared_count,
        shared_latch->latch.stat);

    /* Roll the latch back to the snapshot kept in latch_extent. */
    shared_latch->latch.shared_count = shared_latch->latch_extent.shared_count_bak;
    shared_latch->latch.stat = shared_latch->latch_extent.stat_bak;
    shared_latch->latch_extent.shared_sid_count = shared_latch->latch_extent.shared_sid_count_bak;
    if (shared_latch->latch.shared_count == 0) {
        if (shared_latch->latch.stat == LATCH_STATUS_S) {
            shared_latch->latch.stat = LATCH_STATUS_IDLE;
        }
        shared_latch->latch.sid = 0;
    }

    LOG_DEBUG_INF("Clean sid:%u latch_stack old stack_top:%u.", sid, stack->stack_top);
    LOG_DEBUG_INF("Clean sid:%u shared_latch new count:%hu, new stat:%u.", sid, shared_latch->latch.shared_count,
        shared_latch->latch.stat);

    /* Roll the stack back and drop the entry below the restored top (or slot 0 when already empty). */
    stack->stack_top = stack->stack_top_bak;
    if (stack->stack_top > 0) {
        stack->latch_offset_stack[stack->stack_top - 1].type = DSS_LATCH_OFFSET_INVALID;
        stack->stack_top--;
    } else {
        stack->latch_offset_stack[stack->stack_top].type = DSS_LATCH_OFFSET_INVALID;
    }

    LOG_DEBUG_INF("Clean sid:%u latch_stack new stack_top:%u.", sid, stack->stack_top);
}
/*
 * Clean-up for an unlatch that had not yet modified the latch: perform the release now
 * (drop one shared holder, normalise an empty latch to idle) and pop the top stack entry.
 */
static void dss_clean_unlatch_without_bak(dss_session_t *session, dss_shared_latch_t *shared_latch)
{
    dss_latch_stack_t *stack = &session->latch_stack;
    uint32 sid = DSS_SESSIONID_IN_LOCK(session->id);

    LOG_DEBUG_INF("Clean sid:%u unlatch shared_latch without bak, old count:%hu, old stat:%u.", sid,
        shared_latch->latch.shared_count, shared_latch->latch.stat);

    CM_ASSERT(shared_latch->latch.shared_count > 0);
    shared_latch->latch.shared_count--;
    shared_latch->latch_extent.shared_sid_count -= sid;
    if (shared_latch->latch.shared_count == 0) {
        if (shared_latch->latch.stat == LATCH_STATUS_S) {
            shared_latch->latch.stat = LATCH_STATUS_IDLE;
        }
        shared_latch->latch.sid = 0;
    }

    LOG_DEBUG_INF("Clean sid:%u shared_latch new count:%hu, new stat:%u.", sid, shared_latch->latch.shared_count,
        shared_latch->latch.stat);
    LOG_DEBUG_INF("Clean sid:%u latch_stack old stack_top:%u.", sid, stack->stack_top);

    CM_ASSERT(stack->stack_top > 0);
    stack->latch_offset_stack[stack->stack_top - 1].type = DSS_LATCH_OFFSET_INVALID;
    stack->stack_top--;

    LOG_DEBUG_INF("Clean sid:%u latch_stack new stack_top:%u.", sid, stack->stack_top);
}
/*
 * Clean-up for an unlatch that was interrupted while modifying the latch: first restore the
 * latch count/state/sid-sum from the backup fields in latch_extent, then redo the release
 * (drop one shared holder, normalise an empty latch to idle), restore stack_top from
 * stack_top_bak and pop the top stack entry.
 */
static void dss_clean_unlatch_with_bak(dss_session_t *session, dss_shared_latch_t *shared_latch)
{
    dss_latch_stack_t *stack = &session->latch_stack;
    uint32 sid = DSS_SESSIONID_IN_LOCK(session->id);

    LOG_DEBUG_INF("Clean sid:%u unlatch shared_latch with bak, old count:%hu, old stat:%u.", sid,
        shared_latch->latch.shared_count, shared_latch->latch.stat);

    /* Back to the snapshot taken before the interrupted release. */
    shared_latch->latch.shared_count = shared_latch->latch_extent.shared_count_bak;
    shared_latch->latch.stat = shared_latch->latch_extent.stat_bak;
    shared_latch->latch_extent.shared_sid_count = shared_latch->latch_extent.shared_sid_count_bak;

    /* Redo the release from that snapshot. */
    CM_ASSERT(shared_latch->latch.shared_count > 0);
    shared_latch->latch.shared_count--;
    shared_latch->latch_extent.shared_sid_count -= sid;
    if (shared_latch->latch.shared_count == 0) {
        if (shared_latch->latch.stat == LATCH_STATUS_S) {
            shared_latch->latch.stat = LATCH_STATUS_IDLE;
        }
        shared_latch->latch.sid = 0;
    }

    LOG_DEBUG_INF("Clean sid:%u shared_latch new count:%hu, new stat:%u.", sid, shared_latch->latch.shared_count,
        shared_latch->latch.stat);
    LOG_DEBUG_INF("Clean sid:%u latch_stack old stack_top:%u.", sid, stack->stack_top);

    stack->stack_top = stack->stack_top_bak;
    CM_ASSERT(stack->stack_top > 0);
    stack->latch_offset_stack[stack->stack_top - 1].type = DSS_LATCH_OFFSET_INVALID;
    stack->stack_top--;

    LOG_DEBUG_INF("Clean sid:%u latch_stack new stack_top:%u.", sid, stack->stack_top);
}
/*
 * Clean up the session's last latch operation when the latch spin lock still carries this
 * session's lock id. The recorded op selects the matching undo/redo routine; afterwards op is
 * set to LATCH_SHARED_OP_UNLATCH_END and the latch spin lock is released.
 */
static void dss_clean_last_op_with_lock(dss_session_t *session, dss_shared_latch_t *shared_latch)
{
    uint32 sid = DSS_SESSIONID_IN_LOCK(session->id);
    CM_ASSERT(sid == shared_latch->latch.lock);

    LOG_DEBUG_INF("Clean sid:%u last op with lock latch_stack op:%u, stack_top_bak:%hu.", sid, session->latch_stack.op,
        session->latch_stack.stack_top_bak);
    LOG_DEBUG_INF("Clean sid:%u latch_extent stat_bak:%hu, shared_count_bak:%hu.", sid,
        shared_latch->latch_extent.stat_bak, shared_latch->latch_extent.shared_count_bak);

    switch (session->latch_stack.op) {
        case LATCH_SHARED_OP_LATCH_S:
            dss_clean_latch_s_without_bak(session, shared_latch);
            break;
        case LATCH_SHARED_OP_LATCH_S_BEG:
        case LATCH_SHARED_OP_LATCH_S_END:
            dss_clean_latch_s_with_bak(session, shared_latch);
            break;
        case LATCH_SHARED_OP_UNLATCH:
            dss_clean_unlatch_without_bak(session, shared_latch);
            break;
        case LATCH_SHARED_OP_UNLATCH_BEG:
            dss_clean_unlatch_with_bak(session, shared_latch);
            break;
        default:
            /* Any other op needs no latch-side repair. */
            break;
    }

    session->latch_stack.op = LATCH_SHARED_OP_UNLATCH_END;
    cm_spin_unlock(&shared_latch->latch.lock);
}
/*
 * Clean up the session's last latch operation when the session does not hold the latch spin
 * lock. Only the session's own latch stack is repaired:
 *  - op NONE or LATCH_S: invalidate the pending slot at stack_top;
 *  - op UNLATCH_BEG:     pop the top entry.
 * In both cases op becomes LATCH_SHARED_OP_UNLATCH_END. Any other op is left untouched.
 */
static void dss_clean_last_op_without_lock(dss_session_t *session, dss_shared_latch_t *shared_latch)
{
    dss_latch_stack_t *stack = &session->latch_stack;

    if (stack->op == LATCH_SHARED_OP_NONE || stack->op == LATCH_SHARED_OP_LATCH_S) {
        stack->latch_offset_stack[stack->stack_top].type = DSS_LATCH_OFFSET_INVALID;
    } else if (stack->op == LATCH_SHARED_OP_UNLATCH_BEG) {
        CM_ASSERT(stack->stack_top > 0);
        stack->latch_offset_stack[stack->stack_top - 1].type = DSS_LATCH_OFFSET_INVALID;
        stack->stack_top--;
    } else {
        return;
    }

    stack->op = LATCH_SHARED_OP_UNLATCH_END;
    LOG_DEBUG_INF("Clean sid:%u reset to latch_stack op:%u, stack_top:%hu.", DSS_SESSIONID_IN_LOCK(session->id),
        stack->op, stack->stack_top);
}
/*
 * Clean up one latch recorded on a session's latch stack.
 *
 *  - The latch spin lock carries this session's lock id: repair under that lock.
 *  - Otherwise, op is NONE, LATCH_S or UNLATCH_BEG: repair the session's stack only.
 *  - Otherwise the latch is released through the normal unlock path; a daemon caller only
 *    tries the spin lock, any other caller blocks on it.
 *
 * Returns CM_FALSE only when a daemon caller could not get the latch spin lock and must retry
 * later; CM_TRUE in every other case.
 */
static bool32 dss_clean_lock_for_shm_meta(dss_session_t *session, dss_shared_latch_t *shared_latch, bool32 is_daemon)
{
    LOG_DEBUG_INF("Clean sid:%u latch_stack op:%u, stack_top:%hu.", DSS_SESSIONID_IN_LOCK(session->id),
        session->latch_stack.op, session->latch_stack.stack_top);

    if (DSS_SESSIONID_IN_LOCK(session->id) == shared_latch->latch.lock) {
        dss_clean_last_op_with_lock(session, shared_latch);
        return CM_TRUE;
    }

    if (session->latch_stack.op == LATCH_SHARED_OP_NONE || session->latch_stack.op == LATCH_SHARED_OP_LATCH_S ||
        session->latch_stack.op == LATCH_SHARED_OP_UNLATCH_BEG) {
        dss_clean_last_op_without_lock(session, shared_latch);
        return CM_TRUE;
    }

    if (!is_daemon) {
        (void)dss_unlock_shm_meta_s_with_stack(session, shared_latch, CM_FALSE);
        return CM_TRUE;
    }

    LOG_DEBUG_INF("Clean sid:%u latch_stack op:%u, stack_top:%hu wait next try.", DSS_SESSIONID_IN_LOCK(session->id),
        session->latch_stack.op, session->latch_stack.stack_top);
    return dss_unlock_shm_meta_s_with_stack(session, shared_latch, CM_TRUE);
}
/*
 * A session's latches need cleaning only when it has a client pid, is in use and connected,
 * and that client process (identified by pid and start time) is no longer alive.
 */
static bool32 dss_need_clean_session_latch(dss_session_t *session, uint64 cli_pid, int64 start_time)
{
    if (cli_pid == 0) {
        return CM_FALSE;
    }
    if (!session->is_used || !session->connected) {
        return CM_FALSE;
    }
    if (cm_sys_process_alived(cli_pid, start_time)) {
        return CM_FALSE;
    }
    return CM_TRUE;
}
/*
 * Release every latch still recorded on a session's latch stack.
 *
 * Sessions that are not is_direct are skipped. For a daemon caller the work is only done when
 * dss_need_clean_session_latch() reports that the client process is gone, and the function
 * returns early (to be retried later) whenever a latch spin lock is held by somebody else.
 *
 * The stack is walked from stack_top down to DSS_MAX_LATCH_STACK_BOTTON. When the walk
 * completes, op is reset to LATCH_SHARED_OP_NONE and stack_top to DSS_MAX_LATCH_STACK_BOTTON.
 */
void dss_clean_session_latch(dss_session_t *session, bool32 is_daemon)
{
int32 i = 0;
sh_mem_p offset;
int32 latch_place;
dss_latch_offset_type_e offset_type;
dss_shared_latch_t *shared_latch = NULL;
if (!session->is_direct) {
LOG_DEBUG_INF("Clean sid:%u is not direct.", DSS_SESSIONID_IN_LOCK(session->id));
return;
}
uint64 cli_pid = session->cli_info.cli_pid;
int64 start_time = session->cli_info.start_time;
if (is_daemon && !dss_need_clean_session_latch(session, cli_pid, start_time)) {
LOG_RUN_INF("[CLEAN_LATCH]session id %u, pid %llu, start_time %lld, process name:%s need check next time.",
session->id, cli_pid, start_time, session->cli_info.process_name);
return;
}
LOG_RUN_INF("[CLEAN_LATCH]session id %u, pid %llu, start_time %lld, process name:%s in lock.", session->id, cli_pid,
start_time, session->cli_info.process_name);
LOG_DEBUG_INF("Clean sid:%u latch_stack op:%u, stack_top:%hu.", DSS_SESSIONID_IN_LOCK(session->id),
session->latch_stack.op, session->latch_stack.stack_top);
for (i = (int32)session->latch_stack.stack_top; i >= DSS_MAX_LATCH_STACK_BOTTON; i--) {
/* A full stack has stack_top == DSS_MAX_LATCH_STACK_DEPTH, which is one past the last slot: clamp to the last slot. */
if (i == DSS_MAX_LATCH_STACK_DEPTH) {
latch_place = i - 1;
} else {
latch_place = i;
}
offset_type = session->latch_stack.latch_offset_stack[latch_place].type;
/* Slots that do not hold a shared-memory offset carry no latch to release. */
if (offset_type != DSS_LATCH_OFFSET_SHMOFFSET) {
LOG_DEBUG_ERR("Clean sid:%u shared_latch offset type is invalid %u,latch_place:%d.",
DSS_SESSIONID_IN_LOCK(session->id), session->latch_stack.latch_offset_stack[latch_place].type,
latch_place);
/* An unlatch was in progress and this is not the top slot: pull stack_top down to it and mark the unlatch finished. */
if (session->latch_stack.op == LATCH_SHARED_OP_UNLATCH_BEG && i != (int32)session->latch_stack.stack_top) {
session->latch_stack.stack_top = latch_place;
session->latch_stack.op = LATCH_SHARED_OP_UNLATCH_END;
}
LOG_DEBUG_INF("Clean sid:%u reset to latch_stack op:%u, stack_top:%hu.", DSS_SESSIONID_IN_LOCK(session->id),
session->latch_stack.op, session->latch_stack.stack_top);
continue;
} else {
/* Translate the recorded shared-memory offset back into the latch address. */
offset = session->latch_stack.latch_offset_stack[latch_place].offset.shm_offset;
CM_ASSERT(offset != SHM_INVALID_ADDR);
shared_latch = (dss_shared_latch_t *)OFFSET_TO_ADDR(offset);
LOG_DEBUG_INF("Clean sid:%u shared_latch,latch_place:%d, offset:%llu.", DSS_SESSIONID_IN_LOCK(session->id),
latch_place, (uint64)offset);
}
/* Daemon mode: if the latch spin lock is held under a different lock id, give up for now. */
if (is_daemon && shared_latch->latch.lock != 0 &&
DSS_SESSIONID_IN_LOCK(session->id) != shared_latch->latch.lock) {
LOG_DEBUG_INF("Clean sid:%u daemon wait next time to clean.", DSS_SESSIONID_IN_LOCK(session->id));
return;
} else {
bool32 is_clean = dss_clean_lock_for_shm_meta(session, shared_latch, is_daemon);
if (!is_clean) {
LOG_DEBUG_INF("Clean sid:%u daemon wait next time to clean.", DSS_SESSIONID_IN_LOCK(session->id));
return;
}
}
}
/* Every entry was handled: leave the session with an empty latch stack. */
session->latch_stack.op = LATCH_SHARED_OP_NONE;
session->latch_stack.stack_top = DSS_MAX_LATCH_STACK_BOTTON;
}
/*
 * Lock a session for server-side processing: take session->lock, then session->shm_lock.
 *
 * shm_lock is acquired with a timeout of DSS_SERVER_SESS_TIMEOUT per attempt. After a failed
 * attempt the client process recorded in cli_info is checked: if it is no longer alive,
 * shm_lock is force-released and the attempt repeated immediately; if it is alive, the server
 * sleeps CM_SLEEP_500_FIXED before trying again.
 */
void dss_server_session_lock(dss_session_t *session)
{
    cm_spin_lock(&session->lock, NULL);
    while (!cm_spin_timed_lock(&session->shm_lock, DSS_SERVER_SESS_TIMEOUT)) {
        if (cm_sys_process_alived(session->cli_info.cli_pid, session->cli_info.start_time)) {
            LOG_DEBUG_INF("Process:%s is alive, pid:%llu, start_time:%lld.", session->cli_info.process_name,
                session->cli_info.cli_pid, session->cli_info.start_time);
            cm_sleep(CM_SLEEP_500_FIXED);
        } else {
            LOG_DEBUG_INF("Process:%s is not alive, pid:%llu, start_time:%lld.", session->cli_info.process_name,
                session->cli_info.cli_pid, session->cli_info.start_time);
            /* The lock's holder is presumed to be the dead client: release it on its behalf. */
            cm_spin_unlock(&session->shm_lock);
            LOG_DEBUG_INF("Succeed to unlock session %u shm lock", session->id);
        }
    }
    LOG_DEBUG_INF("Succeed to lock session %u shm lock", session->id);
}
/* Undo dss_server_session_lock(): release shm_lock first, then the session lock. */
void dss_server_session_unlock(dss_session_t *session)
{
    cm_spin_unlock(&session->shm_lock);
    LOG_DEBUG_INF("Succeed to unlock session %u shm lock", session->id);

    cm_spin_unlock(&session->lock);
    LOG_DEBUG_INF("Succeed to unlock session %u lock", session->id);
}
#ifdef __cplusplus
}
#endif