* Copyright (c) 2022 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* clt_core.c
*
*
* IDENTIFICATION
* src/client/clt_core.c
*
* -------------------------------------------------------------------------
*/
#include "clt_core.h"
#include "clt_msg_adaptor.h"
#include "dcc_msg_protocol.h"
#include "net_client.h"
#include "dcc_msg_cmd.h"
#include "cm_timer.h"
#include "cm_error.h"
#include "cm_thread.h"
#include "cm_file.h"
#include "cm_utils.h"
#define URL_SPLIT_CHAR ','
#define URL_END_CHAR '\0'
#define TIMEOUT_ONE_THIRD 3
#define CIPHER_KEY_FILE ".cipher"
#define RAND_KEY_FILE ".rand"
#define CLT_MEM_2_FIXED (2)
static status_t clt_resuming_watch(clt_handle_t *handle);
static status_t clt_init_async_conn(clt_handle_t *handle);
static status_t clt_send_request(clt_handle_t *handle, uint8 cmd, const void *request);
static status_t clt_rcv_response(clt_handle_t *handle, uint8 cmd);
static inline status_t parse_response(cs_packet_t *pack, clt_handle_t *handle)
{
CM_RETURN_IFERR(cs_get_int32(pack, (int32 *) &handle->eof));
CM_RETURN_IFERR(cs_get_int32(pack, (int32 *) &handle->kv_cnt));
handle->pack_offset = pack->offset;
return CM_SUCCESS;
}
static status_t clt_create_conn(clt_handle_t *handle, bool32 is_sync);
static void clt_close_conn(clt_handle_t *handle, bool32 is_sync);
static void clt_hb_thread_entry(thread_t *thread);
static status_t clt_hb_proc(cs_packet_t *pack, void *handle);
static status_t clt_watch_proc(cs_packet_t *packet, void *handle);
static status_t clt_parse_url(clt_handle_t *handle, char *server_list);
static void clt_try_next_url(clt_handle_t *handle, bool32 is_sync);
static inline status_t clt_init_pack(cs_packet_t **packet, void *channel)
{
*packet = cs_get_send_pack(channel);
CM_CHECK_NULL_PTR(*packet);
cs_init_set(*packet, CS_LOCAL_VERSION);
return CM_SUCCESS;
}
static inline void clt_get_rcv_pack(cs_packet_t **rcv_pack, void *channel)
{
*rcv_pack = cs_get_recv_pack(channel);
if (*rcv_pack == NULL) {
return;
}
cs_init_get(*rcv_pack);
}
static inline void clt_poll_next_url(atomic32_t *idx, int32 server_cnt)
{
if (server_cnt == 0) {
return;
}
int32 old_leader_idx = cm_atomic32_get(idx);
(void) cm_atomic32_cas(idx, old_leader_idx, (old_leader_idx + 1) % server_cnt);
}
static status_t clt_read_key_rand(clt_handle_t *handle)
{
int file;
int size;
int len = 0;
status_t ret;
char real_path[CM_FILE_NAME_BUFFER_SIZE] = {0};
char buf[CM_FULL_PATH_BUFFER_SIZE] = {0};
CM_RETURN_IFERR(realpath_file(handle->key_file, real_path, CM_FILE_NAME_BUFFER_SIZE));
len = sprintf_s(buf, CM_FULL_PATH_BUFFER_SIZE, "%s%s", real_path, RAND_KEY_FILE);
if (len < 0 || ((uint32) len) > CM_FULL_PATH_BUFFER_SIZE) {
return CM_ERROR;
}
CM_RETURN_IFERR(cm_open_file(buf, O_BINARY | O_RDWR, &file));
ret = cm_read_file(file, handle->cipher.rand, RANDOM_LEN, &size);
if (ret != CM_SUCCESS || size < RANDOM_LEN) {
cm_close_file(file);
return CM_ERROR;
}
ret = cm_read_file(file, handle->cipher.salt, RANDOM_LEN, &size);
if (ret != CM_SUCCESS || size < RANDOM_LEN) {
cm_close_file(file);
return CM_ERROR;
}
ret = cm_read_file(file, handle->cipher.IV, RANDOM_LEN, &size);
if (ret != CM_SUCCESS || size < RANDOM_LEN) {
cm_close_file(file);
return CM_ERROR;
}
cm_close_file(file);
return CM_SUCCESS;
}
static status_t clt_read_cipher(clt_handle_t *handle)
{
int file;
int len;
status_t ret;
char buf[CM_FULL_PATH_BUFFER_SIZE] = {0};
len = sprintf_s(buf, CM_FULL_PATH_BUFFER_SIZE, "%s%s", handle->key_file, CIPHER_KEY_FILE);
if (len < 0 || ((uint32) len) > CM_FULL_PATH_BUFFER_SIZE) {
return CM_ERROR;
}
CM_RETURN_IFERR(cm_open_file(buf, O_BINARY | O_RDWR, &file));
ret = cm_read_file(file, handle->cipher.cipher_text, CM_PASSWD_MAX_LEN, (int32 *) &handle->cipher.cipher_len);
if (ret != CM_SUCCESS) {
cm_close_file(file);
return CM_ERROR;
}
cm_close_file(file);
return CM_SUCCESS;
}
static status_t clt_init_ssl_option(clt_handle_t *handle, const dcc_open_option_t *open_option)
{
status_t ret;
if (open_option->ca_file != NULL) {
uint32 ca_size = (uint32) strlen(open_option->ca_file) + 1;
handle->ca_file = malloc(CM_FULL_PATH_BUFFER_SIZE);
if (handle->ca_file == NULL) {
CM_THROW_ERROR(DCC_CLI_NO_MEMORY_ERR, "");
return CM_ERROR;
}
MEMS_RETURN_IFERR(memcpy_sp(handle->ca_file, CM_FULL_PATH_BUFFER_SIZE, open_option->ca_file, ca_size));
}
if (open_option->crt_file != NULL) {
handle->crt_file = malloc(CM_FULL_PATH_BUFFER_SIZE);
if (handle->crt_file == NULL) {
CM_THROW_ERROR(DCC_CLI_NO_MEMORY_ERR, "");
return CM_ERROR;
}
uint32 crt_size = (uint32) strlen(open_option->crt_file) + 1;
MEMS_RETURN_IFERR(memcpy_sp(handle->crt_file, CM_FULL_PATH_BUFFER_SIZE, open_option->crt_file, crt_size));
}
if (open_option->key_file != NULL) {
handle->key_file = malloc(CM_FULL_PATH_BUFFER_SIZE);
if (handle->key_file == NULL) {
CM_THROW_ERROR(DCC_CLI_NO_MEMORY_ERR, "");
return CM_ERROR;
}
uint32 key_size = (uint32) strlen(open_option->key_file) + 1;
MEMS_RETURN_IFERR(memcpy_sp(handle->key_file, CM_FULL_PATH_BUFFER_SIZE, open_option->key_file, key_size));
handle->passwd = (uchar *) malloc(CM_PASSWD_MAX_LEN);
if (handle->passwd == NULL) {
CM_THROW_ERROR(DCC_CLI_NO_MEMORY_ERR, "");
return CM_ERROR;
}
ret = clt_read_cipher(handle);
if (ret != CM_SUCCESS) {
LOG_RUN_ERR("[CLI]decode cipher failed");
return CM_ERROR;
}
ret = clt_read_key_rand(handle);
if (ret != CM_SUCCESS) {
LOG_RUN_ERR("[CLI]decode key failed");
return CM_ERROR;
}
}
return CM_SUCCESS;
}
status_t clt_init_handle(clt_handle_t **handle, const dcc_open_option_t *open_option)
{
status_t ret;
*handle = malloc(sizeof(clt_handle_t));
if (*handle == NULL) {
CM_THROW_ERROR(DCC_CLI_NO_MEMORY_ERR, "");
return CM_ERROR;
}
MEMS_RETURN_IFERR(memset_sp(*handle, sizeof(clt_handle_t), 0, sizeof(clt_handle_t)));
ret = clt_parse_url(*handle, open_option->server_list);
if (ret != CM_SUCCESS) {
CM_THROW_ERROR(DCC_CLI_ENDPOINTS_FORMAT_ERR, "");
LOG_RUN_ERR("[CLI]the endpoints's format is wrong");
return CM_ERROR;
}
if (open_option->clt_name == NULL) {
CM_THROW_ERROR(DCC_CLI_ENDPOINTS_FORMAT_ERR, "");
LOG_RUN_ERR("[CLI]the clt_name is NULL");
return CM_ERROR;
}
size_t len = strlen(open_option->clt_name) + 1;
MEMS_RETURN_IFERR(memcpy_sp((*handle)->clt_name, MAX_CLI_NAME_ZIE, open_option->clt_name, len));
ret = clt_init_ssl_option(*handle, open_option);
if (ret != CM_SUCCESS) {
LOG_RUN_ERR("[CLI]init ssl option failed");
return CM_ERROR;
}
ret = clt_watch_pool_init(&(*handle)->watch_manager);
if (ret != CM_SUCCESS) {
LOG_RUN_ERR("[CLI]init watcher manager failed");
return CM_ERROR;
}
(*handle)->time_out = open_option->time_out;
(*handle)->hb_interval = open_option->time_out / TIMEOUT_ONE_THIRD;
(void) cm_atomic32_add(&(*handle)->conn_idx, (int32) cm_random((*handle)->server_cnt));
return CM_SUCCESS;
}
static status_t clt_parse_url(clt_handle_t *handle, char *server_list)
{
int32 server_cnt = 0;
text_t text, l_text, r_text;
cm_str2text(server_list, &text);
while (text.len != 0) {
if (server_cnt >= MAX_SERVER_SIZE) {
return CM_ERROR;
}
cm_split_text(&text, URL_SPLIT_CHAR, URL_END_CHAR, &l_text, &r_text);
handle->server_texts[server_cnt] = malloc(l_text.len + 1);
if (handle->server_texts[server_cnt] == NULL) {
CM_THROW_ERROR(DCC_CLI_NO_MEMORY_ERR, "");
return CM_ERROR;
}
handle->server_cnt = server_cnt + 1;
MEMS_RETURN_IFERR(memcpy_sp(handle->server_texts[server_cnt], l_text.len, l_text.str, l_text.len));
handle->server_texts[server_cnt][l_text.len] = URL_END_CHAR;
cm_str2text_safe(r_text.str, r_text.len, &text);
LOG_DEBUG_INF("[CLI]the endpoint%u is: %s", server_cnt, handle->server_texts[server_cnt]);
server_cnt++;
}
return CM_SUCCESS;
}
status_t clt_init_conn(clt_handle_t *handle)
{
status_t ret;
ret = clt_process_sync_cmd(handle, DCC_CMD_CONNECT, NULL, handle->server_cnt + 1);
if (ret != CM_SUCCESS) {
LOG_RUN_ERR("[CLI]connect to leader failed");
return ret;
}
return CM_SUCCESS;
}
static status_t clt_init_async_conn(clt_handle_t *handle)
{
status_t ret;
CM_RETURN_IFERR(cm_event_init(&handle->async_channel_event));
ret = cm_create_thread(clt_hb_thread_entry, SIZE_M(CLT_MEM_2_FIXED), (void *) handle,
&handle->async_channel_thread);
if (ret != CM_SUCCESS) {
LOG_RUN_ERR("[CLI]create hb thread failed");
return ret;
}
return CM_SUCCESS;
}
void clt_register_net_proc(void)
{
cs_register_msg_process(DCC_CMD_WATCH, clt_watch_proc);
cs_register_msg_process(DCC_CMD_HEARTBEAT, clt_hb_proc);
}
void clt_deinit(clt_handle_t **handle)
{
if (*handle == NULL) {
return;
}
cm_event_notify(&(*handle)->async_channel_event);
cm_close_thread(&(*handle)->async_channel_thread);
cm_event_destory(&(*handle)->async_channel_event);
clt_close_conn((*handle), CM_FALSE);
clt_close_conn((*handle), CM_TRUE);
}
static void clt_hb_thread_entry(thread_t *thread)
{
status_t ret;
cs_packet_t *send_pack = NULL;
clt_handle_t *handle = (clt_handle_t *) thread->argument;
while (!thread->closed) {
if (handle->async_connected == CLT_NOT_CONNECTED) {
if (handle->try_times >= (uint32)(handle->server_cnt)) {
cm_sleep(MICROSECS_PER_MILLISEC);
handle->try_times = 0;
}
if (handle->channel[ASYNC_CHANNEL_IDX] != NULL) {
clt_close_conn(handle, CM_FALSE);
}
ret = clt_create_conn(handle, CM_FALSE);
handle->try_times++;
if (ret != CM_SUCCESS) {
LOG_RUN_ERR("[CLI]create async conn failed");
continue;
}
handle->async_connected = CLT_CONNECTING;
}
(void) clt_init_pack(&send_pack, handle->channel[ASYNC_CHANNEL_IDX]);
ret = cs_remote_call_no_wait(handle->channel[ASYNC_CHANNEL_IDX], send_pack, DCC_CMD_HEARTBEAT);
if (ret != CM_SUCCESS) {
LOG_RUN_ERR("[CLI]send hb message failed");
clt_try_next_url(handle, CM_FALSE);
}
(void) cm_event_timedwait(&handle->async_channel_event, handle->hb_interval);
}
}
static status_t clt_hb_proc(cs_packet_t *pack, void *handle)
{
status_t ret;
connect_res_t connect_res;
clt_handle_t *hd = (clt_handle_t *) handle;
if (pack->head->result != CM_SUCCESS || pack->head->cmd != DCC_CMD_HEARTBEAT) {
LOG_DEBUG_ERR("[CLI]the format of hb is wrong, cmd:%hhu, code:%hhu", pack->head->cmd, pack->head->result);
clt_try_next_url(hd, CM_FALSE);
cm_event_notify(&hd->async_channel_event);
return CM_ERROR;
}
ret = decode_connect_res(pack, &connect_res);
if (ret != CM_SUCCESS) {
LOG_DEBUG_ERR("[CLI]decode connect response failed");
clt_try_next_url(hd, CM_FALSE);
cm_event_notify(&hd->async_channel_event);
return CM_ERROR;
}
if (hd->async_connected != CLT_CONNECTED) {
hd->async_connected = CLT_CONNECTED;
hd->session_id = connect_res.session_id;
hd->try_times = 0;
ret = clt_resuming_watch(handle);
if (ret != CM_SUCCESS) {
LOG_RUN_ERR("[CLI]resuming watch failed");
}
cm_event_notify(&hd->async_channel_event);
}
return CM_SUCCESS;
}
static status_t clt_watch_proc(cs_packet_t *packet, void *handle)
{
text_t key;
status_t ret;
clt_handle_t *hd = (clt_handle_t *) handle;
watch_res_t watch_res = {0};
ret = decode_watch_res(packet, &watch_res);
if (ret != CM_SUCCESS) {
LOG_RUN_ERR("[CLI]decode watch res failed");
return ret;
}
dcc_watch_result_t watch_result = {0};
convert_watch_response(&watch_res, &watch_result);
uint32 is_prefix = watch_res.is_dir;
cm_str2text_safe(watch_res.key, watch_res.key_size, &key);
ret = clt_watch_pool_call(hd->watch_manager, &key, is_prefix, &watch_result);
return ret;
}
static status_t clt_create_conn(clt_handle_t *handle, bool32 is_sync)
{
status_t ret;
uint32 passwd_len;
conn_option_t option = {
.connect_timeout = CM_CONNECT_TIMEOUT,
.socket_timeout = handle->time_out,
.ssl_para = {0}
};
if (handle->key_file != NULL) {
ret = cm_decrypt_pwd(&handle->cipher, handle->passwd, &passwd_len);
if (ret != CM_SUCCESS) {
LOG_RUN_ERR("[CLI]decrypt pwd failed");
MEMS_RETURN_IFERR(memset_sp(handle->passwd, CM_PASSWD_MAX_LEN, 0, CM_PASSWD_MAX_LEN));
return CM_ERROR;
}
}
option.ssl_para.key_password = (char *) handle->passwd;
option.ssl_para.key_file = handle->key_file;
option.ssl_para.ca_file = handle->ca_file;
option.ssl_para.cert_file = handle->crt_file;
uint32 channel_idx = is_sync == CM_TRUE ? SYNC_CHANNEL_IDX : ASYNC_CHANNEL_IDX;
uint32 server_idx = (uint32) (is_sync == CM_TRUE ? cm_atomic32_get(&handle->conn_idx) :
cm_atomic32_get(&handle->conn_idx));
if (is_sync) {
handle->channel[channel_idx] = cs_connect_sync_channel(handle->server_texts[server_idx], (void *) handle,
&option);
} else {
handle->channel[channel_idx] = cs_connect_async_channel(handle->server_texts[server_idx], (void *) handle,
&option);
}
if (handle->key_file != NULL) {
MEMS_RETURN_IFERR(memset_sp(handle->passwd, CM_PASSWD_MAX_LEN, 0, CM_PASSWD_MAX_LEN));
}
return handle->channel[channel_idx] == NULL ? CM_ERROR : CM_SUCCESS;
}
static void clt_close_conn(clt_handle_t *handle, bool32 is_sync)
{
status_t ret;
cs_packet_t *send_pack = NULL;
uint32 channel_id = is_sync == CM_TRUE ? SYNC_CHANNEL_IDX : ASYNC_CHANNEL_IDX;
if (handle->channel[channel_id] == NULL) {
return;
}
(void) clt_init_pack(&send_pack, handle->channel[channel_id]);
ret = cs_remote_call_no_wait(handle->channel[channel_id], send_pack, DCC_CMD_DISCONNECT);
if (ret != CM_SUCCESS) {
LOG_RUN_ERR("[CLI]send close pack failed");
}
cs_disconnect_channel(handle->channel[channel_id]);
handle->channel[channel_id] = NULL;
}
static void clt_try_next_url(clt_handle_t *handle, bool32 is_sync)
{
uint32 idx = ASYNC_CHANNEL_IDX;
if (is_sync) {
idx = SYNC_CHANNEL_IDX;
handle->sync_connected = CM_FALSE;
} else {
handle->async_connected = CLT_NOT_CONNECTED;
}
clt_close_conn(handle, is_sync);
handle->channel[idx] = NULL;
clt_poll_next_url(&handle->conn_idx, handle->server_cnt);
}
static status_t clt_send_request(clt_handle_t *handle, uint8 cmd, const void *request)
{
cs_packet_t *send_pack;
if (handle->sync_connected == CM_FALSE) {
clt_close_conn(handle, CM_TRUE);
CM_RETURN_IFERR(clt_create_conn(handle, CM_TRUE));
}
CM_RETURN_IFERR(clt_init_pack(&send_pack, handle->channel[SYNC_CHANNEL_IDX]));
CM_RETURN_IFERR(encode_request(cmd, request, &send_pack));
return cs_remote_call(handle->channel[SYNC_CHANNEL_IDX], send_pack, cmd);
}
static status_t clt_rcv_response(clt_handle_t *handle, uint8 cmd)
{
status_t ret;
cs_packet_t *rcv_pack = NULL;
clt_get_rcv_pack(&rcv_pack, handle->channel[SYNC_CHANNEL_IDX]);
if (rcv_pack->head->result != CM_SUCCESS) {
return rcv_pack->head->result;
}
if (cmd == DCC_CMD_GET || cmd == DCC_CMD_FETCH) {
handle->kv_idx = 0;
ret = parse_response(rcv_pack, handle);
if (ret != CM_SUCCESS) {
LOG_RUN_ERR("[CLI]parse response failed");
return CM_ERROR;
}
} else if (cmd == DCC_CMD_PUT && handle->is_sequence) {
cs_get_int32(rcv_pack, (int32 *) &handle->sequence_no);
}
return CM_SUCCESS;
}
status_t clt_process_sync_cmd(clt_handle_t *handle, uint8 cmd, void *request, int32 try_cnt)
{
status_t ret;
for (int32 i = 0; i < try_cnt; i++) {
ret = clt_send_request(handle, cmd, request);
if (ret != CM_SUCCESS) {
LOG_DEBUG_ERR("[CLI]failed to send %hhu request", cmd);
clt_try_next_url(handle, CM_TRUE);
continue;
}
ret = clt_rcv_response(handle, cmd);
if (ret == CM_SUCCESS) {
handle->sync_connected = CM_TRUE;
return CM_SUCCESS;
} else {
clt_try_next_url(handle, CM_TRUE);
LOG_DEBUG_ERR("[CLI]failed to receive %hhu response", cmd);
}
}
return CM_ERROR;
}
status_t clt_wait_session_id(clt_handle_t *handle)
{
if (!handle->async_td_created) {
CM_RETURN_IFERR(clt_init_async_conn(handle));
handle->async_td_created = CM_TRUE;
}
uint32 cnt = 0;
date_t now = g_timer()->now;
while (handle->async_connected != CLT_CONNECTED) {
if (cnt < TIMEOUT_ONE_THIRD) {
(void) cm_event_timedwait(&handle->async_channel_event, handle->hb_interval);
} else {
break;
}
cnt++;
}
if (handle->async_connected != CLT_CONNECTED) {
LOG_RUN_ERR("[CLI]the async conn is disconnected, time: %lld", g_timer()->now - now);
return CM_ERROR;
}
return CM_SUCCESS;
}
status_t clt_fetch_from_pack(clt_handle_t *handle, dcc_result_t *result)
{
void *data = NULL;
CM_CHECK_NULL_PTR(result);
if (handle->kv_idx == handle->kv_cnt && handle->eof == CM_TRUE) {
CM_THROW_ERROR(DCC_SRV_KEY_NOT_EXISTED, "");
return CM_ERROR;
}
if (handle->kv_idx == handle->kv_cnt && handle->eof == CM_FALSE) {
LOG_DEBUG_INF("[CLI]fetch another pack of data from: %u", handle->kv_idx);
CM_RETURN_IFERR(clt_process_sync_cmd(handle, DCC_CMD_FETCH, NULL, CLT_NO_TRY_CNT));
LOG_DEBUG_INF("[CLI]fetch another pack of data success");
}
result->eof = ((handle->kv_idx + 1 == handle->kv_cnt) && handle->eof == CM_TRUE) ? CM_TRUE : CM_FALSE;
cs_packet_t *pack = NULL;
clt_get_rcv_pack(&pack, handle->channel[SYNC_CHANNEL_IDX]);
pack->offset = handle->pack_offset;
CM_RETURN_IFERR(cs_get_int32(pack, (int32 *) &result->key_len));
if (result->key_len == 0) {
CM_THROW_ERROR(DCC_CLI_KEY_IS_EMPTY, "");
return CM_ERROR;
}
CM_RETURN_IFERR(cs_get_data(pack, result->key_len, &data));
errno_t errcode = memcpy_sp((void *)result->key, MAX_KEY_SIZE, data, result->key_len);
if (errcode != EOK) {
LOG_RUN_ERR("[CLI]copy key:%u failed", result->key_len);
return CM_ERROR;
}
CM_RETURN_IFERR(cs_get_int32(pack, (int32 *) &result->val_len));
if (result->val_len != 0) {
CM_RETURN_IFERR(cs_get_data(pack, result->val_len, &data));
errcode = memcpy_sp((void *) result->val, MAX_VAL_SIZE, data, result->val_len);
if (errcode != EOK) {
LOG_RUN_ERR("[CLI]copy val:%u failed", result->val_len);
return CM_ERROR;
}
}
handle->kv_idx++;
handle->pack_offset = pack->offset;
LOG_DEBUG_INF("[CLI]fetch data success from: %u", handle->kv_idx);
return CM_SUCCESS;
}
static status_t clt_watch_when_exception(clt_handle_t *handle, uint8 cmd)
{
uint32 cnt = 0;
dcc_string_t text;
dcc_option_t option;
clt_watch_node_t *tmp;
watch_request_t request;
clt_watch_node_t *cur = handle->watch_manager->watch_key_list->first;
while (cur != NULL) {
cnt++;
tmp = cur->next;
text.len = cur->clt_watch_iv.begin.len;
text.data = cur->clt_watch_iv.begin.str;
option.watch_op.prefix = 0;
CM_RETURN_IFERR(clt_wait_session_id(handle));
convert_watch_request(&text, handle->session_id, &option, &request);
CM_RETURN_IFERR(clt_process_sync_cmd(handle, cmd, &request, CLT_TRY_ONCE));
cur = tmp;
}
LOG_DEBUG_INF("[CLI]dcc resuming watch key, cnt: %u", cnt);
cnt = 0;
cur = handle->watch_manager->watch_group_list->first;
while (cur != NULL) {
tmp = cur->next;
text.len = cur->clt_watch_iv.begin.len;
text.data = cur->clt_watch_iv.begin.str;
option.watch_op.prefix = 1;
CM_RETURN_IFERR(clt_wait_session_id(handle));
convert_watch_request(&text, handle->session_id, &option, &request);
CM_RETURN_IFERR(clt_process_sync_cmd(handle, cmd, &request, CLT_TRY_ONCE));
cur = tmp;
}
LOG_DEBUG_INF("[CLI]dcc resuming watch group, cnt: %u", cnt);
return CM_SUCCESS;
}
static status_t clt_resuming_watch(clt_handle_t *handle)
{
status_t ret;
cm_spin_lock(&handle->latch, NULL);
ret = clt_watch_when_exception(handle, DCC_CMD_WATCH);
cm_spin_unlock(&handle->latch);
return ret;
}
status_t clt_parse_children(clt_handle_t *handle, dcc_array_t *result)
{
uint32 idx = 0;
uint32 cnt = 0;
char *key = NULL;
uint32 key_len = 0;
uint32 eof = CM_TRUE;
cs_packet_t *packet = NULL;
result->count = 0;
result->strings = NULL;
clt_get_rcv_pack(&packet, handle->channel[SYNC_CHANNEL_IDX]);
CM_RETURN_IFERR(cs_get_int32(packet, (int32 *) &eof));
CM_RETURN_IFERR(cs_get_int32(packet, (int32 *) &cnt));
if (!eof) {
CM_THROW_ERROR(DCC_SRV_MESSAGE_TOO_LARGE, "");
return CM_ERROR;
}
if (cnt == 0) {
result->strings = NULL;
return CM_SUCCESS;
}
result->strings = (dcc_string_t **) malloc(sizeof(dcc_string_t *) * cnt);
if (result->strings == NULL) {
return CM_ERROR;
}
for (; idx < cnt; idx++) {
CM_RETURN_IFERR(cs_get_int32(packet, (int32 *) &key_len));
if (key_len <= 0) {
CM_THROW_ERROR(DCC_CLI_KEY_IS_EMPTY, "");
return CM_ERROR;
}
CM_RETURN_IFERR(cs_get_data(packet, key_len, (void **) &key));
result->strings[idx] = (dcc_string_t *) malloc(sizeof(dcc_string_t) + key_len);
if (result->strings[idx] == NULL) {
LOG_RUN_ERR("[CLI]alloc memory: %u failed", key_len);
return CM_ERROR;
}
result->count++;
result->strings[idx]->len = key_len;
result->strings[idx]->data = (char *) result->strings[idx] + sizeof(dcc_string_t);
MEMS_RETURN_IFERR(memcpy_sp(result->strings[idx]->data, key_len, key, key_len));
}
return CM_SUCCESS;
}
status_t clt_get_lease_info_from_pack(clt_handle_t *handle, dcc_lease_info_t *lease_info)
{
net_channel_t *chan = (net_channel_t *)(handle->channel[SYNC_CHANNEL_IDX]);
cs_packet_t *pack = &chan->recv_pack;
cs_init_get(pack);
CM_RETURN_IFERR(cs_get_int32(pack, (int32 *)&lease_info->ttl));
CM_RETURN_IFERR(cs_get_int32(pack, (int32 *)&lease_info->remain_ttl));
return CM_SUCCESS;
}
int clt_lease_keep_alive(clt_handle_t *handle, const dcc_string_t *lease_name)
{
status_t ret;
lease_request_t lease_req;
lease_req.lease_name.str = lease_name->data;
lease_req.lease_name.len = lease_name->len;
cm_spin_lock(&handle->latch, NULL);
ret = clt_process_sync_cmd(handle, DCC_CMD_LEASE_RENEW, (void *)&lease_req, handle->server_cnt);
cm_spin_unlock(&handle->latch);
return ret;
}