* Copyright (c) 2022 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* srv_api.c
* server API
*
* IDENTIFICATION
* src/server/srv_api.c
*
* -------------------------------------------------------------------------
*/
#include "dcc_interface.h"
#include "cm_timer.h"
#include "cm_error.h"
#include "cm_log.h"
#include "cm_latch.h"
#include "srv_logger.h"
#include "util_error.h"
#include "srv_param.h"
#include "executor.h"
#include "storage.h"
#include "dcc_msg_cmd.h"
#include "srv_session.h"
#include "srv_instance.h"
#include "cm_profile_stat.h"
#include "dcc_cmd_parse.h"
#include "srv_cmd_exe.h"
#include "dcf_interface.h"
#include "executor_utils.h"
#ifdef __cplusplus
extern "C" {
#endif
static latch_t g_dcc_latch = {0};
#define SRV_WAIT_COMMIT_TIMEOUT_DEFAULT (5000)
#define SRV_WAIT_COMMIT_EVENT_TIMEOUT (50)
#define DCC_SPLIT_STRING " "
#define DCC_ENCLOSE_CHAR 0
#define DCC_CMD_PARAMETER_CNT 16
#define DCC_TRY_BLOCK_CNT 20
typedef enum en_server_status {
DCC_SRV_UNINIT = 0,
DCC_SRV_RUNNING,
DCC_SRV_STOP,
} srv_status_t;
static srv_status_t g_srv_status = DCC_SRV_UNINIT;
typedef struct st_srv_dcc_block {
volatile bool8 srv_blocked;
cm_event_t srv_block_event;
uint32 srv_wait_timeout_ms;
} srv_dcc_block_t;
static srv_dcc_block_t g_srv_dcc_block;
g_DCC_LIB_VERSION;
EXPORT_API const char *GETLIBVERSION()
{
#ifdef WIN32
return NULL;
#else
return str_DCC_LIB_VERSION;
#endif
}
#ifdef WIN32
const char *dcc_get_version(void)
{
return "NONE";
}
#else
#endif
static srv_status_t srv_get_status(void)
{
return g_srv_status;
}
static void srv_set_status(srv_status_t new_status)
{
g_srv_status = new_status;
}
#define CHECK_SRV_STATUS(exp_status) \
do { \
if (srv_get_status() != (exp_status)) { \
CM_THROW_ERROR(ERR_SERVER_STOPPED, ""); \
return CM_ERROR; \
} \
} while (0)
static inline status_t srv_check_handle_key(const void *handle, const dcc_text_t *key)
{
CM_CHECK_NULL_PTR(handle);
CM_CHECK_NULL_PTR(key);
if (key->value == NULL || key->len == 0 || key->len > SRV_MAX_KEY_SIZE) {
CM_THROW_ERROR(ERR_INVALID_PARAMETER_VALUE, "");
return CM_ERROR;
}
return CM_SUCCESS;
}
static inline status_t srv_check_val(const dcc_text_t *val)
{
CM_CHECK_NULL_PTR(val);
if (val->len > SRV_MAX_VAL_SIZE) {
CM_THROW_ERROR(ERR_INVALID_PARAMETER_VALUE, "");
return CM_ERROR;
}
return CM_SUCCESS;
}
static status_t srv_wait_commit(session_t *sess, uint32 timout)
{
uint32 wait_time = 0;
uint32 cmd_timout = (timout > 0) ? (timout * MILLISECS_PER_SECOND) : SRV_WAIT_COMMIT_TIMEOUT_DEFAULT;
for (;;) {
if (cm_event_timedwait(&sess->event, SRV_WAIT_COMMIT_EVENT_TIMEOUT) == CM_SUCCESS) {
break;
}
wait_time += SRV_WAIT_COMMIT_EVENT_TIMEOUT;
if (wait_time >= cmd_timout) {
CM_THROW_ERROR(ERR_API_COMMIT_TIMEOUT, "");
LOG_DEBUG_ERR("[API] srv api wait commit timeout.");
return CM_ERROR;
}
}
return CM_SUCCESS;
}
static int srv_check_and_block(void)
{
date_t begin = g_timer()->now;
while (g_srv_dcc_block.srv_blocked == CM_TRUE) {
if (g_timer()->now - begin > g_srv_dcc_block.srv_wait_timeout_ms * MICROSECS_PER_MILLISEC) {
break;
}
(void) cm_event_timedwait(&g_srv_dcc_block.srv_block_event, CM_SLEEP_1_FIXED);
}
if (g_srv_dcc_block.srv_blocked == CM_TRUE) {
CM_THROW_ERROR(ERR_SERVER_IS_BLOCKED, "");
return CM_ERROR;
}
return CM_SUCCESS;
}
static status_t srv_new_instance(void)
{
if (g_srv_inst == NULL) {
g_srv_inst = (srv_inst_t *) malloc(sizeof(srv_inst_t));
if (g_srv_inst == NULL) {
LOG_RUN_ERR("[API] srv_new_instance malloc failed.");
return CM_ERROR;
}
}
errno_t errcode = memset_s(g_srv_inst, sizeof(srv_inst_t), 0, sizeof(srv_inst_t));
if (errcode != EOK) {
CM_FREE_PTR(g_srv_inst);
return CM_ERROR;
}
g_srv_inst->attr.inst_type = INST_TYPE_API;
if (srv_init_session_pool() != CM_SUCCESS) {
LOG_RUN_ERR("[API] srv failed to init session pool");
CM_FREE_PTR(g_srv_inst);
return CM_ERROR;
}
LOG_RUN_INF("[API] dcc srv init session pool succeed.");
return CM_SUCCESS;
}
static status_t srv_init_stat_info(void)
{
if (cm_profile_stat_init() != CM_SUCCESS) {
LOG_RUN_ERR("[API] init profile stat failed");
return CM_ERROR;
}
CM_RETURN_IFERR(cm_register_stat_item(DCC_PUT, "DCCPut", STAT_UNIT_MS, STAT_INDICATOR_AVG, NULL));
CM_RETURN_IFERR(cm_register_stat_item(DCC_GET, "DCCGet", STAT_UNIT_MS, STAT_INDICATOR_AVG, NULL));
CM_RETURN_IFERR(cm_register_stat_item(DCC_FETCH, "DCCFetch", STAT_UNIT_MS, STAT_INDICATOR_AVG, NULL));
CM_RETURN_IFERR(cm_register_stat_item(DCC_DELETE, "DCCDel", STAT_UNIT_MS, STAT_INDICATOR_AVG, NULL));
CM_RETURN_IFERR(cm_register_stat_item(DCC_WATCH, "DCCWatch", STAT_UNIT_MS, STAT_INDICATOR_AVG, NULL));
CM_RETURN_IFERR(cm_register_stat_item(DCC_UNWATCH, "DCCUnwatch", STAT_UNIT_MS, STAT_INDICATOR_AVG, NULL));
CM_RETURN_IFERR(cm_register_stat_item(DCC_DB_PUT, "DBPut", STAT_UNIT_MS, STAT_INDICATOR_AVG, NULL));
CM_RETURN_IFERR(cm_register_stat_item(DCC_DB_GET, "DBGet", STAT_UNIT_MS, STAT_INDICATOR_AVG, NULL));
CM_RETURN_IFERR(cm_register_stat_item(DCC_DB_DEL, "DBDel", STAT_UNIT_MS, STAT_INDICATOR_AVG, NULL));
return CM_SUCCESS;
}
static status_t srv_instance_init()
{
log_param_t *log_param = cm_log_param_instance();
log_param->log_instance_startup = CM_FALSE;
cm_reset_error();
init_dcc_errno_desc();
if (cm_start_timer(g_timer()) != CM_SUCCESS) {
LOG_RUN_ERR("[API] cm_start_timer failed");
return CM_ERROR;
}
if (init_logger() != CM_SUCCESS) {
LOG_RUN_ERR("[API] init_logger failed");
return CM_ERROR;
}
LOG_RUN_INF("[API] dcc init logger succeed.");
if (srv_init_stat_info() != CM_SUCCESS) {
LOG_RUN_ERR("[API] init profile stat failed");
return CM_ERROR;
}
exc_try_self_recovery();
LOG_RUN_INF("[API] dcc check if need try_self_recovery end.");
CM_RETURN_IFERR(exc_check_first_init());
if (db_startup(STARTUP_MODE_OPEN) != CM_SUCCESS) {
LOG_RUN_ERR("[API] db_startup failed");
exc_try_self_recovery();
return CM_ERROR;
}
LOG_RUN_INF("[API] dcc db_startup succeed.");
if (exc_init() != CM_SUCCESS) {
db_shutdown();
LOG_RUN_ERR("[API] executor module init failed");
exc_try_self_recovery();
return CM_ERROR;
}
CM_RETURN_IFERR(exc_init_done_tryclean());
LOG_RUN_INF("[API] dcc init executor succeed.");
if (srv_new_instance() != CM_SUCCESS) {
db_shutdown();
exc_deinit();
LOG_RUN_ERR("[API] srv new instance failed");
return CM_ERROR;
}
if (srv_init_sess_apply_mgr() != CM_SUCCESS) {
srv_uninit_sess_apply_mgr();
db_shutdown();
exc_deinit();
LOG_RUN_ERR("[API] srv init sess apply mgr failed");
return CM_ERROR;
}
LOG_RUN_INF("[API] dcc create srv instance and init sess apply mgr succeed.");
(void) exc_register_consensus_proc(srv_sess_consensus_proc);
LOG_RUN_INF("[API] dcc srv instance init succeed.");
return CM_SUCCESS;
}
static status_t srv_init_block(void)
{
g_srv_dcc_block.srv_blocked = CM_FALSE;
g_srv_dcc_block.srv_wait_timeout_ms = 0;
if (cm_event_init(&g_srv_dcc_block.srv_block_event) != CM_SUCCESS) {
CM_THROW_ERROR(ERR_CREATE_EVENT, cm_get_os_error());
return CM_ERROR;
}
return CM_SUCCESS;
}
int srv_dcc_start(void)
{
cm_reset_error();
init_dcc_errno_desc();
cm_latch_x(&g_dcc_latch, 0, NULL);
if (srv_get_status() == DCC_SRV_RUNNING) {
cm_unlatch(&g_dcc_latch, NULL);
CM_THROW_ERROR(ERR_SERVER_STOPPED, "");
LOG_RUN_INF("[API] srv_get_status failed");
return CM_ERROR;
}
if (srv_instance_init() != CM_SUCCESS) {
cm_unlatch(&g_dcc_latch, NULL);
CM_THROW_ERROR(ERR_SERVER_START_FAILED, "");
return CM_ERROR;
}
CM_RETURN_IFERR(srv_init_block());
srv_set_status(DCC_SRV_RUNNING);
cm_unlatch(&g_dcc_latch, NULL);
param_value_t param_value;
CM_RETURN_IFERR(srv_get_param(DCC_PARAM_LOG_SUPPRESS_ENABLE, ¶m_value));
cm_log_param_instance()->log_suppress_enable = (param_value.uint32_val == 1) ? CM_TRUE : CM_FALSE;
LOG_RUN_INF("[API] dcc srv start succeed.");
return CM_SUCCESS;
}
int srv_dcc_stop(void)
{
cm_reset_error();
init_dcc_errno_desc();
cm_profile_stat_uninit();
cm_latch_x(&g_dcc_latch, 0, NULL);
if (srv_get_status() != DCC_SRV_RUNNING) {
cm_unlatch(&g_dcc_latch, NULL);
CM_THROW_ERROR(ERR_SERVER_STOPPED, "");
return CM_ERROR;
}
exc_deinit();
db_shutdown();
srv_uninit_sess_apply_mgr();
if (g_srv_inst->attr.inst_type == INST_TYPE_API) {
srv_kill_all_session();
CM_FREE_PTR(g_srv_inst);
}
cm_close_timer(g_timer());
srv_set_status(DCC_SRV_STOP);
cm_unlatch(&g_dcc_latch, NULL);
LOG_RUN_INF("[API] dcc srv stop succeed.");
uninit_logger();
return CM_SUCCESS;
}
int srv_dcc_set_param(const char *param_name, const char *param_value)
{
CM_CHECK_NULL_PTR(param_name);
cm_reset_error();
init_dcc_errno_desc();
if (cm_str_equal(param_name, "SSL_PWD_PLAINTEXT")) {
LOG_OPER("[API] dcc set param, param_name=%s param_value=%s", param_name, "***");
} else {
LOG_OPER("[API] dcc set param, param_name=%s param_value=%s", param_name, param_value);
}
return srv_set_param(param_name, param_value);
}
int srv_dcc_register_status_notify(dcc_cb_status_notify_t cb_func)
{
cm_reset_error();
return exc_register_status_notify_proc(cb_func);
}
int srv_dcc_register_log_output(usr_cb_log_output_t cb_func)
{
cm_reset_error();
if (cb_func == NULL) {
CM_THROW_ERROR(ERR_INVALID_PARAMETER_VALUE, "callback function is", "NULL");
return CM_ERROR;
}
log_param_t *log_param = cm_log_param_instance();
log_param->log_write = (usr_cb_log_output_t) cb_func;
return CM_SUCCESS;
}
int srv_dcc_alloc_handle(void **handle)
{
cm_reset_error();
init_dcc_errno_desc();
CM_CHECK_NULL_PTR(handle);
CHECK_SRV_STATUS(DCC_SRV_RUNNING);
session_t *sess = NULL;
int ret = srv_alloc_session(&sess, NULL, SESSION_TYPE_API);
if (ret != CM_SUCCESS) {
CM_THROW_ERROR(ERR_MALLOC_MEM, "alloc server session");
LOG_DEBUG_ERR("[API] srv api alloc session failed.");
return CM_ERROR;
}
if (exc_alloc_handle(&sess->stg_handle) != CM_SUCCESS) {
CM_FREE_PTR(sess->req_buf);
srv_return_session(sess);
CM_THROW_ERROR(ERR_MALLOC_MEM, "alloc storage handle");
LOG_DEBUG_ERR("[API] srv api alloc db stg handle failed");
return CM_ERROR;
}
if (exc_read_handle4table(sess->stg_handle, EXC_DCC_KV_TABLE) != CM_SUCCESS) {
exc_free_handle(sess->stg_handle);
sess->stg_handle = NULL;
CM_FREE_PTR(sess->req_buf);
srv_return_session(sess);
CM_THROW_ERROR(ERR_MALLOC_MEM, "alloc storage handle");
LOG_DEBUG_ERR("[API] srv api alloc db stg handle failed");
return CM_ERROR;
}
if (cm_event_init(&sess->event) != CM_SUCCESS) {
exc_free_handle(sess->stg_handle);
sess->stg_handle = NULL;
CM_FREE_PTR(sess->req_buf);
srv_return_session(sess);
CM_THROW_ERROR(ERR_MALLOC_MEM, "init session event");
LOG_DEBUG_ERR("[API] srv create event failed.");
return CM_ERROR;
}
*handle = (void *) sess;
return CM_SUCCESS;
}
void srv_dcc_free_handle(void *handle)
{
cm_reset_error();
if (srv_get_status() != DCC_SRV_RUNNING) {
return;
}
session_t *sess = (session_t *) handle;
exc_free_handle(sess->stg_handle);
sess->stg_handle = NULL;
if (sess->ses_type == SESSION_TYPE_API) {
CM_FREE_PTR(sess->req_buf);
}
srv_return_session(sess);
}
static inline status_t srv_dcc_put_uint32(text_t *buff, uint32 value)
{
*(uint32 *) (buff->str + buff->len) = value;
buff->len += sizeof(uint32);
return CM_SUCCESS;
}
static status_t srv_dcc_put_text(text_t *buff, uint32 buff_len, const text_t *text)
{
*(uint32 *) (buff->str + buff->len) = text->len;
buff->len += sizeof(uint32);
if (text->len == 0) {
return CM_SUCCESS;
}
MEMS_RETURN_IFERR(memcpy_s(buff->str + buff->len, buff_len - buff->len, text->str, text->len));
buff->len += CM_ALIGN4(text->len);
return CM_SUCCESS;
}
int srv_dcc_get(const void *handle, dcc_text_t *range, const dcc_option_t *option,
dcc_text_t *key, dcc_text_t *value, unsigned int *eof)
{
cm_reset_error();
CHECK_SRV_STATUS(DCC_SRV_RUNNING);
CM_RETURN_IFERR(srv_check_handle_key(handle, range));
CM_CHECK_NULL_PTR(key);
CM_CHECK_NULL_PTR(option);
CM_CHECK_NULL_PTR(value);
CM_CHECK_NULL_PTR(eof);
int ret;
int64 now = g_timer()->now;
if (!option->read_op.is_prefix) {
*key = *range;
ret = exc_get(((session_t *) handle)->stg_handle, (text_t *) key, (text_t *) value,
(uint32)option->read_op.read_level, eof);
if (ret != CM_SUCCESS) {
util_convert_exc_errno();
LOG_DEBUG_ERR("[API] exc get failed or eof, ret:%d eof:%u", ret, *eof);
return CM_ERROR;
}
if (*eof == CM_TRUE) {
CM_THROW_ERROR(ERR_KEY_NOT_FOUND, "");
return CM_ERROR;
}
cm_stat_record(DCC_GET, (uint64) (g_timer()->now - now));
return CM_SUCCESS;
}
ret = exc_open_cursor(((session_t *) handle)->stg_handle, (text_t *) range,
(uint32)option->read_op.read_level, eof);
if (ret != CM_SUCCESS) {
util_convert_exc_errno();
LOG_DEBUG_ERR("[API] exc open cursor failed or eof, ret:%d eof:%u", ret, *eof);
return CM_ERROR;
}
if (*eof == CM_TRUE) {
CM_THROW_ERROR(ERR_KEY_NOT_FOUND, "");
return CM_ERROR;
}
ret = exc_cursor_fetch(((session_t *) handle)->stg_handle, (text_t *) key, (text_t *) value);
if (ret == CM_SUCCESS) {
cm_stat_record(DCC_GET, (uint64) (g_timer()->now - now));
} else {
util_convert_exc_errno();
}
return ret;
}
int srv_dcc_fetch(const void *handle, dcc_text_t *key, dcc_text_t *value, const dcc_option_t *option,
unsigned int *eof)
{
cm_reset_error();
CHECK_SRV_STATUS(DCC_SRV_RUNNING);
CM_RETURN_IFERR(srv_check_handle_key(handle, key));
CM_CHECK_NULL_PTR(value);
CM_CHECK_NULL_PTR(option);
CM_CHECK_NULL_PTR(eof);
int64 now = g_timer()->now;
if (!option->read_op.is_prefix) {
*eof = CM_TRUE;
cm_stat_record(DCC_FETCH, (uint64) (g_timer()->now - now));
return CM_SUCCESS;
}
int ret = exc_cursor_next(((session_t *) handle)->stg_handle, eof);
if (ret != CM_SUCCESS) {
util_convert_exc_errno();
LOG_DEBUG_ERR("[API] exc cursor next failed or eof, ret:%d eof:%u", ret, *eof);
return CM_ERROR;
}
if (*eof == CM_TRUE) {
CM_THROW_ERROR(ERR_KEY_NOT_FOUND, "");
return CM_ERROR;
}
ret = exc_cursor_fetch(((session_t *) handle)->stg_handle, (text_t *) key, (text_t *) value);
if (ret == CM_SUCCESS) {
cm_stat_record(DCC_FETCH, (uint64) (g_timer()->now - now));
} else {
util_convert_exc_errno();
}
return ret;
}
int srv_dcc_put(const void *handle, const dcc_text_t *key, const dcc_text_t *value, dcc_option_t *option)
{
cm_reset_error();
CHECK_SRV_STATUS(DCC_SRV_RUNNING);
CM_RETURN_IFERR(srv_check_handle_key(handle, key));
CM_RETURN_IFERR(srv_check_val(value));
CM_CHECK_NULL_PTR(option);
CM_RETURN_IFERR(srv_check_and_block());
int64 now = g_timer()->now;
session_t *sess = (session_t *) handle;
CM_CHECK_NULL_PTR(sess->req_buf);
text_t req = {
.len = 0,
.str = sess->req_buf};
CM_RETURN_IFERR(srv_dcc_put_uint32(&req, DCC_CMD_PUT));
CM_RETURN_IFERR(srv_dcc_put_uint32(&req, option->write_op.sequence));
CM_RETURN_IFERR(srv_dcc_put_uint32(&req, option->write_op.not_existed));
CM_RETURN_IFERR(srv_dcc_put_text(&req, SRV_SESS_API_REQ_BUFF_LEN, (text_t *) value));
text_t exp_val = {
.len = option->write_op.expect_val_size,
.str = option->write_op.expect_value};
CM_RETURN_IFERR(srv_dcc_put_text(&req, SRV_SESS_API_REQ_BUFF_LEN, &exp_val));
uint32 lease_len = 0;
CM_RETURN_IFERR(srv_dcc_put_uint32(&req, lease_len));
CM_RETURN_IFERR(srv_dcc_put_text(&req, SRV_SESS_API_REQ_BUFF_LEN, (text_t *) key));
int ret = exc_put(sess->stg_handle, &req, sess->write_key, &sess->index);
if (ret != CM_SUCCESS) {
util_convert_exc_errno();
return CM_ERROR;
}
ret = srv_wait_commit(sess, option->cmd_timeout);
if (ret == CM_SUCCESS) {
cm_stat_record(DCC_PUT, (uint64) (g_timer()->now - now));
} else {
util_convert_exc_errno();
}
return ret;
}
int srv_dcc_delete(const void *handle, const dcc_text_t *key, const dcc_option_t *option)
{
cm_reset_error();
CHECK_SRV_STATUS(DCC_SRV_RUNNING);
CM_RETURN_IFERR(srv_check_handle_key(handle, key));
CM_CHECK_NULL_PTR(option);
if (option->del_op.is_prefix && key->len >= SRV_MAX_PREFIX_KEY_SIZE) {
CM_THROW_ERROR(ERR_INVALID_PARAMETER_VALUE, "");
return CM_ERROR;
}
CM_RETURN_IFERR(srv_check_and_block());
int64 now = g_timer()->now;
session_t *sess = (session_t *) handle;
CM_CHECK_NULL_PTR(sess->req_buf);
text_t req = {
.len = 0,
.str = sess->req_buf};
CM_RETURN_IFERR(srv_dcc_put_uint32(&req, DCC_CMD_DELETE));
CM_RETURN_IFERR(srv_dcc_put_uint32(&req, option->del_op.is_prefix));
CM_RETURN_IFERR(srv_dcc_put_text(&req, SRV_SESS_API_REQ_BUFF_LEN, (text_t *) key));
int ret = exc_del(sess->stg_handle, &req, sess->write_key, &sess->index);
if (ret != CM_SUCCESS) {
util_convert_exc_errno();
return CM_ERROR;
}
ret = srv_wait_commit(sess, option->cmd_timeout);
if (ret == CM_SUCCESS) {
cm_stat_record(DCC_DELETE, (uint64) (g_timer()->now - now));
} else {
util_convert_exc_errno();
}
return ret;
}
int srv_dcc_watch(const void *handle, dcc_text_t *key, dcc_watch_proc_t proc, dcc_option_t *option)
{
cm_reset_error();
CHECK_SRV_STATUS(DCC_SRV_RUNNING);
CM_RETURN_IFERR(srv_check_handle_key(handle, key));
CM_CHECK_NULL_PTR(key->value);
CM_CHECK_NULL_PTR(proc);
CM_CHECK_NULL_PTR(option);
int64 now = g_timer()->now;
option->sid = ((session_t *) handle)->id;
int ret = exc_watch(((session_t *) handle)->stg_handle, (text_t *) (void *) key, proc, option, NULL);
if (ret == CM_SUCCESS) {
cm_stat_record(DCC_WATCH, (uint64) (g_timer()->now - now));
} else {
util_convert_exc_errno();
}
return ret;
}
int srv_dcc_unwatch(const void *handle, dcc_text_t *key)
{
cm_reset_error();
CHECK_SRV_STATUS(DCC_SRV_RUNNING);
CM_RETURN_IFERR(srv_check_handle_key(handle, key));
int64 now = g_timer()->now;
dcc_option_t option = {.sid = 0};
option.sid = ((session_t *) handle)->id;
int ret = exc_unwatch(((session_t *) handle)->stg_handle, (text_t *) (void *) key, &option);
if (ret == CM_SUCCESS) {
cm_stat_record(DCC_UNWATCH, (uint64) (g_timer()->now - now));
} else {
util_convert_exc_errno();
}
return ret;
}
const char *srv_dcc_get_version(void)
{
cm_reset_error();
return dcc_get_version();
}
int srv_dcc_get_errorno(void)
{
return cm_get_error_code();
}
const char *srv_dcc_get_error(int code)
{
return cm_get_errormsg(code);
}
int srv_dcc_get_node_status(dcc_node_status_t *node_stat)
{
cm_reset_error();
int ret = exc_node_is_healthy(node_stat);
if (ret != CM_SUCCESS) {
util_convert_exc_errno();
return CM_ERROR;
}
return CM_SUCCESS;
}
int srv_dcc_exec_cmd(void *handle, const dcc_text_t *cmd_line, dcc_text_t *ans_buf)
{
cm_reset_error();
CHECK_SRV_STATUS(DCC_SRV_RUNNING);
CM_CHECK_NULL_PTR(handle);
CM_CHECK_NULL_PTR(cmd_line);
if (cmd_line->len == 0) {
return CM_ERROR;
}
CM_CHECK_NULL_PTR(cmd_line->value);
CM_CHECK_NULL_PTR(ans_buf);
session_t *session = (session_t *) handle;
CM_CHECK_NULL_PTR(session->req_buf);
MEMS_RETURN_IFERR(memset_sp(session->req_buf, SRV_SESS_API_REQ_BUFF_LEN, 0, SRV_SESS_API_REQ_BUFF_LEN));
text_t left;
text_t right;
text_t cmd = {
.len = cmd_line->len,
.str = cmd_line->value
};
int argc = 0;
text_t argv[DCC_CMD_PARAMETER_CNT];
do {
if (argc == DCC_CMD_PARAMETER_CNT) {
CM_THROW_ERROR(ERR_INVALID_PARAMETER_VALUE, "cmd param num", "count is larger than 16");
return CM_ERROR;
}
cm_trim_text(&cmd);
cm_split_text(&cmd, (DCC_SPLIT_STRING)[0], DCC_ENCLOSE_CHAR, &left, &right);
argv[argc] = left;
cmd = right;
argc++;
} while (right.str != NULL);
ans_buf->value = session->req_buf;
ans_buf->len = 0;
ctl_command_t ctl_command = {0};
ctl_command.command_option.read_level = DCC_READ_LEVEL_CONSISTENT;
int ret = ctl_parse_process(argv, argc, 0, &ctl_command);
if (ret != CM_SUCCESS) {
CM_THROW_ERROR(ERR_INVALID_CMD_CONTENT, "command content is error");
return CM_ERROR;
}
return srv_exec_cmd_process(session, &ctl_command, ans_buf);
}
int srv_dcc_query_cluster_info(char *buffer, unsigned int length)
{
cm_reset_error();
CHECK_SRV_STATUS(DCC_SRV_RUNNING);
CM_CHECK_NULL_PTR(buffer);
int len = dcf_query_cluster_info(buffer, length);
if (len == 0) {
CM_THROW_ERROR(ERR_DCF_INTERNAL, "");
LOG_RUN_ERR("[API] dcf_query_cluster_info: error_no:%d, error_msg:%s",
dcf_get_errorno(),
dcf_get_error(dcf_get_errorno()));
return 0;
}
return len;
}
int srv_dcc_query_leader_info(unsigned int *node_id)
{
cm_reset_error();
CHECK_SRV_STATUS(DCC_SRV_RUNNING);
CM_CHECK_NULL_PTR(node_id);
*node_id = exc_get_leader_id();
if (*node_id == EXC_INVALID_NODE_ID) {
return CM_ERROR;
}
return CM_SUCCESS;
}
int srv_dcc_set_blocked(unsigned int is_block, unsigned int wait_timeout_ms)
{
uint32 cnt = 0;
cm_reset_error();
CHECK_SRV_STATUS(DCC_SRV_RUNNING);
LOG_OPER("[API] dcc set blocked, is_block=%u wait_timeout_ms=%u", is_block, wait_timeout_ms);
g_srv_dcc_block.srv_blocked = is_block == 0 ? CM_FALSE : CM_TRUE;
g_srv_dcc_block.srv_wait_timeout_ms = wait_timeout_ms;
cm_event_notify(&g_srv_dcc_block.srv_block_event);
if (g_srv_dcc_block.srv_blocked) {
while (!exc_is_idle() && cnt < DCC_TRY_BLOCK_CNT) {
cm_sleep(CM_SLEEP_50_FIXED);
cnt++;
}
return cnt < DCC_TRY_BLOCK_CNT ? CM_SUCCESS : CM_ERROR;
}
return CM_SUCCESS;
}
int srv_dcc_set_work_mode(dcc_work_mode_t work_mode, unsigned int vote_num)
{
cm_reset_error();
CHECK_SRV_STATUS(DCC_SRV_RUNNING);
LOG_OPER("[API] dcc set work mode, work_mode=%d vote_num=%u", work_mode, vote_num);
int ret = dcf_set_work_mode(DCC_STREAM_ID, work_mode, vote_num);
if (ret != CM_SUCCESS) {
CM_THROW_ERROR(ERR_DCF_INTERNAL, "");
LOG_DEBUG_ERR(
"[API] dcf_set_work_mode: error_no:%d, error_msg:%s", dcf_get_errorno(), dcf_get_error(dcf_get_errorno()));
return CM_ERROR;
}
return CM_SUCCESS;
}
int srv_dcc_demote_follower(void)
{
cm_reset_error();
CHECK_SRV_STATUS(DCC_SRV_RUNNING);
LOG_OPER("[API] dcc demote follower");
int ret = dcf_demote_follower(DCC_STREAM_ID);
if (ret != CM_SUCCESS) {
CM_THROW_ERROR(ERR_DCF_INTERNAL, "");
LOG_DEBUG_ERR("[API] dcf_demote_follower: error_no:%d, error_msg:%s",
dcf_get_errorno(),
dcf_get_error(dcf_get_errorno()));
return CM_ERROR;
}
return CM_SUCCESS;
}
int srv_dcc_set_election_priority(unsigned long long priority)
{
cm_reset_error();
CHECK_SRV_STATUS(DCC_SRV_RUNNING);
LOG_DEBUG_INF("[API]dcf_set_election_priority, priority :%llu", priority);
int ret = dcf_set_election_priority(DCC_STREAM_ID, priority);
if (ret != CM_SUCCESS) {
CM_THROW_ERROR(ERR_DCF_INTERNAL, "");
LOG_DEBUG_ERR("[API] dcf_set_election_priority: error_no:%d, error_msg:%s",
dcf_get_errorno(), dcf_get_error(dcf_get_errorno()));
return CM_ERROR;
}
return CM_SUCCESS;
}
int srv_dcc_promote_leader(unsigned int node_id, unsigned int wait_timeout_ms)
{
cm_reset_error();
CHECK_SRV_STATUS(DCC_SRV_RUNNING);
LOG_OPER("dcc promote leader, node_id:%u wait_timeout:%u", node_id, wait_timeout_ms);
int ret = dcf_promote_leader(DCC_STREAM_ID, node_id, wait_timeout_ms);
if (ret != CM_SUCCESS) {
CM_THROW_ERROR(ERR_DCF_INTERNAL, "");
LOG_DEBUG_ERR(
"[API] dcf_promote_leader: error_no:%d, error_msg:%s", dcf_get_errorno(), dcf_get_error(dcf_get_errorno()));
return CM_ERROR;
}
return CM_SUCCESS;
}
int srv_dcc_backup(const char *bak_format)
{
cm_reset_error();
CHECK_SRV_STATUS(DCC_SRV_RUNNING);
LOG_OPER("[API] dcc backup");
int ret = exc_backup(bak_format);
if (ret != CM_SUCCESS) {
LOG_DEBUG_ERR("[API] dcc backup failed: error_no:%d, error_msg:%s",
dcf_get_errorno(), dcf_get_error(dcf_get_errorno()));
return CM_ERROR;
}
return CM_SUCCESS;
}
int srv_dcc_restore(const char *restore_path)
{
cm_reset_error();
LOG_OPER("[API] dcc restore");
int ret = exc_restore(restore_path, NULL, NULL);
if (ret != CM_SUCCESS) {
LOG_DEBUG_ERR("[API] dcc restore failed: error_no:%d, error_msg:%s",
dcf_get_errorno(), dcf_get_error(dcf_get_errorno()));
return CM_ERROR;
}
return CM_SUCCESS;
}
int srv_dcc_set_dcf_param(const char *param_name, const char *param_value)
{
CM_CHECK_NULL_PTR(param_name);
cm_reset_error();
init_dcc_errno_desc();
return dcf_set_param(param_name, param_value);
}
#ifdef __cplusplus
}
#endif