* Copyright (c) 2022 Huawei Technologies Co.,Ltd.
*
* DSS is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* dss_instance.c
*
*
* IDENTIFICATION
* src/service/dss_instance.c
*
* -------------------------------------------------------------------------
*/
#include "dss_ga.h"
#include "dss_shm.h"
#include "cm_timer.h"
#include "cm_error.h"
#include "cm_iofence.h"
#include "cm_stack.h"
#include "dss_errno.h"
#include "dss_defs.h"
#include "dss_api.h"
#include "dss_file.h"
#include "dss_malloc.h"
#include "dss_mes.h"
#include "dss_lsnr.h"
#include "dss_handshake_pool.h"
#include "dss_redo_recovery.h"
#include "dss_service.h"
#include "dss_instance.h"
#include "dss_reactor.h"
#include "dss_service.h"
#include "dss_vtable.h"
#include "dss_zero.h"
#include "cm_utils.h"
#include "dss_thv.h"
#include "dss_hp_interface.h"
#ifdef ENABLE_DSSTEST
#include "dss_simulation_cm.h"
#endif
#include "dss_fault_injection.h"
#include "dss_nodes_list.h"
#include "dss_delete_file.h"
/* Name of the environment variable consulted by dss_init_maintain() to enable maintain mode. */
#define DSS_MAINTAIN_ENV "DSS_MAINTAIN"
/* The server instance object referenced throughout this file. */
dss_instance_t g_dss_instance;
/* Work buffer for the delay-clean task; allocated on first use by dss_init_delete_buf(). */
char *g_delete_buf = NULL;
/* Name of the lock file, created under the instance home directory by dss_lock_instance(). */
static const char *const g_dss_lock_file = "dss.lck";
/*
 * Fill a pool definition with the given object count, object size and maximum
 * number of extensions, then register it for pool_id through ga_set_pool_def().
 * ex_max is asserted not to exceed GA_MAX_EXTENDED_POOLS.
 */
static void instance_set_pool_def(ga_pool_id_e pool_id, uint32 obj_count, uint32 obj_size, uint32 ex_max)
{
    CM_ASSERT(ex_max <= ((uint32)GA_MAX_EXTENDED_POOLS));

    ga_pool_def_t def;
    def.object_count = obj_count;
    def.object_size = obj_size;
    def.ex_max = ex_max;
    ga_set_pool_def(pool_id, &def);
}
/*
 * Open (creating it if necessary) "<home>/dss.lck" and lock the resulting
 * descriptor, which is stored in g_dss_instance.lock_fd.
 *
 * Returns CM_SUCCESS when the lock is held. Returns CM_ERROR when the path
 * cannot be built, the file cannot be opened, or the lock cannot be taken;
 * in the last case the descriptor is closed and reset to CM_INVALID_INT32.
 */
status_t dss_lock_instance(void)
{
    char lock_path[CM_FILE_NAME_BUFFER_SIZE];
    int print_ret = snprintf_s(lock_path, CM_FILE_NAME_BUFFER_SIZE, CM_FILE_NAME_BUFFER_SIZE - 1, "%s/%s",
        g_dss_instance.inst_cfg.home, g_dss_lock_file);
    DSS_SECUREC_SS_RETURN_IF_ERROR(print_ret, CM_ERROR);

    if (cm_open_file(lock_path, O_CREAT | O_RDWR | O_BINARY, &g_dss_instance.lock_fd) != CM_SUCCESS) {
        return CM_ERROR;
    }

    if (cm_lock_fd(g_dss_instance.lock_fd, SPIN_SLEEP_TIME) == CM_SUCCESS) {
        return CM_SUCCESS;
    }

    /* Locking failed: do not keep the descriptor around. */
    cm_close_file(g_dss_instance.lock_fd);
    g_dss_instance.lock_fd = CM_INVALID_INT32;
    return CM_ERROR;
}
/*
 * Destroy any existing global area, register the definitions of all GA pools
 * used by the server, and create the global area again.
 *
 * inst is not used by this function.
 * Returns CM_SUCCESS, or the failure code of ga_create_global_area().
 */
static status_t instance_init_ga(dss_instance_t *inst)
{
    ga_destroy_global_area();

    /* One share-vg item per object, extendable up to the maximum volume group count. */
    instance_set_pool_def(GA_INSTANCE_POOL, 1, sizeof(dss_share_vg_item_t), DSS_MAX_VOLUME_GROUP_NUM - 1);

    instance_set_pool_def(
        GA_SESSION_POOL, DSS_SESSION_NUM_PER_GROUP, sizeof(dss_session_t), GA_MAX_SESSION_EXTENDED_POOLS);

    /* Block pools: each object is a block plus its control area. */
    instance_set_pool_def(GA_8K_POOL, DSS_MAX_MEM_BLOCK_SIZE / (DSS_BLOCK_SIZE + DSS_BLOCK_CTRL_SIZE),
        DSS_BLOCK_SIZE + DSS_BLOCK_CTRL_SIZE, GA_MAX_8K_EXTENDED_POOLS);
    instance_set_pool_def(GA_16K_POOL, DSS_MAX_MEM_BLOCK_SIZE / (DSS_FILE_SPACE_BLOCK_SIZE + DSS_BLOCK_CTRL_SIZE),
        DSS_FILE_SPACE_BLOCK_SIZE + DSS_BLOCK_CTRL_SIZE, GA_MAX_EXTENDED_POOLS);
    instance_set_pool_def(GA_FS_AUX_POOL, DSS_MAX_MEM_BLOCK_SIZE / (DSS_FS_AUX_SIZE + DSS_BLOCK_CTRL_SIZE),
        DSS_FS_AUX_SIZE + DSS_BLOCK_CTRL_SIZE, GA_MAX_EXTENDED_POOLS);

    instance_set_pool_def(
        GA_SEGMENT_POOL, DSS_MAX_VOLUME_GROUP_NUM, DSS_BUCKETS_SIZE_PER_SEGMENT, DSS_MAX_SEGMENT_NUM - 1);

    DSS_RETURN_IF_ERROR(ga_create_global_area());
    LOG_RUN_INF("Init GA pool and area successfully.");
    return CM_SUCCESS;
}
/*
 * Allocate inst->threads with dss_get_udssession_startid() entries and
 * zero-fill the array.
 *
 * Returns CM_ERROR when the allocation fails, CM_SUCCESS otherwise.
 */
static status_t dss_init_thread(dss_instance_t *inst)
{
    uint32 thread_cnt = dss_get_udssession_startid();
    uint32 total_bytes = thread_cnt * (uint32)sizeof(thread_t);

    inst->threads = (thread_t *)cm_malloc(total_bytes);
    if (inst->threads == NULL) {
        return CM_ERROR;
    }

    errno_t rc = memset_s(inst->threads, total_bytes, 0x00, total_bytes);
    securec_check_ret(rc);
    return CM_SUCCESS;
}
/*
 * Check that the control data of vg_item passes DSS_VG_IS_VALID.
 *
 * Returns CM_SUCCESS when valid; otherwise throws ERR_DSS_VG_CHECK_NOT_INIT
 * and returns CM_ERROR.
 */
status_t dss_check_vg_ctrl_valid(dss_vg_info_item_t *vg_item)
{
    dss_ctrl_t *ctrl = vg_item->dss_ctrl;
    if (DSS_VG_IS_VALID(ctrl)) {
        return CM_SUCCESS;
    }

    DSS_THROW_ERROR(ERR_DSS_VG_CHECK_NOT_INIT);
    return CM_ERROR;
}
/*
 * Load the log buffer of vg_item by offset and, when the loader reports that
 * recovery is needed, replay it through dss_recover_from_offset_inner().
 *
 * Returns CM_SUCCESS when nothing had to be recovered or recovery succeeded,
 * CM_ERROR when loading or recovering failed.
 */
status_t dss_recover_from_offset(dss_session_t *session, dss_vg_info_item_t *vg_item)
{
    bool8 need_recovery = CM_FALSE;

    LOG_RUN_INF("[RECOVERY]Try to load log buf to recover");
    if (dss_load_log_buffer_from_offset(vg_item, &need_recovery) != CM_SUCCESS) {
        return CM_ERROR;
    }
    if (!need_recovery) {
        return CM_SUCCESS;
    }

    status_t rc = dss_recover_from_offset_inner(session, vg_item, vg_item->log_file_ctrl.log_buf);
    return (rc == CM_SUCCESS) ? CM_SUCCESS : CM_ERROR;
}
/*
 * Load the log buffer of vg_item by slot and, when the loader reports that
 * recovery is needed, replay it through dss_recover_from_slot_inner().
 *
 * Returns CM_SUCCESS when nothing had to be recovered or recovery succeeded,
 * CM_ERROR when loading or recovering failed.
 */
status_t dss_recover_from_slot(dss_session_t *session, dss_vg_info_item_t *vg_item)
{
    bool8 need_recovery = CM_FALSE;

    if (dss_load_log_buffer_from_slot(vg_item, &need_recovery) != CM_SUCCESS) {
        return CM_ERROR;
    }
    if (!need_recovery) {
        return CM_SUCCESS;
    }

    status_t rc = dss_recover_from_slot_inner(session, vg_item, vg_item->log_file_ctrl.log_buf);
    return (rc == CM_SUCCESS) ? CM_SUCCESS : CM_ERROR;
}
/*
 * Run redo recovery for every volume group listed in g_vgs_info.
 *
 * For each group the control data is validated first. Groups whose software
 * version is below DSS_SOFTWARE_VERSION_2 are recovered by slot; newer groups
 * load their redo control and are recovered by offset.
 *
 * inst is not used by this function.
 * Returns CM_SUCCESS when every group was processed (including the case of
 * zero groups), CM_ERROR on the first failure.
 */
status_t dss_recover_from_instance(dss_session_t *session, dss_instance_t *inst)
{
    for (uint32 i = 0; i < g_vgs_info->group_num; i++) {
        dss_vg_info_item_t *vg_item = &g_vgs_info->volume_group[i];
        if (dss_check_vg_ctrl_valid(vg_item) != CM_SUCCESS) {
            LOG_RUN_ERR("[RECOVERY]Failed to check valid of vg %s.", vg_item->vg_name);
            return CM_ERROR;
        }

        uint32 software_version = dss_get_software_version(&vg_item->dss_ctrl->vg_info);
        if (software_version < DSS_SOFTWARE_VERSION_2) {
            if (dss_recover_from_slot(session, vg_item) != CM_SUCCESS) {
                LOG_RUN_ERR("[RECOVERY]Failed to recover from vg %s.", vg_item->vg_name);
                return CM_ERROR;
            }
            continue;
        }

        if (dss_load_redo_ctrl(vg_item) != CM_SUCCESS) {
            LOG_RUN_ERR("[RECOVERY]Failed to load redo ctrl of vg %s.", vg_item->vg_name);
            return CM_ERROR;
        }
        if (dss_recover_from_offset(session, vg_item) != CM_SUCCESS) {
            LOG_RUN_ERR("[RECOVERY]Failed to recover from vg %s.", vg_item->vg_name);
            return CM_ERROR;
        }
    }

    /*
     * Every failure path above returns early, so reaching this point means
     * success. Returning a constant also covers group_num == 0, where the
     * previous code returned an uninitialized status variable.
     */
    return CM_SUCCESS;
}
/*
 * Report whether the DSS_CM_SO_NAME configuration value is usable: it must be
 * present, non-empty and shorter than DSS_MAX_NAME_LEN.
 *
 * Returns CM_TRUE when usable; otherwise logs a message and returns CM_FALSE.
 */
bool32 dss_config_cm()
{
    dss_config_t *cfg = dss_get_inst_cfg();
    char *so_name = cm_get_config_value(&cfg->config, "DSS_CM_SO_NAME");

    if (so_name != NULL) {
        size_t name_len = strlen(so_name);
        if (name_len != 0 && name_len < DSS_MAX_NAME_LEN) {
            return CM_TRUE;
        }
    }

    LOG_RUN_INF("dss cm config of DSS_CM_SO_NAME is empty.");
    return CM_FALSE;
}
/*
 * Create the instance's own session (no pipe attached) and store it in
 * inst->handle_session.
 *
 * Returns CM_SUCCESS, or the failure code of dss_create_session() after
 * logging the error.
 */
static status_t dss_init_inst_handle_session(dss_instance_t *inst)
{
    status_t status = dss_create_session(NULL, &inst->handle_session);
    if (status != CM_SUCCESS) {
        LOG_RUN_ERR("DSS instance init create handle session fail!");
        return status;
    }
    return CM_SUCCESS;
}
/*
 * Run the core start-up steps in order: vg info, session pool, thread array,
 * mes, reactors, listener and the instance handle session.
 *
 * The first failing step stops the sequence and its status is returned after
 * the matching error has been thrown or logged.
 */
static status_t instance_init_core(dss_instance_t *inst)
{
    DSS_RETURN_IFERR2(dss_get_vg_info(), DSS_THROW_ERROR(ERR_DSS_GA_INIT, "DSS instance failed to get vg info."));

    DSS_RETURN_IFERR2(dss_init_session_pool(dss_get_max_total_session_cnt()),
        DSS_THROW_ERROR(ERR_DSS_GA_INIT, "DSS instance failed to initialize sessions."));

    DSS_RETURN_IFERR2(
        dss_init_thread(inst), DSS_THROW_ERROR(ERR_DSS_GA_INIT, "DSS instance failed to initialize thread."));

    DSS_RETURN_IFERR2(dss_startup_mes(), DSS_THROW_ERROR(ERR_DSS_GA_INIT, "DSS instance failed to startup mes"));

    DSS_RETURN_IFERR2(dss_create_reactors(), LOG_RUN_ERR("DSS instance failed to start reactors!"));

    DSS_RETURN_IFERR2(dss_start_lsnr(inst), LOG_RUN_ERR("DSS instance failed to start lsnr!"));

    DSS_RETURN_IFERR2(dss_init_inst_handle_session(inst), LOG_RUN_ERR("DSS instance int handle session!"));

    return CM_SUCCESS;
}
/*
 * Decide whether the instance runs in maintain mode and log the outcome.
 *
 * The start-up argument wins when it is set; otherwise the DSS_MAINTAIN
 * environment variable enables maintain mode when it equals "TRUE"
 * (compared with cm_strcmpi).
 */
static void dss_init_maintain(dss_instance_t *inst, dss_srv_args_t dss_args)
{
    if (!dss_args.is_maintain) {
        char *env_value = getenv(DSS_MAINTAIN_ENV);
        inst->is_maintain = (env_value != NULL && cm_strcmpi(env_value, "TRUE") == 0);
    } else {
        inst->is_maintain = true;
    }

    if (!inst->is_maintain) {
        LOG_RUN_INF("DSS_MAINTAIN is FALSE");
        return;
    }
    LOG_RUN_INF("DSS_MAINTAIN is TRUE");
}
/*
 * Take the instance lock, initialize shared memory and the global area, then
 * run the core initialization.
 *
 * The shared-memory key combines the configured shm_key (shifted left by
 * DSS_MAX_SHM_KEY_BITS) with the instance id. When a later step fails, the
 * shared-memory resources set up by the earlier steps are released before
 * returning the error.
 */
static status_t instance_init(dss_instance_t *inst)
{
    status_t status = dss_lock_instance();
    DSS_RETURN_IFERR2(status, LOG_RUN_ERR("Another dssinstance is running"));

    dss_config_t *cfg = &inst->inst_cfg;
    uint32 key_base = (uint32)(cfg->params.shm_key << (uint8)DSS_MAX_SHM_KEY_BITS);
    uint32 shm_key = key_base + (uint32)cfg->params.inst_id;
    status = cm_init_shm(shm_key);
    DSS_RETURN_IFERR2(status, LOG_RUN_ERR("DSS instance failed to initialize shared memory!"));

    status = instance_init_ga(inst);
    if (status != CM_SUCCESS) {
        (void)del_shm_by_key(CM_SHM_CTRL_KEY);
        cm_destroy_shm();
        LOG_RUN_ERR("DSS instance failed to initialize ga!");
        return status;
    }

    if (instance_init_core(inst) != CM_SUCCESS) {
        /* Undo the shared-memory and GA set-up and drop the session array. */
        (void)del_shm_by_key(CM_SHM_CTRL_KEY);
        ga_detach_area();
        cm_destroy_shm();
        CM_FREE_PTR(g_dss_session_ctrl.sessions);
        return CM_ERROR;
    }

    LOG_RUN_INF("DSS instance begin to run.");
    return CM_SUCCESS;
}
/* Set every entry of inst->cluster_proto_vers to DSS_INVALID_VERSION. */
static void dss_init_cluster_proto_ver(dss_instance_t *inst)
{
    uint32 idx = 0;
    while (idx < DSS_MAX_INSTANCES) {
        inst->cluster_proto_vers[idx] = DSS_INVALID_VERSION;
        idx++;
    }
}
dss_instance_status_e dss_get_instance_status(void)
{
return g_dss_instance.status;
}
#if defined(_DEBUG) || defined(DEBUG) || defined(DB_DEBUG_VERSION)
/*
 * Allocate the fault-injection context (debug builds only), store it in
 * inst->fi_run_ctx and hand it to ddes_fi_set_and_init_context().
 *
 * Returns CM_ERROR when the reported context size is not positive or the
 * allocation fails, CM_SUCCESS otherwise.
 */
static status_t dss_init_fi_ctx(dss_instance_t *inst)
{
    int32 ctx_bytes = ddes_fi_get_context_size();
    if (ctx_bytes <= 0) {
        LOG_RUN_ERR("Failed to get fi context size.");
        return CM_ERROR;
    }

    inst->fi_run_ctx = malloc((uint32)ctx_bytes);
    if (inst->fi_run_ctx != NULL) {
        ddes_fi_set_and_init_context(inst->fi_run_ctx);
        return CM_SUCCESS;
    }

    LOG_RUN_ERR("Failed to alloc fi context.");
    return CM_ERROR;
}
#endif
/*
 * Write the pid of the current process to "<home>/process/dss.process".
 * The "process" directory is created when it does not exist yet.
 * On WIN32 builds the function does nothing.
 *
 * Returns CM_SUCCESS on success; an error status when the path cannot be
 * built, the directory or file cannot be created, or the pid cannot be
 * written and flushed to the file.
 */
static status_t dss_save_process_pid(dss_config_t *inst_cfg)
{
#ifndef WIN32
    char file_name[CM_FILE_NAME_BUFFER_SIZE] = {0};
    char dir_name[CM_FILE_NAME_BUFFER_SIZE] = {0};
    PRTS_RETURN_IFERR(
        snprintf_s(dir_name, CM_FILE_NAME_BUFFER_SIZE, CM_FILE_NAME_BUFFER_SIZE - 1, "%s/process", inst_cfg->home));
    if (!cm_dir_exist(dir_name)) {
        DSS_RETURN_IF_ERROR(cm_create_dir(dir_name));
    }
    PRTS_RETURN_IFERR(snprintf_s(
        file_name, CM_FILE_NAME_BUFFER_SIZE, CM_FILE_NAME_BUFFER_SIZE - 1, "%s/%s", dir_name, "dss.process"));
    if (strlen(file_name) == 0) {
        LOG_RUN_ERR("dssserver process path not existed");
        return CM_ERROR;
    }

    FILE *fp = NULL;
    CM_RETURN_IFERR(cm_fopen(file_name, "w+", S_IRUSR | S_IWUSR, &fp));
    (void)cm_truncate_file(fp->_fileno, 0);
    (void)cm_seek_file(fp->_fileno, 0, SEEK_SET);

    /* pid_t is not guaranteed to be int, so convert explicitly for "%d". */
    int32 size = fprintf(fp, "%d", (int)getpid());
    if (size < 0) {
        LOG_RUN_ERR("write dssserver process failed, write size is %d.", size);
        (void)fclose(fp);
        return CM_ERROR;
    }

    /*
     * Flush the pid file itself (the previous code flushed stdout) so that a
     * buffered write failure is reported instead of being silently dropped.
     */
    if (fflush(fp) != 0) {
        LOG_RUN_ERR("flush dssserver process file failed, OS errno:%d.", errno);
        (void)fclose(fp);
        return CM_ERROR;
    }
    (void)fclose(fp);
#endif
    return CM_SUCCESS;
}
/*
 * Start the DSS server instance.
 *
 * The instance structure is zeroed first. The function then prepares the zero
 * buffer, the fault-injection context (debug builds), the configuration
 * directory, the timer, the configuration parameters and the pid file, in
 * that order. It optionally initializes vtable and hot patches, decides on
 * maintain mode, and finally runs instance_init().
 *
 * Returns CM_SUCCESS when the instance is initialized; otherwise the status
 * of the first failing step.
 */
status_t dss_startup(dss_instance_t *inst, dss_srv_args_t dss_args)
{
    errno_t errcode = memset_s(inst, sizeof(dss_instance_t), 0, sizeof(dss_instance_t));
    securec_check_ret(errcode);

    status_t status = dss_init_zero_buf();
    DSS_RETURN_IFERR2(status, DSS_PRINT_RUN_ERROR("Dss init zero buf fail.\n"));
#if defined(_DEBUG) || defined(DEBUG) || defined(DB_DEBUG_VERSION)
    status = dss_init_fi_ctx(inst);
    DSS_RETURN_IFERR2(status, DSS_PRINT_RUN_ERROR("Dss init fi ctx fail.\n"));
#endif

    dss_init_cluster_proto_ver(inst);
    inst->lock_fd = CM_INVALID_INT32;
    dss_set_server_flag();
    regist_get_instance_status_proc(dss_get_instance_status);

    dss_config_t *cfg = &inst->inst_cfg;
    status = dss_set_cfg_dir(dss_args.dss_home, cfg);
    DSS_RETURN_IFERR2(status, DSS_PRINT_RUN_ERROR("Environment variant DSS_HOME not found!\n"));

    status = cm_start_timer(g_timer());
    DSS_RETURN_IFERR2(status, DSS_PRINT_RUN_ERROR("Aborted due to starting timer thread.\n"));

    status = dss_load_config(cfg);
    DSS_RETURN_IFERR2(status, DSS_PRINT_RUN_ERROR("Failed to load parameters!\n"));

    status = dss_save_process_pid(cfg);
    DSS_RETURN_IFERR2(status, DSS_PRINT_RUN_ERROR("Save dssserver pid failed!\n"));
#ifdef DISABLE_SYN_META
    dss_set_syn_meta_enable(CM_FALSE);
#endif

    if (cfg->params.disk_type == DISK_VTABLE) {
        status = dss_init_vtable();
        DSS_RETURN_IFERR2(status, (void)printf("DSS init vtable failed!\n"));
    }

    dss_init_maintain(inst, dss_args);
    LOG_RUN_INF("DSS instance begin to initialize.");
#ifndef OPENGAUSS
    /* Hot-patch problems are only warned about; start-up continues. */
    if (dss_hp_init(cfg->home) != CM_SUCCESS) {
        LOG_RUN_WAR("[HotPatch] hotpatch module failed to be initialized");
        cm_reset_error();
    }
    if (dss_hp_is_inited() && dss_hp_patched_load() != CM_SUCCESS) {
        LOG_RUN_WAR("[HotPatch] Failed to restore hotpatches during startup, please check patch files.");
        cm_reset_error();
    }
#endif

    status = instance_init(inst);
    DSS_RETURN_IFERR2(status, LOG_RUN_ERR("DSS instance failed to initialized!"));

    cm_set_shm_ctrl_flag(CM_SHM_CTRL_FLAG_TRUE);
    inst->abort_status = CM_FALSE;
    return CM_SUCCESS;
}
/*
 * Reset the session's receive and send packets, set the pipe socket timeout
 * to CM_NETWORK_IO_TIMEOUT and process the handshake command.
 *
 * Returns the status of dss_process_handshake_cmd().
 */
static status_t dss_handshake_core(dss_session_t *session)
{
    dss_init_packet(&session->recv_pack, CM_FALSE);
    dss_init_packet(&session->send_pack, CM_FALSE);
    session->pipe.socket_timeout = (int32)CM_NETWORK_IO_TIMEOUT;

    return dss_process_handshake_cmd(session, DSS_CMD_HANDSHAKE);
}
/*
 * Full handshake for a new connection: detect the protocol type, then run
 * the handshake command exchange.
 *
 * Returns CM_SUCCESS when both steps succeed, CM_ERROR otherwise.
 */
static status_t dss_handshake(dss_session_t *session)
{
    LOG_RUN_INF("[DSS_CONNECT]session %u begin check protocal type.", session->id);

    if (dss_diag_proto_type(session) != CM_SUCCESS) {
        LOG_RUN_ERR("[DSS_CONNECT]Failed to get protocol type!");
        return CM_ERROR;
    }

    if (dss_handshake_core(session) != CM_SUCCESS) {
        LOG_RUN_ERR("[DSS_CONNECT]Failed to process get server info!");
        return CM_ERROR;
    }

    return CM_SUCCESS;
}
/*
 * Turn an accepted pipe into a running session: create the session, perform
 * the handshake and register the session with the reactors.
 *
 * disconnect_on_create_fail: disconnect the pipe when the session cannot be
 *                            created.
 * link_ready_done:           when set, the protocol type is fixed to
 *                            PROTO_TYPE_GS and only the core handshake runs;
 *                            otherwise the full handshake is performed.
 *
 * Returns CM_SUCCESS when the session was added to the reactors. On handshake
 * or registration failure the session is destroyed and CM_ERROR is returned.
 */
static status_t dss_process_accepted_connection(const cs_pipe_t *pipe, bool32 disconnect_on_create_fail,
    bool32 link_ready_done)
{
    dss_session_t *new_session = NULL;

    if (dss_create_session(pipe, &new_session) != CM_SUCCESS) {
        LOG_RUN_ERR("[DSS_CONNECT] create session failed, conn_sock=%d", (int)pipe->link.uds.sock);
        if (disconnect_on_create_fail) {
            cs_disconnect((cs_pipe_t *)pipe);
        }
        return CM_ERROR;
    }

    status_t hs_status;
    if (!link_ready_done) {
        hs_status = dss_handshake(new_session);
    } else {
        new_session->proto_type = PROTO_TYPE_GS;
        hs_status = dss_handshake_core(new_session);
    }
    if (hs_status != CM_SUCCESS) {
        LOG_RUN_ERR("[DSS_CONNECT] handshake failed, session=%u, conn_sock=%d, status=%d, err_code=%d",
            new_session->id, (int)pipe->link.uds.sock, hs_status, cm_get_error_code());
        dss_destroy_session(new_session);
        return CM_ERROR;
    }

    if (dss_reactors_add_session(new_session) != CM_SUCCESS) {
        LOG_RUN_ERR("[DSS_CONNECT]Session:%u socket:%u closed.", new_session->id, pipe->link.uds.sock);
        dss_destroy_session(new_session);
        return CM_ERROR;
    }

    LOG_RUN_INF("[DSS_CONNECT]The client has connected, session %u.", new_session->id);
    return CM_SUCCESS;
}
static status_t dss_handshake_pool_worker_cb(const cs_pipe_t *pipe)
{
return dss_process_accepted_connection(pipe, CM_TRUE, CM_TRUE);
}
/*
 * Listener callback for an accepted connection.
 *
 * Emergency connections are submitted to the handshake pool; all others are
 * processed directly on the calling thread. lsnr is not used.
 *
 * Returns CM_SUCCESS or CM_ERROR.
 */
static status_t dss_lsnr_proc(bool32 is_emerg, uds_lsnr_t *lsnr, cs_pipe_t *pipe)
{
    (void)lsnr;
    LOG_DEBUG_INF("[DSS_CONNECT] server begin handle connection, conn_sock=%d, is_emerg=%u",
        (int)pipe->link.uds.sock, (uint32)is_emerg);

    if (!is_emerg) {
        return dss_process_accepted_connection(pipe, CM_FALSE, CM_FALSE);
    }

    return (dss_handshake_pool_submit(pipe) == CM_SUCCESS) ? CM_SUCCESS : CM_ERROR;
}
/*
 * Configure and start the UDS listener of the instance.
 *
 * The configured listener path is copied into inst->lsnr.names[0], the socket
 * permissions are set, the handshake pool is started and finally the listener
 * itself. When the listener fails to start, the handshake pool is stopped
 * again.
 *
 * Returns CM_SUCCESS on success, CM_ERROR otherwise.
 */
status_t dss_start_lsnr(dss_instance_t *inst)
{
    /*
     * Copy the path through "%s": the configured value is data and must never
     * be interpreted as a format string.
     */
    int ret = snprintf_s(inst->lsnr.names[0], DSS_MAX_PATH_BUFFER_SIZE, DSS_MAX_PATH_BUFFER_SIZE - 1, "%s",
        inst->inst_cfg.params.lsnr_path);
    if (ret == -1) {
        DSS_THROW_ERROR(ERR_DSS_INVALID_PARAM, "invalid DSS lsnr host");
        return CM_ERROR;
    }
    inst->lsnr.permissions = DSS_USOCKET_PERMSSION;

    status_t status = dss_handshake_pool_start(dss_handshake_pool_worker_cb);
    if (status != CM_SUCCESS) {
        LOG_RUN_ERR("DSS instance failed to start handshake pool!");
        return CM_ERROR;
    }

    status = cs_start_uds_lsnr(&inst->lsnr, dss_lsnr_proc);
    if (status != CM_SUCCESS) {
        /* The pool is useless without a listener feeding it. */
        dss_handshake_pool_stop();
        return CM_ERROR;
    }
    return CM_SUCCESS;
}
/*
 * Store min_version as the proto_version of vg_item, refresh the vg data
 * checksum and write the vg data to disk. The volume group is locked
 * exclusively with the instance handle session for the duration of the
 * update.
 *
 * Returns CM_SUCCESS when the write succeeds, CM_ERROR otherwise.
 */
status_t dss_write_global_version_to_disk(dss_vg_info_item_t *vg_item, uint32 min_version)
{
    dss_lock_vg_mem_and_shm_x(g_dss_instance.handle_session, vg_item);

    dss_ctrl_t *ctrl = vg_item->dss_ctrl;
    ctrl->vg_info.proto_version = min_version;
    ctrl->vg_info.checksum = dss_get_checksum(ctrl->vg_data, DSS_VG_DATA_SIZE);
    LOG_RUN_INF("write proto_version %u to disk.", min_version);

    if (dss_write_ctrl_to_disk(vg_item, (int64)DSS_CTRL_VG_DATA_OFFSET, ctrl->vg_data, DSS_VG_DATA_SIZE) !=
        CM_SUCCESS) {
        LOG_RUN_ERR("write proto_version %u to disk failed.", min_version);
        dss_unlock_vg_mem_and_shm(g_dss_instance.handle_session, vg_item);
        return CM_ERROR;
    }

    dss_unlock_vg_mem_and_shm(g_dss_instance.handle_session, vg_item);
    LOG_RUN_INF("write proto_version to disk successful.");
    return CM_SUCCESS;
}
/*
 * Initialize the CM resource manager named by the DSS_CM_SO_NAME
 * configuration value.
 *
 * cm_res.is_valid and the work status map are reset first. An absent or empty
 * DSS_CM_SO_NAME is not an error: the function returns CM_SUCCESS with
 * cm_res.is_valid left false. A value that is too long returns CM_ERROR.
 * When cm_res_init() fails, the resource manager is uninitialized again and
 * the failure is returned. On success cm_res.is_valid is set.
 */
status_t dss_init_cm(dss_instance_t *inst)
{
    inst->cm_res.is_valid = CM_FALSE;
    inst->inst_work_status_map = 0;

    dss_config_t *cfg = dss_get_inst_cfg();
    char *so_name = cm_get_config_value(&cfg->config, "DSS_CM_SO_NAME");
    size_t so_name_len = (so_name == NULL) ? 0 : strlen(so_name);
    if (so_name_len == 0) {
        LOG_RUN_INF("dss cm config of DSS_CM_SO_NAME is empty.");
        return CM_SUCCESS;
    }
    if (so_name_len >= DSS_MAX_NAME_LEN) {
        LOG_RUN_ERR("dss cm config of DSS_CM_SO_NAME is exceeds the max len %u.", DSS_MAX_NAME_LEN - 1);
        return CM_ERROR;
    }

#ifdef ENABLE_DSSTEST
    DSS_RETURN_IF_ERROR(dss_simulation_cm_res_mgr_init(so_name, &inst->cm_res.mgr, NULL));
#else
#ifdef OPENGAUSS
    /* In this build a failure to initialize the resource manager terminates the process. */
    if (cm_res_mgr_init(so_name, &inst->cm_res.mgr, NULL) != CM_SUCCESS) {
        LOG_RUN_ERR("dss init cm failed, error info:%s, OS errno:%d, OS errmsg:%s.",
            cm_get_errormsg(cm_get_error_code()), errno, strerror(errno));
        dss_exit(1);
    }
#else
    DSS_RETURN_IF_ERROR(cm_res_mgr_init(so_name, &inst->cm_res.mgr, NULL));
#endif
#endif

    status_t init_status =
        (status_t)cm_res_init(&inst->cm_res.mgr, (unsigned int)inst->inst_cfg.params.inst_id, DSS_CMS_RES_TYPE, NULL);
#ifdef ENABLE_DSSTEST
    DSS_RETURN_IFERR2(init_status, dss_simulation_cm_res_mgr_uninit(&inst->cm_res.mgr));
#else
    DSS_RETURN_IFERR2(init_status, cm_res_mgr_uninit(&inst->cm_res.mgr));
#endif

    inst->cm_res.is_valid = CM_TRUE;
    return CM_SUCCESS;
}
/*
 * Uninitialize the CM resource manager if it is currently valid and clear
 * cm_res.is_valid. Does nothing otherwise.
 */
void dss_uninit_cm(dss_instance_t *inst)
{
    if (!inst->cm_res.is_valid) {
        return;
    }

#ifdef ENABLE_DSSTEST
    dss_simulation_cm_res_mgr_uninit(&inst->cm_res.mgr);
#else
    cm_res_mgr_uninit(&inst->cm_res.mgr);
#endif
    inst->cm_res.is_valid = CM_FALSE;
}
/*
 * Release the log buffer of every volume group in g_vgs_info.
 * Does nothing when g_vgs_info has not been set.
 */
void dss_free_log_ctrl()
{
    if (g_vgs_info == NULL) {
        return;
    }

    uint32 idx = 0;
    while (idx < g_vgs_info->group_num) {
        dss_vg_info_item_t *vg_item = &g_vgs_info->volume_group[idx];
        idx++;
        if (vg_item == NULL || vg_item->log_file_ctrl.log_buf == NULL) {
            continue;
        }
        DSS_FREE_POINT(vg_item->log_file_ctrl.log_buf);
    }
}
/*
 * Trigger a peer check for inst_id when that instance is another node, is
 * present in the configured node map, and is not yet marked in the current
 * work status map. In every other case the function returns without action.
 */
void dss_check_peer_by_inst(dss_instance_t *inst, uint64 inst_id)
{
    dss_config_t *cfg = dss_get_inst_cfg();
    if (inst_id == (uint64)cfg->params.inst_id) {
        /* Never check ourselves. */
        return;
    }

    uint64 peer_bit = ((uint64)0x1 << inst_id);
    bool32 is_configured = ((cfg->params.nodes_list.inst_map & peer_bit) != 0);
    if (is_configured && (dss_get_inst_work_status() & peer_bit) == 0) {
        dss_check_peer_inst(inst, inst_id);
    }
}
/*
 * Build the instance bitmap from the CM resource status and pass it to
 * dss_check_mes_conn().
 *
 * An instance contributes its bit when CM reports it as a work member and it
 * is present in the configured node map; a non-online state is only logged.
 * When any CM query fails, the function releases what it acquired and returns
 * without calling dss_check_mes_conn().
 */
static void dss_check_peer_by_cm(dss_instance_t *inst)
{
    cm_res_mem_ctx_t mem_ctx;
    if (cm_res_init_memctx(&mem_ctx) != CM_SUCCESS) {
        return;
    }

    cm_res_stat_ptr_t res_stat = cm_res_get_stat(&inst->cm_res.mgr, &mem_ctx);
    if (res_stat == NULL) {
        cm_res_uninit_memctx(&mem_ctx);
        return;
    }

    dss_config_t *cfg = dss_get_inst_cfg();
    uint64 work_map = 0;
    uint32 inst_total = 0;
    /* Stays true only while every CM query succeeds. */
    bool32 complete = (cm_res_get_instance_count(&inst_total, &inst->cm_res.mgr, res_stat) == CM_SUCCESS);

    for (uint32_t idx = 0; complete && idx < inst_total; idx++) {
        const cm_res_inst_info_ptr_t inst_res = cm_res_get_instance_info(&inst->cm_res.mgr, res_stat, idx);
        if (inst_res == NULL) {
            complete = CM_FALSE;
            break;
        }

        int res_instance_id = cm_res_get_inst_instance_id(&inst->cm_res.mgr, inst_res);
        int is_work_member = cm_res_get_inst_is_work_member(&inst->cm_res.mgr, inst_res);
        if (is_work_member == 0) {
            LOG_RUN_INF("dss instance [%d] is not work member. May be kicked off by cm.", res_instance_id);
            continue;
        }

        uint64_t inst_bit = ((uint64)0x1 << res_instance_id);
        if ((cfg->params.nodes_list.inst_map & inst_bit) == 0) {
            LOG_RUN_INF("dss instance [%d] is not in mes nodes cfg lists.", res_instance_id);
            continue;
        }

        int stat = cm_res_get_inst_stat(&inst->cm_res.mgr, inst_res);
        if (stat != CM_RES_STATUS_ONLINE) {
            LOG_RUN_INF("dss instance [%d] work stat [%d] not online.", res_instance_id, stat);
        }
        work_map |= inst_bit;
    }

    if (complete) {
        dss_check_mes_conn(work_map);
    }
    cm_res_free_stat(&inst->cm_res.mgr, res_stat);
    cm_res_uninit_memctx(&mem_ctx);
}
#ifdef ENABLE_DSSTEST
/*
 * Test-build variant of the peer check. With CM simulation enabled, the
 * instance bitmap is parsed from the string returned by the simulated
 * resource manager; otherwise the regular CM-based check is used.
 */
static void dss_check_peer_by_simulation_cm(dss_instance_t *inst)
{
    if (!g_simulation_cm.simulation) {
        dss_check_peer_by_cm(inst);
        return;
    }

    char *online_bitmap = inst->cm_res.mgr.cm_get_res_stat();
    uint64 inst_map = 0;
    (void)cm_str2bigint(online_bitmap, (int64 *)&inst_map);
    dss_check_mes_conn(inst_map);
}
#endif
static void dss_check_peer_default()
{
dss_check_mes_conn(DSS_INVALID_ID64);
}
/*
 * Initialize the CM resource once, under cm_res.init_lock.
 * is_init is set only when dss_init_cm() succeeds, so a failed attempt is
 * retried by the next call.
 */
void dss_init_cm_res(dss_instance_t *inst)
{
    dss_cm_res *res = &inst->cm_res;

    cm_spin_lock(&res->init_lock, NULL);
    if (!res->is_init && dss_init_cm(inst) == CM_SUCCESS) {
        res->is_init = CM_TRUE;
    }
    cm_spin_unlock(&res->init_lock);
}
#ifdef ENABLE_DSSTEST
/*
 * Test-build variant: determine the owner of the CM lock.
 *
 * With CM simulation enabled the owner is queried from the resource manager
 * and a failure code is returned unchanged. Without simulation, the lowest
 * instance index with a non-zero configured port is written to master_id.
 */
status_t dss_get_cm_res_lock_owner(dss_cm_res *cm_res, uint32 *master_id)
{
    if (!g_simulation_cm.simulation) {
        dss_config_t *cfg = dss_get_inst_cfg();
        int i = 0;
        while (i < DSS_MAX_INSTANCES) {
            if (cfg->params.nodes_list.ports[i] != 0) {
                *master_id = i;
                LOG_RUN_INF_INHIBIT(LOG_INHIBIT_LEVEL5, "Set min id %u as master id.", i);
                break;
            }
            i++;
        }
        LOG_RUN_INF_INHIBIT(LOG_INHIBIT_LEVEL5, "master_id is %u when get cm lock.", *master_id);
        return CM_SUCCESS;
    }

    int ret = cm_res_get_lock_owner(&cm_res->mgr, DSS_CM_LOCK, master_id);
    if (ret == CM_SUCCESS) {
        return CM_SUCCESS;
    }
    return ret;
}
#else
/*
 * Query the owner of the CM lock.
 *
 * Returns CM_ERROR only on CM_RES_TIMEOUT. Any other failure is logged,
 * master_id is set to CM_INVALID_ID32, and CM_SUCCESS is returned so the
 * caller can decide what to do with the missing owner.
 */
status_t dss_get_cm_res_lock_owner(dss_cm_res *cm_res, uint32 *master_id)
{
    int ret = cm_res_get_lock_owner(&cm_res->mgr, DSS_CM_LOCK, master_id);

    if (ret == CM_RES_TIMEOUT) {
        LOG_RUN_ERR("Try to get lock owner failed, cm error : %d.", ret);
        return CM_ERROR;
    }
    if (ret != CM_RES_SUCCESS) {
        *master_id = CM_INVALID_ID32;
        LOG_RUN_ERR("Try to get lock owner failed, cm error : %d.", ret);
    }
    return CM_SUCCESS;
}
#endif
/*
 * Determine the current master instance, optionally trying to take the CM
 * lock when nobody owns it.
 *
 * master_id starts as DSS_INVALID_ID32 and keeps that value when the CM
 * resource is not initialized yet (CM_SUCCESS is still returned).
 * In maintain mode, or with at most one configured instance, the local
 * instance is reported as master and *grab_lock is set.
 * When the lock has no owner: without try_lock, or with no_grab_lock set,
 * CM_ERROR is returned; otherwise the lock is requested, *grab_lock reports
 * the outcome, and on success the local instance becomes master.
 */
status_t dss_get_cm_lock_owner(dss_instance_t *inst, bool32 *grab_lock, bool32 try_lock, uint32 *master_id)
{
    dss_config_t *cfg = dss_get_inst_cfg();
    *master_id = DSS_INVALID_ID32;

    if (inst->is_maintain || inst->inst_cfg.params.nodes_list.inst_cnt <= 1) {
        *grab_lock = CM_TRUE;
        LOG_RUN_INF_INHIBIT(LOG_INHIBIT_LEVEL5,
            "[RECOVERY]Set curr_id %u to be primary when dssserver is maintain or just one inst.",
            (uint32)cfg->params.inst_id);
        *master_id = (uint32)cfg->params.inst_id;
        return CM_SUCCESS;
    }

    dss_cm_res *cm_res = &inst->cm_res;
    if (!cm_res->is_init) {
        return CM_SUCCESS;
    }

    status_t ret = dss_get_cm_res_lock_owner(cm_res, master_id);
    DSS_RETURN_IFERR2(ret, LOG_RUN_WAR("Failed to get cm lock owner, if DSS is normal open ignore the log."));
    if (*master_id != DSS_INVALID_ID32) {
        /* Somebody already owns the lock. */
        return CM_SUCCESS;
    }

    if (!try_lock) {
        return CM_ERROR;
    }
    if (inst->no_grab_lock) {
        LOG_RUN_INF_INHIBIT(LOG_INHIBIT_LEVEL5, "[RECOVERY]No need to grab lock when inst %u is set no grab lock.",
            (uint32)cfg->params.inst_id);
        dss_set_master_id(DSS_INVALID_ID32);
        return CM_ERROR;
    }

    ret = cm_res_lock(&cm_res->mgr, DSS_CM_LOCK);
    *grab_lock = ((int)ret == CM_RES_SUCCESS);
    if (!*grab_lock) {
        return CM_ERROR;
    }

    *master_id = (uint32)inst->inst_cfg.params.inst_id;
    LOG_RUN_INF("[RECOVERY]inst id %u succeed to get lock owner.", *master_id);
    return CM_SUCCESS;
}
/*
 * Recovery path taken when the local instance (curr_id) is the master.
 *
 * The instance status is switched to DSS_STATUS_RECOVERY while the work is
 * done and to DSS_STATUS_OPEN at the end. Starting from PREPARE, the vg info
 * is loaded first. Starting from OPEN (and not a first start), sessions are
 * paused. Background tasks are paused, redo recovery runs for all volume
 * groups, and unless this is a first start the meta info is refreshed.
 * Any failure of these steps logs an abort message and exits the process.
 */
void dss_recovery_when_primary(dss_session_t *session, dss_instance_t *inst, uint32 curr_id, bool32 grab_lock)
{
    /* "First start": the lock was not grabbed by this call and the instance is still in PREPARE. */
    bool32 first_start = (!grab_lock && inst->status == DSS_STATUS_PREPARE);
    if (!first_start) {
        LOG_RUN_INF("[RECOVERY]master_id is %u when get cm lock to do recovery.", curr_id);
    } else {
        LOG_RUN_INF("[RECOVERY]inst %u is old main inst to do recovery.", curr_id);
    }

    dss_instance_status_e prev_status = inst->status;
    inst->status = DSS_STATUS_RECOVERY;
    CM_MFENCE;

    if (prev_status == DSS_STATUS_PREPARE && dss_load_vg_info_and_recover(CM_TRUE) != CM_SUCCESS) {
        LOG_RUN_ERR("[RECOVERY]ABORT INFO: Failed to get vg info when instance start.");
        cm_fync_logfile();
        dss_exit(1);
    }
    if (prev_status == DSS_STATUS_OPEN && !first_start) {
        dss_wait_session_pause(inst);
    }
    dss_wait_background_pause(inst);

    if (dss_recover_from_instance(session, inst) != CM_SUCCESS) {
        LOG_RUN_ERR("[RECOVERY]ABORT INFO: Recover failed when get cm lock.");
        cm_fync_logfile();
        dss_exit(1);
    }

    if (!first_start) {
        if (dss_refresh_meta_info(session) != CM_SUCCESS) {
            LOG_RUN_ERR("[RECOVERY]ABORT INFO: Refresh meta info failed after recovery.");
            cm_fync_logfile();
            dss_exit(1);
        }
        dss_set_session_running(inst, session->id);
    }

    dss_set_master_id(curr_id);
    dss_set_server_status_flag(DSS_STATUS_READWRITE);
    LOG_RUN_INF("[RECOVERY]inst %u set status flag %u when get cm lock.", curr_id, DSS_STATUS_READWRITE);
    g_dss_instance.is_join_cluster = CM_TRUE;
    inst->status = DSS_STATUS_OPEN;
}
/*
 * Path taken when another instance (master_id) is the master.
 *
 * The new master id and the READONLY status flag are applied when the master
 * changed. The instance then tries to join the cluster and, if it is still in
 * PREPARE, loads the vg info. When joining or loading fails, the previous
 * master id and status flag are restored and the function returns without
 * opening the instance; otherwise the status becomes DSS_STATUS_OPEN.
 * session is not used.
 */
void dss_recovery_when_standby(dss_session_t *session, dss_instance_t *inst, uint32 curr_id, uint32 master_id)
{
    uint32 prev_master_id = dss_get_master_id();
    int32 prev_flag = dss_get_server_status_flag();

    if (prev_master_id != master_id) {
        dss_set_master_id(master_id);
        dss_set_server_status_flag(DSS_STATUS_READONLY);
        LOG_RUN_INF("[RECOVERY]inst %u set status flag %u when not get cm lock.", curr_id, DSS_STATUS_READONLY);
    }

    if (!dss_check_join_cluster()) {
        dss_set_master_id(prev_master_id);
        dss_set_server_status_flag(prev_flag);
        LOG_RUN_INF("[RECOVERY]inst %u reset status flag %d and master_id %u when join failed.", curr_id, prev_flag,
            prev_master_id);
        return;
    }

    if (inst->status != DSS_STATUS_PREPARE || dss_load_vg_info_and_recover(CM_FALSE) == CM_SUCCESS) {
        inst->status = DSS_STATUS_OPEN;
        return;
    }

    /* Loading failed: roll back and leave the instance in PREPARE for the next attempt. */
    dss_set_master_id(prev_master_id);
    dss_set_server_status_flag(prev_flag);
    g_dss_instance.is_join_cluster = CM_FALSE;
    LOG_RUN_INF("[RECOVERY]inst %u reset status flag %d and master_id %u and join cluster when load failed.",
        curr_id, prev_flag, prev_master_id);
    LOG_RUN_ERR("[RECOVERY]Just try again to load dss ctrl.");
}
/*
 * 1. old_master_id == master_id: the master has not changed, just return.
 * 2. old_master_id != master_id: the master has been reselected, so decide whether recovery is needed.
 */
/*
 * One round of the recovery loop, executed under the exclusive switch latch.
 *
 * The current lock owner is queried (trying to take the lock if it is free).
 * Nothing more happens when the query fails or yields no master. When the
 * master is unchanged, the local master only makes sure the delay-clean task
 * exists, and a standby that has already joined the cluster does nothing.
 * Otherwise the standby or primary recovery path is run depending on whether
 * the master is the local instance.
 */
void dss_get_cm_lock_and_recover_inner(dss_session_t *session, dss_instance_t *inst)
{
    cm_latch_x(&g_dss_instance.switch_latch, session->id, LATCH_STAT(LATCH_SWITCH));

    uint32 prev_master_id = dss_get_master_id();
    bool32 grabbed = CM_FALSE;
    uint32 new_master_id = DSS_INVALID_ID32;
    uint32 self_id;

    if (dss_get_cm_lock_owner(inst, &grabbed, CM_TRUE, &new_master_id) != CM_SUCCESS) {
        goto unlatch;
    }
    if (new_master_id == DSS_INVALID_ID32) {
        LOG_RUN_WAR("[RECOVERY]cm is not init, just try again.");
        goto unlatch;
    }

    self_id = (uint32)dss_get_inst_cfg()->params.inst_id;
    if (prev_master_id == new_master_id) {
        if (new_master_id == self_id) {
            /* The result is intentionally ignored, as before. */
            (void)dss_delay_clean_background_task(&g_dss_instance);
            goto unlatch;
        }
        if (inst->is_join_cluster) {
            goto unlatch;
        }
    }

    if (new_master_id != self_id) {
        dss_recovery_when_standby(session, inst, self_id, new_master_id);
        goto unlatch;
    }

    dss_set_recover_thread_id(dss_get_current_thread_id());
    dss_recovery_when_primary(session, inst, self_id, grabbed);
    dss_set_recover_thread_id(0);

unlatch:
    cm_unlatch(&g_dss_instance.switch_latch, LATCH_STAT(LATCH_SWITCH));
}
/* Sleep arguments passed to cm_sleep() between recovery rounds: normal, and while still in PREPARE. */
#define DSS_RECOVER_INTERVAL 500
#define DSS_SHORT_RECOVER_INTERVAL 100
/*
 * Thread entry of the "recovery" task: repeatedly run one recovery round on
 * the reserved recovery session until the thread is closed. The shorter sleep
 * is used while the instance is still in DSS_STATUS_PREPARE.
 */
void dss_get_cm_lock_and_recover(thread_t *thread)
{
    cm_set_thread_name("recovery");

    uint32 task_idx = dss_get_recover_task_idx();
    dss_session_t *session = dss_get_reserv_session(task_idx);
    dss_instance_t *inst = (dss_instance_t *)thread->argument;

    while (!thread->closed) {
        dss_get_cm_lock_and_recover_inner(session, inst);
        if (inst->status != DSS_STATUS_PREPARE) {
            cm_sleep(DSS_RECOVER_INTERVAL);
            continue;
        }
        LOG_RUN_WAR("[RECOVERY]Try to sleep when in prepare status.\n");
        cm_sleep(DSS_SHORT_RECOVER_INTERVAL);
    }
}
/*
 * Allocate g_delete_buf on first use. The buffer holds the search stack
 * followed by one dss_delete_queue_t. Allocation failure trips
 * DSS_ASSERT_LOG.
 */
void dss_init_delete_buf()
{
    if (g_delete_buf != NULL) {
        return;
    }

    uint32 stack_bytes = dss_get_search_stack_size();
    uint32 queue_bytes = sizeof(dss_delete_queue_t);
    g_delete_buf = cm_malloc(stack_bytes + queue_bytes);
    DSS_ASSERT_LOG(g_delete_buf != NULL, "Failed to malloc g_delete_buf");
}
/*
 * Thread entry of the "delay_clean" task.
 *
 * After setting up the search stack and delete queue inside g_delete_buf, the
 * loop sleeps in fixed steps until the configured delay_clean_interval number
 * of steps has passed, then runs the delayed clean over all volume groups,
 * provided the instance executes locally, is read-write and is open.
 * is_cleaning is raised around each clean attempt.
 */
void dss_delay_clean_proc(thread_t *thread)
{
    cm_set_thread_name("delay_clean");

    uint32 task_idx = dss_get_delay_clean_task_idx();
    dss_session_ctrl_t *session_ctrl = dss_get_session_ctrl();
    dss_session_t *session = session_ctrl->sessions[task_idx];
    LOG_RUN_INF("Session[id=%u] is available for delay clean task.", session->id);

    dss_config_t *cfg = dss_get_inst_cfg();
    uint32 waited = 0;

    dss_init_delete_buf();
    /* Buffer layout: search stack first, delete queue right after it. */
    cm_stack_t *search_stack = (cm_stack_t *)g_delete_buf;
    dss_delete_queue_t *delete_queue = (dss_delete_queue_t *)(g_delete_buf + dss_get_search_stack_size());
    cm_stack_init(search_stack, g_delete_buf + sizeof(cm_stack_t),
        CM_ALIGN8(sizeof(dss_search_node_t) * DSS_MAX_DELETE_DEPTH) + GS_PUSH_RESERVE_SIZE);
    dss_init_delete_queue(delete_queue);

    while (!thread->closed) {
        if (waited >= cfg->params.delay_clean_interval) {
            g_dss_instance.is_cleaning = CM_TRUE;
            if (dss_need_exec_local() && dss_is_readwrite() && (g_dss_instance.status == DSS_STATUS_OPEN)) {
                dss_delay_clean_all_vg(session, search_stack, delete_queue);
            }
            g_dss_instance.is_cleaning = CM_FALSE;
            waited = 0;
        } else {
            cm_sleep(CM_SLEEP_1000_FIXED);
            waited++;
        }
    }
}
/*
 * Create the delay-clean thread when its slot in inst->threads has no thread
 * id yet.
 *
 * Returns CM_SUCCESS when the thread already exists or was created, otherwise
 * the failure status of cm_create_thread().
 */
status_t dss_delay_clean_background_task(dss_instance_t *inst)
{
    uint32 task_idx = dss_get_delay_clean_task_idx();
    if (inst->threads[task_idx].id != 0) {
        return CM_SUCCESS;
    }

    status_t status = cm_create_thread(dss_delay_clean_proc, 0, &g_dss_instance, &(g_dss_instance.threads[task_idx]));
    if (status != CM_SUCCESS) {
        LOG_RUN_ERR_INHIBIT(LOG_INHIBIT_LEVEL4, "Failed to create dss delay clean background task.");
    } else {
        LOG_RUN_INF("Succeed to create dss delay clean background task.");
    }
    return status;
}
/* Close the delay-clean thread if its slot in inst->threads holds a thread id. */
void dss_close_delay_clean_background_task(dss_instance_t *inst)
{
    uint32 task_idx = dss_get_delay_clean_task_idx();
    if (inst->threads[task_idx].id == 0) {
        return;
    }

    cm_close_thread(&inst->threads[task_idx]);
    LOG_RUN_INF("close dss delay clean background task.");
}
/*
 * Thread entry of the "alarm_check" task: once every
 * DSS_VG_ALARM_CHECK_COUNT sleep steps, check the volume group usage alarms.
 * is_checking is raised around each check.
 */
void dss_alarm_check_proc(thread_t *thread)
{
    cm_set_thread_name("alarm_check");

    uint32 task_idx = dss_get_alarm_check_task_idx();
    dss_session_ctrl_t *session_ctrl = dss_get_session_ctrl();
    dss_session_t *session = session_ctrl->sessions[task_idx];
    uint32 period = DSS_VG_ALARM_CHECK_COUNT;

    /* tick counts sleep steps and wraps around at period. */
    for (uint32 tick = 0; !thread->closed; tick = (tick + 1) % period) {
        if (tick % DSS_VG_ALARM_CHECK_COUNT == 0) {
            g_dss_instance.is_checking = CM_TRUE;
            dss_alarm_check_vg_usage(session);
            g_dss_instance.is_checking = CM_FALSE;
        }
        cm_sleep(CM_SLEEP_500_FIXED);
    }
}
/*
 * Thread entry: repeatedly walk every volume group and call the per-VG hashmap
 * extend/redistribute routine, sleeping CM_SLEEP_500_FIXED after each full pass,
 * until the thread is marked closed.
 *
 * thread: thread descriptor; only its closed flag is read here.
 */
void dss_hashmap_dynamic_extend_and_redistribute_proc(thread_t *thread)
{
    cm_set_thread_name("hashmap_dynamic_extend_and_redistribute");
    uint32 task_idx = dss_get_hashmap_dynamic_extend_task_idx();
    dss_session_t *session = dss_get_session_ctrl()->sessions[task_idx];

    while (!thread->closed) {
        uint32_t vg_no = 0;
        /* group_num is re-read on every step of the pass. */
        while (vg_no < g_vgs_info->group_num) {
            dss_hashmap_dynamic_extend_and_redistribute_per_vg(&g_vgs_info->volume_group[vg_no], session);
            vg_no++;
        }
        cm_sleep(CM_SLEEP_500_FIXED);
    }
}
/*
 * Check peer instances: lazily initialise the CM resource, then check peers through CM
 * (or the simulated CM under ENABLE_DSSTEST) when the CM resource is valid, and through
 * the default check otherwise.
 *
 * inst: instance whose cm_res state is consulted.
 */
static void dss_check_peer_inst_inner(dss_instance_t *inst)
{
    /*
     * During installation initialization, db_init depends on the DSS server. However, the CMS is not started.
     * Therefore, cm_init cannot be invoked during the DSS server startup.
     * Here, cm_init is invoked before the CM interface is invoked for the first time.
     */
    if (SECUREC_UNLIKELY(!inst->cm_res.is_init)) {
        dss_init_cm_res(inst);
    }

    if (!inst->cm_res.is_valid) {
        /* No usable CM resource: fall back to the default peer check. */
        dss_check_peer_default();
        return;
    }

#ifdef ENABLE_DSSTEST
    dss_check_peer_by_simulation_cm(inst);
#else
    dss_check_peer_by_cm(inst);
#endif
}
/*
 * Run the peer-instance check unless the given instance is already flagged in the
 * instance work-status map.
 *
 * inst:    instance whose inst_work_lock serialises the check.
 * inst_id: instance to look up in the work-status map; DSS_INVALID_ID64 (or any id that
 *          does not fit in the 64-bit map) skips the lookup and always runs the check.
 *
 * Does nothing when the configured instance count is <= 1.
 */
void dss_check_peer_inst(dss_instance_t *inst, uint64 inst_id)
{
    dss_config_t *inst_cfg = dss_get_inst_cfg();
    if (inst_cfg->params.nodes_list.inst_cnt <= 1) {
        return;
    }

    /*
     * Build the mask only for ids that fit in the map: shifting a 64-bit value by 64 or
     * more is undefined behaviour, and DSS_INVALID_ID64 must never reach the shift.
     * A zero mask never matches, so out-of-range ids fall through to the check.
     */
    const uint64 map_bits = (uint64)sizeof(uint64) * 8;
    uint64 inst_mask = 0;
    if (inst_id != DSS_INVALID_ID64 && inst_id < map_bits) {
        inst_mask = ((uint64)0x1 << inst_id);
    }

    cm_spin_lock(&inst->inst_work_lock, NULL);
    uint64 cur_inst_map = dss_get_inst_work_status();
    if ((cur_inst_map & inst_mask) != 0) {
        /* The instance is already flagged in the map; no need to check again. */
        cm_spin_unlock(&inst->inst_work_lock);
        return;
    }
    dss_check_peer_inst_inner(inst);
    cm_spin_unlock(&inst->inst_work_lock);
}
uint64 dss_get_inst_work_status(void)
{
return (uint64)cm_atomic_get((atomic_t *)&g_dss_instance.inst_work_status_map);
}
/*
 * Atomically overwrite the global instance work-status map.
 *
 * cur_inst_map: new map value, stored as int64.
 */
void dss_set_inst_work_status(uint64 cur_inst_map)
{
    atomic_t *status_map = (atomic_t *)&g_dss_instance.inst_work_status_map;
    (void)cm_atomic_set(status_map, (int64)cur_inst_map);
}
bool32 dss_check_join_cluster()
{
if (g_dss_instance.is_join_cluster) {
return CM_TRUE;
}
if (dss_get_master_id() == g_dss_instance.inst_cfg.params.inst_id) {
g_dss_instance.is_join_cluster = CM_TRUE;
LOG_RUN_INF("Join cluster success by primary.");
} else {
bool32 join_succ = CM_FALSE;
status_t status = dss_join_cluster(&join_succ);
if (status != CM_SUCCESS) {
LOG_RUN_ERR("Join cluster fail, wait next try.");
cm_reset_error();
return CM_FALSE;
}
LOG_DEBUG_INF("Join cluster result [%u].", (uint32)join_succ);
if (!join_succ) {
return CM_FALSE;
}
g_dss_instance.is_join_cluster = CM_TRUE;
LOG_RUN_INF("Join cluster success by standby.");
}
return CM_TRUE;
}
/*
 * Find the first volume, across all volume groups, whose flag is VOLUME_PREPARE.
 *
 * session:   session used for the VG shared lock.
 * dev:       out, set to the name stored in the matching volume definition.
 * vg_idx:    out, index of the volume group holding the volume.
 * volume_id: out, index of the volume inside that group.
 *
 * Returns CM_TRUE when such a volume is found (outputs are set), CM_FALSE otherwise
 * (outputs are left untouched).
 */
static bool32 dss_find_unreg_volume(dss_session_t *session, char **dev, uint8 *vg_idx, uint8 *volume_id)
{
    for (uint32 vg_no = 0; vg_no < g_vgs_info->group_num; vg_no++) {
        dss_vg_info_item_t *vg = &g_vgs_info->volume_group[vg_no];
        for (uint32 vol_no = 0; vol_no < DSS_MAX_VOLUMES; vol_no++) {
            /* Unlocked pre-check; the flag is checked again under the shared lock. */
            if (vg->dss_ctrl->volume.defs[vol_no].flag != VOLUME_PREPARE) {
                continue;
            }

            dss_lock_vg_mem_and_shm_s(session, vg);
            bool32 still_prepare = (vg->dss_ctrl->volume.defs[vol_no].flag == VOLUME_PREPARE);
            if (still_prepare) {
                *dev = vg->dss_ctrl->volume.defs[vol_no].name;
                *vg_idx = (uint8)vg_no;
                *volume_id = (uint8)vol_no;
            }
            dss_unlock_vg_mem_and_shm(session, vg);

            if (still_prepare) {
                return CM_TRUE;
            }
        }
    }
    return CM_FALSE;
}
/*
 * Tell whether the key (host_id + 1) appears among the first key_count entries of
 * reg_info->reg_keys.
 *
 * Returns CM_TRUE when the key is present, CM_FALSE otherwise.
 */
static bool32 dss_is_register(iof_reg_in_t *reg_info, int64 host_id)
{
    int32 idx = 0;
    while (idx < reg_info->key_count) {
        if (reg_info->reg_keys[idx] == host_id + 1) {
            return CM_TRUE;
        }
        idx++;
    }
    return CM_FALSE;
}
/*
 * Look for a volume in VOLUME_PREPARE state and, once every instance recorded in
 * cluster_node_info of volume group 0 is found among the device's registration keys,
 * mark that volume VOLUME_OCCUPY and persist the volume ctrl.
 *
 * session: session used for the VG memory/shm locks.
 *
 * The function is best-effort: any failure (inquiry, storage lock, ctrl load) and any
 * instance that is not yet registered simply ends the attempt without reporting an error.
 */
void dss_check_unreg_volume(dss_session_t *session)
{
    uint8 vg_idx = 0;
    uint8 volume_id = 0;
    iof_reg_in_t reg_info;
    (void)memset_s(&reg_info, sizeof(reg_info), 0, sizeof(reg_info));

    if (!dss_find_unreg_volume(session, &reg_info.dev, &vg_idx, &volume_id)) {
        return;
    }

    /* Presumably fills reg_keys/key_count for reg_info.dev — confirm against cm_iofence. */
    if (cm_iof_inql(&reg_info) != CM_SUCCESS) {
        return;
    }

    /* Reload the last disk unit of the volume head of VG 0 into global_ctrl. */
    bool32 remote = CM_FALSE;
    dss_vg_info_item_t *vg_item = &g_vgs_info->volume_group[0];
    if (dss_lock_vg_storage_r(vg_item, vg_item->entry_path, g_inst_cfg) != CM_SUCCESS) {
        return;
    }
    dss_lock_vg_mem_and_shm_s(session, vg_item);
    status_t ret = dss_load_vg_ctrl_part(vg_item, (int64)(DSS_VOLUME_HEAD_SIZE - DSS_DISK_UNIT_SIZE),
        &vg_item->dss_ctrl->global_ctrl, DSS_DISK_UNIT_SIZE, &remote);
    dss_unlock_vg_mem_and_shm(session, vg_item);
    (void)dss_unlock_vg_storage(vg_item, vg_item->entry_path, g_inst_cfg);
    if (ret != CM_SUCCESS) {
        return;
    }

    /* Every instance present in cluster_node_info must hold a registration key. */
    for (uint8 i = 0; i < DSS_MAX_INSTANCES; i++) {
        bool32 in_cluster = cm_bitmap64_exist(&vg_item->dss_ctrl->global_ctrl.cluster_node_info, i);
        if (in_cluster && !dss_is_register(&reg_info, i)) {
            return;
        }
    }

    vg_item = &g_vgs_info->volume_group[vg_idx];
    dss_lock_vg_mem_and_shm_x(session, vg_item);
    if (vg_item->dss_ctrl->volume.defs[volume_id].flag == VOLUME_FREE) {
        /* The slot was freed since it was found; leave it alone. */
        dss_unlock_vg_mem_and_shm(session, vg_item);
        return;
    }
    vg_item->dss_ctrl->volume.defs[volume_id].flag = VOLUME_OCCUPY;
    /* Best-effort: the result was never acted upon by this function. */
    (void)dss_update_volume_ctrl(vg_item);
    dss_unlock_vg_mem_and_shm(session, vg_item);
}
/*
 * Thread entry: while the instance status is DSS_STATUS_OPEN, call dss_meta_syn on each
 * iteration; sleep CM_SLEEP_10_FIXED between iterations until the thread is marked closed.
 *
 * thread: thread descriptor; its argument is used as a dss_bg_task_info_t.
 */
void dss_meta_syn_proc(thread_t *thread)
{
    cm_set_thread_name("meta_syn");
    dss_bg_task_info_t *task = (dss_bg_task_info_t *)(thread->argument);
    uint32 session_idx = dss_get_meta_syn_task_idx(task->my_task_id);
    dss_session_t *session = dss_get_session_ctrl()->sessions[session_idx];

    while (!thread->closed) {
        bool32 is_open = (g_dss_instance.status == DSS_STATUS_OPEN);
        if (is_open) {
            /* The sync result is intentionally ignored. */
            (void)dss_meta_syn(session, task);
        }
        cm_sleep(CM_SLEEP_10_FIXED);
    }
}
/*
 * Thread entry: while the instance status is DSS_STATUS_OPEN, call dss_recycle_meta on
 * each iteration; sleep CM_SLEEP_10_FIXED between iterations until the thread is marked
 * closed.
 *
 * thread: thread descriptor; its argument is used as a dss_bg_task_info_t.
 */
void dss_recycle_meta_proc(thread_t *thread)
{
    cm_set_thread_name("recycle_meta");
    dss_bg_task_info_t *task = (dss_bg_task_info_t *)(thread->argument);
    dss_set_recycle_meta_args_to_vg(task);
    uint32 session_idx = dss_get_recycle_meta_task_idx(task->my_task_id);
    dss_session_t *session = dss_get_session_ctrl()->sessions[session_idx];
    date_t clean_time = cm_now();

    while (!thread->closed) {
        if (g_dss_instance.status == DSS_STATUS_OPEN) {
            dss_recycle_meta(session, task, &clean_time);
        }
        /* Same pause whether or not a recycle pass ran. */
        cm_sleep(CM_SLEEP_10_FIXED);
    }
}