* Copyright (c) 2021 Huawei Technologies Co.,Ltd.
*
* CM is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* cm_ddb_etcd.cpp
*
*
* IDENTIFICATION
* src/cm_adapter/cm_etcd_adapter/cm_ddb_etcd.cpp
*
* -------------------------------------------------------------------------
*/
#include "cm_ddb_etcd.h"
#include "alarm.h"
#include "cm/cm_elog.h"
#include "cm/cm_c.h"
/* Number of etcd nodes; assigned from apiInfo->nodeNum in InitEtcdInfoEx. */
uint32 g_etcdNum = 0;
/* Indexes (into g_etcdInfo) of the nodes recorded as healthy by the last full scan. */
static uint32 g_healthEtcdIndex[MAX_ETCD_NODE_NUM] = {0};
/* True when the last full scan found more than half of the etcd nodes healthy. */
static bool g_healthEtcdFlag = false;
/* Number of usable entries in g_healthEtcdIndex; reset to 0 whenever the last scan found no majority. */
static uint32 g_healthEtcdCount = 0;
/* TLS file paths handed to etcd_open; filled by InitEtcdTlsPath. */
EtcdTlsAuthPath g_etcdTlsPath = {{0}};
/* Timeout passed to etcd_open by GetEtcdNodeState; copied from apiInfo->timeOut in InitEtcdInfo. */
int32 g_timeOut = 0;
/* Write-locked by GetHealthEtcdFromAllEtcd while it updates the healthy-node cache above. */
static pthread_rwlock_t g_healthEtcdRwlock = PTHREAD_RWLOCK_INITIALIZER;
/* Module that loaded the driver (set in InitEtcdInfo); used mainly to choose log levels. */
static ModuleId g_modId = MOD_ALL;
/* Array of g_etcdNum server descriptors allocated by InitEtcdInfoEx. */
ServerSocket *g_etcdInfo = NULL;
static status_t EtcdLoadApi(const DrvApiInfo *apiInfo);
/* Driver descriptor returned by DrvEtcdGet; EtcdLoadApi fills in its callbacks. */
static DdbDriver g_drvEtcd = {PTHREAD_RWLOCK_INITIALIZER, false, DB_ETCD, "etcd conn", EtcdLoadApi};
/* Size of g_etcdAlarmList; must match the number of AbnormalEtcdAlarmItem values. */
static const int ABNORMAL_ETCD_ALARM_LIST_SIZE = 3;
static Alarm g_etcdAlarmList[ABNORMAL_ETCD_ALARM_LIST_SIZE];
/* Slot of each abnormal-etcd alarm inside g_etcdAlarmList. */
enum AbnormalEtcdAlarmItem { ETCD_UNHEALTH = 0, ETCD_DOWN = 1, ETCD_NEAR_QUOTA = 2 };
void CmsEtcdAbnormalAlarmItemInitialize(void)
{
errno_t rc = memset_s(g_etcdAlarmList, sizeof(g_etcdAlarmList), 0, sizeof(g_etcdAlarmList));
securec_check_errno(rc, (void)rc);
AlarmItemInitialize(&(g_etcdAlarmList[ETCD_UNHEALTH]), ALM_AI_AbnormalEtcdUnhealth, ALM_AS_Init, NULL, 0, 0);
AlarmItemInitialize(&(g_etcdAlarmList[ETCD_DOWN]), ALM_AI_AbnormalEtcdDown, ALM_AS_Init, NULL, 0, 0);
AlarmItemInitialize(&(g_etcdAlarmList[ETCD_NEAR_QUOTA]), ALM_AI_AbnormalEtcdNearQuota, ALM_AS_Init, NULL);
}
static void PrintEtcdServerList(const EtcdServerSocket *etcdServerList, uint32 len, int32 logLevel)
{
char serverStr[DDB_MAX_KEY_VALUE_LEN] = {0};
size_t serverSize = 0;
errno_t rc = 0;
for (uint32 i = 0; i < len; ++i) {
serverSize = strlen(serverStr);
if (serverSize >= (DDB_MAX_KEY_VALUE_LEN - 1)) {
break;
}
rc = snprintf_s(serverStr + serverSize, DDB_MAX_KEY_VALUE_LEN - serverSize,
DDB_MAX_KEY_VALUE_LEN - 1 - serverSize, "%s:%u; ", etcdServerList[i].host, etcdServerList[i].port);
securec_check_intval(rc, (void)rc);
}
write_runlog(logLevel, "etcdServerList is %s.\n", serverStr);
}
/*
 * Copy the leading valid servers (non-NULL host, non-zero port) from apiInfo->serverList
 * into the caller-allocated array *etcdServerList, which holds `len` entries.
 *
 * Copying stops at the first invalid source entry or when `len` entries are filled.
 * If room remains, the entry right after the last copied one gets a NULL host so it
 * terminates the list.
 *
 * Returns CM_SUCCESS when at least one server was copied. On failure the array is
 * released and *etcdServerList is reset, so every error path leaves no allocation behind.
 */
static status_t InitEtcdServerList(EtcdServerSocket **etcdServerList, const DrvApiInfo *apiInfo, uint32 len)
{
    if (len == 0) {
        write_runlog(ERROR, "InitEtcdServerList len is 0.\n");
        /* Release here too, consistent with the empty-list failure path below. */
        FREE_AND_RESET((*etcdServerList));
        return CM_ERROR;
    }
    int32 logLevel = (apiInfo->modId == MOD_CMCTL) ? DEBUG5 : ((apiInfo->modId == MOD_CMS) ? LOG : DEBUG1);
    uint32 idx = 0;
    /* Bound by len as well so the destination array can never be overrun. */
    for (uint32 i = 0; i < apiInfo->serverLen && idx < len; ++i) {
        if (apiInfo->serverList[i].host == NULL || apiInfo->serverList[i].port == 0) {
            break;
        }
        (*etcdServerList)[idx].host = apiInfo->serverList[i].host;
        (*etcdServerList)[idx].port = (unsigned short)apiInfo->serverList[i].port;
        ++idx;
    }
    if (idx == 0) {
        write_runlog(logLevel, "etcdServerList is empty.\n");
        FREE_AND_RESET((*etcdServerList));
        return CM_ERROR;
    }
    /* The terminator belongs directly after the last copied entry, i.e. at idx (not idx + 1). */
    if (idx < len) {
        (*etcdServerList)[idx].host = NULL;
    }
    logLevel = (apiInfo->modId == MOD_CMCTL) ? DEBUG5 : DEBUG1;
    PrintEtcdServerList((*etcdServerList), idx, logLevel);
    return CM_SUCCESS;
}
/*
 * Allocate *etcdServerList and fill it from apiInfo->serverList.
 *
 * The array is allocated with one zeroed entry more than apiInfo->serverLen. A NULL-host
 * entry is therefore always present after the copied servers, even when every source
 * entry is valid (the list is consumed as a NULL-host terminated array by etcd_open).
 *
 * Returns CM_ERROR when serverLen is 0, when allocation fails, or when no valid server
 * is found. On success the caller owns *etcdServerList and must free it.
 */
status_t InitEtcdServerSocket(EtcdServerSocket **etcdServerList, const DrvApiInfo *apiInfo)
{
    int32 logLevel = (apiInfo->modId == MOD_CMCTL) ? DEBUG1 : LOG;
    *etcdServerList = NULL;
    /* Reject an empty list up front instead of going through malloc(0) and memset_s(.., 0, ..). */
    if (apiInfo->serverLen == 0) {
        write_runlog(logLevel, "etcd server list is empty, serverLen is 0.\n");
        return CM_ERROR;
    }
    /* One extra slot, left zeroed, acts as the guaranteed list terminator. */
    size_t len = ((size_t)apiInfo->serverLen + 1) * sizeof(EtcdServerSocket);
    *etcdServerList = (EtcdServerSocket *)malloc(len);
    if (*etcdServerList == NULL) {
        write_runlog(logLevel, "etcdSeverList is null.\n");
        return CM_ERROR;
    }
    errno_t rc = memset_s(*etcdServerList, len, 0, len);
    securec_check_errno(rc, FREE_AND_RESET(*etcdServerList));
    status_t st = InitEtcdServerList(etcdServerList, apiInfo, apiInfo->serverLen);
    return st;
}
/*
 * Open an etcd session against the servers described by apiInfo.
 *
 * Builds a temporary server list, calls etcd_open with the global TLS paths and
 * apiInfo->timeOut, then releases the list whether or not the open succeeded.
 * Returns CM_SUCCESS with *session set on success, CM_ERROR otherwise.
 */
status_t CreateEtcdSession(EtcdSession *session, const DrvApiInfo *apiInfo)
{
    int32 logLevel = DEBUG1;
    if (apiInfo->modId == MOD_CMCTL) {
        logLevel = DEBUG5;
    } else if (apiInfo->modId == MOD_CMS) {
        logLevel = LOG;
    }

    EtcdServerSocket *servers = NULL;
    if (InitEtcdServerSocket(&servers, apiInfo) != CM_SUCCESS) {
        return CM_ERROR;
    }
    if (servers == NULL) {
        write_runlog(logLevel, "line %s:%d, etcdServerList is NULL.\n", __FUNCTION__, __LINE__);
        return CM_ERROR;
    }

    const int32 openRet = etcd_open(session, servers, &g_etcdTlsPath, apiInfo->timeOut);
    /* The list is only needed for the open call itself. */
    FREE_AND_RESET(servers);
    if (openRet == (int32)ETCD_OK) {
        write_runlog(logLevel, "etcdSession is %d\n", *session);
        return CM_SUCCESS;
    }

    write_runlog(logLevel, "cannot open etcd conn, error is %s.\n", get_last_error());
    return CM_ERROR;
}
/*
 * Driver callback: allocate an EtcdSession, store it in *session and open it.
 * On any failure the allocation is released and CM_ERROR (or the open status) is returned.
 */
status_t DrvEtcdAllocConn(DrvCon_t *session, const DrvApiInfo *apiInfo)
{
    EtcdSession **slot = (EtcdSession **)session;
    const size_t sessionBytes = sizeof(EtcdSession);

    *slot = (EtcdSession *)malloc(sessionBytes);
    if (*slot == NULL) {
        int32 logLevel = (apiInfo->modId == MOD_CMCTL) ? DEBUG1 : LOG;
        write_runlog(logLevel, "%s:%d Failed to malloc etcdSession.\n", __FUNCTION__, __LINE__);
        return CM_ERROR;
    }

    errno_t ret = memset_s(*slot, sessionBytes, 0, sessionBytes);
    securec_check_errno(ret, (void)ret);

    status_t openStatus = CreateEtcdSession(*slot, apiInfo);
    if (openStatus == CM_SUCCESS) {
        return openStatus;
    }
    /* Opening failed: do not hand a half-initialised session back to the caller. */
    FREE_AND_RESET(*session);
    return openStatus;
}
/*
 * Driver callback: close the etcd session stored in *session and free its memory.
 * The memory is released even when etcd_close reports an error; that error is logged
 * and turned into CM_ERROR.
 */
static status_t DrvEtcdFreeConn(DrvCon_t *session)
{
    EtcdSession **slot = (EtcdSession **)session;
    const int32 closeRet = etcd_close(**slot);
    FREE_AND_RESET(*session);

    if (closeRet == (int32)ETCD_OK) {
        return CM_SUCCESS;
    }
    write_runlog(ERROR, "Failed to close etcd, error is %s.\n", get_last_error());
    return CM_ERROR;
}
/*
 * Driver callback: read the value of key->data into value->data (capacity value->len).
 * When `option` is supplied, its quorum setting overrides the default get option.
 * Returns CM_SUCCESS only if etcd_get reports ETCD_OK.
 */
status_t DrvEtcdGetValue(const DrvCon_t session, DrvText *key, DrvText *value, const DrvGetOption *option)
{
    GetEtcdOption etcdOpt = {false, false, true};
    if (option != NULL) {
        etcdOpt.quorum = option->quorum;
    }

    const EtcdSession *sess = (const EtcdSession *)session;
    if (etcd_get(*sess, key->data, value->data, (int32)value->len, &etcdOpt) == (int32)ETCD_OK) {
        return CM_SUCCESS;
    }
    return CM_ERROR;
}
/*
 * Driver callback: fetch all key/value pairs under key->data and store up to `length`
 * of them in keyValue[].
 *
 * EtcdGetAllValues yields one comma-separated string that is split here into alternating
 * key and value tokens.
 *
 * Returns CM_ERROR when keyValue is NULL or length is 0 (nothing can be stored), when the
 * fetch fails, or when no complete pair was parsed; CM_SUCCESS otherwise.
 */
status_t DrvEtcdGetAllKV(
    const DrvCon_t session, DrvText *key, DrvKeyValue *keyValue, uint32 length, const DrvGetOption *option)
{
    /* Without any output capacity even a single pair would be written out of bounds. */
    if (keyValue == NULL || length == 0) {
        write_runlog(ERROR, "get all values by cgo, keyValue is null or length is 0.\n");
        return CM_ERROR;
    }
    const EtcdSession *etcdSession = (const EtcdSession *)session;
    GetEtcdOption getOption = {false, false, true};
    if (option != NULL) {
        getOption.quorum = option->quorum;
    }
    char etcdKeyValue[DDB_MAX_KEY_VALUE_LEN] = {0};
    int32 res = EtcdGetAllValues(*etcdSession, key->data, etcdKeyValue, &getOption, DDB_MAX_KEY_VALUE_LEN);
    if (res != (int32)ETCD_OK) {
        return CM_ERROR;
    }
    write_runlog(
        DEBUG1, "get all values by cgo, and key is [%s], result_key_value is [%s].\n", key->data, etcdKeyValue);
    errno_t rc = 0;
    char *pLeft = NULL;
    char *pKey = strtok_r(etcdKeyValue, ",", &pLeft);
    char *pValue = strtok_r(NULL, ",", &pLeft);
    uint32 i = 0;
    /* Check the bound before writing, not after. */
    while (pKey != NULL && pValue != NULL && i < length) {
        rc = snprintf_s(keyValue[i].key, DDB_KEY_LEN, DDB_KEY_LEN - 1, "%s", pKey);
        securec_check_intval(rc, (void)rc);
        rc = snprintf_s(keyValue[i].value, DDB_VALUE_LEN, DDB_VALUE_LEN - 1, "%s", pValue);
        securec_check_intval(rc, (void)rc);
        if (++i >= length) {
            break;
        }
        pKey = strtok_r(NULL, ",", &pLeft);
        pValue = strtok_r(NULL, ",", &pLeft);
    }
    if (i == 0) {
        write_runlog(
            ERROR, "get all values by cgo, and key is [%s], result_key_value is [%s].\n", key->data, etcdKeyValue);
        return CM_ERROR;
    }
    return CM_SUCCESS;
}
/*
 * Append one key/value record to fp as two lines: the key bytes and a newline, then the
 * value bytes and a newline.
 *
 * key/keyLen and value/valueLen give the bytes to write. A zero length is valid and
 * writes an empty line.
 * Returns CM_ERROR if fp is NULL or any write is short; CM_SUCCESS otherwise.
 */
static status_t SaveAllKV(const char *key, size_t keyLen, const char *value, size_t valueLen, FILE *fp)
{
    if (fp == NULL) {
        write_runlog(ERROR, "line:%d, fp is NULL.\n", __LINE__);
        return CM_ERROR;
    }
    /*
     * Write byte-wise and compare against the requested length: this detects partial
     * writes and does not mistake a zero-length write (fwrite returns 0) for a failure.
     */
    if (fwrite(key, 1, keyLen, fp) != keyLen) {
        write_runlog(ERROR, "line:%d, write kv file failed, key(%s).\n", __LINE__, key);
        return CM_ERROR;
    }
    if (fputc('\n', fp) == EOF) {
        write_runlog(ERROR, "line:%d, write kv file failed.\n", __LINE__);
        return CM_ERROR;
    }
    if (fwrite(value, 1, valueLen, fp) != valueLen) {
        /* This is the value write, so label it as such in the log. */
        write_runlog(ERROR, "line:%d, write kv file failed, value(%s).\n", __LINE__, value);
        return CM_ERROR;
    }
    if (fputc('\n', fp) == EOF) {
        write_runlog(ERROR, "line:%d, write kv file failed.\n", __LINE__);
        return CM_ERROR;
    }
    return CM_SUCCESS;
}
/*
 * Driver callback: fetch all key/value pairs under key->data and dump them to the file
 * named by option->kvFile (opened with "w+"), one key line followed by one value line
 * per pair.
 *
 * The output target is validated before etcd is queried. option->kvFile is
 * canonicalized in place before it is opened.
 * Returns CM_ERROR if the option or file name is missing, the fetch fails, the file
 * cannot be opened, a record cannot be written, or closing the file fails.
 */
status_t DrvEtcdSaveAllKV(const DrvCon_t session, const DrvText *key, DrvSaveOption *option)
{
    /* Fail fast: there is no point in querying etcd when there is nowhere to write. */
    if (option == NULL || option->kvFile == NULL) {
        write_runlog(ERROR, "open kvs file is null.\n");
        return CM_ERROR;
    }
    char etcdKeyValue[DDB_MAX_KEY_VALUE_LEN] = {0};
    const EtcdSession *etcdSession = (const EtcdSession *)session;
    GetEtcdOption getOption = {false, false, true};
    if (EtcdGetAllValues(*etcdSession, key->data, etcdKeyValue, &getOption, DDB_MAX_KEY_VALUE_LEN) != (int32)ETCD_OK) {
        return CM_ERROR;
    }
    write_runlog(DEBUG1, "get all values by cgo, and key is \"\", result_key_value is [%s].\n", etcdKeyValue);
    canonicalize_path(option->kvFile);
    FILE *fp = fopen(option->kvFile, "w+");
    if (fp == NULL) {
        write_runlog(ERROR, "open kvs file \"%s\" failed.\n", option->kvFile);
        return CM_ERROR;
    }
    char *pLeft = NULL;
    char *pKey = strtok_r(etcdKeyValue, ",", &pLeft);
    char *pValue = strtok_r(NULL, ",", &pLeft);
    status_t st = CM_SUCCESS;
    while (pKey != NULL && pValue != NULL) {
        if (SaveAllKV(pKey, strlen(pKey), pValue, strlen(pValue), fp) != CM_SUCCESS) {
            st = CM_ERROR;
            break;
        }
        pKey = strtok_r(NULL, ",", &pLeft);
        pValue = strtok_r(NULL, ",", &pLeft);
    }
    /* fclose flushes buffered data, so a failure here means the dump may be incomplete. */
    if (fclose(fp) != 0) {
        write_runlog(ERROR, "close kvs file \"%s\" failed.\n", option->kvFile);
        st = CM_ERROR;
    }
    return st;
}
/*
 * Driver callback: store value->data under key->data.
 * When `option` is given, its preValue is forwarded to etcd_set as the set option's
 * prevValue; otherwise etcd_set is called without options.
 * Returns CM_SUCCESS only if etcd_set reports ETCD_OK.
 */
status_t DrvEtcdSetKV(const DrvCon_t session, DrvText *key, DrvText *value, DrvSetOption *option)
{
    const EtcdSession *sess = (const EtcdSession *)session;
    SetEtcdOption etcdOpt = {0};
    SetEtcdOption *etcdOptPtr = NULL;

    if (option != NULL) {
        etcdOpt.prevValue = option->preValue;
        etcdOptPtr = &etcdOpt;
    }

    const int32 setRet = etcd_set(*sess, key->data, value->data, etcdOptPtr);
    return (setRet == (int32)ETCD_OK) ? CM_SUCCESS : CM_ERROR;
}
/*
 * Driver callback: delete key->data from etcd (no delete options are passed).
 * Returns CM_SUCCESS only if etcd_delete reports ETCD_OK.
 */
status_t DrvEtcdDelKV(const DrvCon_t session, DrvText *key)
{
    const EtcdSession *sess = (const EtcdSession *)session;
    if (etcd_delete(*sess, key->data, NULL) == (int32)ETCD_OK) {
        return CM_SUCCESS;
    }
    return CM_ERROR;
}
/*
 * Query the health of etcd member `memberName` through `session`.
 * nodeState->health becomes DDB_STATE_HEALTH only when the query succeeds and reports
 * exactly "healthy". In every other case it is set to DDB_STATE_DOWN and CM_ERROR is
 * returned.
 */
status_t DrvEtcdNodeHealth(DrvCon_t session, char *memberName, DdbNodeState *nodeState)
{
    char healthText[ETCD_STATE_LEN] = {0};
    EtcdSession *sess = (EtcdSession *)session;

    const int32 queryRet = etcd_cluster_health(*sess, memberName, healthText, ETCD_STATE_LEN);
    /* Short-circuit: the text is only inspected when the query itself succeeded. */
    const bool healthy = (queryRet == (int)ETCD_OK) && (strcmp(healthText, "healthy") == 0);
    if (!healthy) {
        nodeState->health = DDB_STATE_DOWN;
        return CM_ERROR;
    }

    nodeState->health = DDB_STATE_HEALTH;
    return CM_SUCCESS;
}
/*
 * Driver callback: report both health and role of etcd member `memberName`.
 *
 * If the health query fails or does not answer "healthy", the node is marked
 * DDB_STATE_DOWN with DDB_ROLE_UNKNOWN. If the health query succeeds but the role query
 * fails, health stays DDB_STATE_HEALTH and the role is DDB_ROLE_UNKNOWN. Both cases
 * return CM_ERROR. On success the role is DDB_ROLE_LEADER or DDB_ROLE_FOLLOWER.
 */
status_t DrvEtcdNodeState(DrvCon_t session, char *memberName, DdbNodeState *nodeState)
{
    EtcdSession *sess = (EtcdSession *)session;
    char healthText[ETCD_STATE_LEN] = {0};

    int32 ret = etcd_cluster_health(*sess, memberName, healthText, ETCD_STATE_LEN);
    if (ret != (int32)ETCD_OK || strcmp(healthText, "healthy") != 0) {
        nodeState->health = DDB_STATE_DOWN;
        nodeState->role = DDB_ROLE_UNKNOWN;
        return CM_ERROR;
    }
    nodeState->health = DDB_STATE_HEALTH;

    bool leader = false;
    ret = etcd_cluster_state(*sess, memberName, &leader);
    if (ret != (int32)ETCD_OK) {
        nodeState->role = DDB_ROLE_UNKNOWN;
        return CM_ERROR;
    }

    nodeState->role = leader ? DDB_ROLE_LEADER : DDB_ROLE_FOLLOWER;
    return CM_SUCCESS;
}
/*
 * Open a short-lived session to the single etcd node g_etcdInfo[idx], ask it for its
 * health, and close the session again.
 *
 * Returns CM_TIMEDOUT if the node cannot be opened, otherwise the status reported by
 * DrvEtcdNodeHealth. A failing close is only logged.
 */
static status_t GetEtcdNodeHealth(uint32 idx, DdbNodeState *nodeState, int timeOut)
{
    const int logLevel = (g_modId == MOD_CMCTL) ? DEBUG1 : ERROR;

    /* One real target followed by a NULL-host terminator entry. */
    EtcdServerSocket target[2] = {{0}};
    target[0].host = g_etcdInfo[idx].host;
    target[0].port = (unsigned short)g_etcdInfo[idx].port;
    target[1].host = NULL;

    EtcdSession probe = 0;
    if (etcd_open(&probe, target, &g_etcdTlsPath, timeOut) != 0) {
        write_runlog(logLevel, "open etcd server %s failed: %s.\n", target[0].host, get_last_error());
        return CM_TIMEDOUT;
    }

    status_t healthStatus = DrvEtcdNodeHealth((DrvCon_t)(&probe), g_etcdInfo[idx].nodeInfo.nodeName, nodeState);

    if (etcd_close(probe) != 0) {
        write_runlog(logLevel, "line %s %d: cannot free conn, error is %s.\n", __FUNCTION__, __LINE__,
            get_last_error());
    }

    if (healthStatus == CM_ERROR) {
        write_runlog(logLevel, "line %s %d: cannot get ddbInstance, error is %s.\n", __FUNCTION__, __LINE__,
            get_last_error());
    }
    return healthStatus;
}
/*
 * Open a short-lived session to the single etcd node g_etcdInfo[idx] (using the global
 * timeout g_timeOut), query its health and role, and close the session again.
 *
 * Returns CM_TIMEDOUT if the node cannot be opened, otherwise the status reported by
 * DrvEtcdNodeState. A failing close is only logged.
 */
static status_t GetEtcdNodeState(uint32 idx, DdbNodeState *nodeState)
{
    const int logLevel = (g_modId == MOD_CMCTL) ? DEBUG1 : ERROR;

    /* One real target followed by a NULL-host terminator entry. */
    EtcdServerSocket target[2] = {{0}};
    target[0].host = g_etcdInfo[idx].host;
    target[0].port = (unsigned short)g_etcdInfo[idx].port;
    target[1].host = NULL;

    EtcdSession probe = 0;
    if (etcd_open(&probe, target, &g_etcdTlsPath, g_timeOut) != 0) {
        write_runlog(logLevel, "open etcd server %s failed: %s.\n", target[0].host, get_last_error());
        return CM_TIMEDOUT;
    }

    status_t stateStatus = DrvEtcdNodeState((DrvCon_t)(&probe), g_etcdInfo[idx].nodeInfo.nodeName, nodeState);

    if (etcd_close(probe) != 0) {
        write_runlog(logLevel, "line %s %d: cannot free conn, error is %s.\n",
            __FUNCTION__, __LINE__, get_last_error());
    }

    if (stateStatus == CM_ERROR) {
        write_runlog(logLevel, "line %s %d: cannot get ddbInstance, error is %s.\n",
            __FUNCTION__, __LINE__, get_last_error());
    }
    return stateStatus;
}
/*
 * Probe etcd node g_etcdInfo[idx] and decide whether it is healthy.
 * Returns the probe status unchanged when the probe fails (including CM_TIMEDOUT),
 * CM_ERROR when the node answered but is not in DDB_STATE_HEALTH, CM_SUCCESS otherwise.
 */
static status_t EtcdNodeIsHealth(uint32 idx, int timeOut)
{
    DdbNodeState state;
    errno_t ret = memset_s(&state, sizeof(DdbNodeState), 0, sizeof(DdbNodeState));
    securec_check_errno(ret, (void)ret);

    status_t probeStatus = GetEtcdNodeHealth(idx, &state, timeOut);
    if (probeStatus != CM_SUCCESS) {
        return probeStatus;
    }

    write_runlog(DEBUG5, "line %s %d: nodeState heal is %d, role is %d.\n",
        __FUNCTION__, __LINE__, (int32)state.health, (int32)state.role);

    if (state.health == DDB_STATE_HEALTH) {
        return CM_SUCCESS;
    }

    const int logLevel = (g_modId == MOD_CMCTL) ? DEBUG1 : ERROR;
    write_runlog(logLevel, "line %s %d: node (%s)is unhealth.\n", __FUNCTION__, __LINE__,
        g_etcdInfo[idx].nodeInfo.nodeName);
    return CM_ERROR;
}
/*
 * Probe the etcd nodes in order until either a majority is known to be healthy or a
 * majority is known to be unhealthy, then publish the result in the healthy-node cache
 * under the write lock.
 *
 * Nodes whose probe timed out count neither as healthy nor as unhealthy.
 * Returns the number of healthy nodes found before the scan stopped.
 */
static uint32 GetHealthEtcdFromAllEtcd(int timeOut)
{
    uint32 foundIndex[MAX_ETCD_NODE_NUM] = {0};
    uint32 healthy = 0;
    uint32 unhealthy = 0;
    bool majorityHealthy = false;

    for (uint32 node = 0; node < g_etcdNum; node++) {
        const status_t probe = EtcdNodeIsHealth(node, timeOut);
        if (probe == CM_TIMEDOUT) {
            continue;
        }
        if (probe == CM_SUCCESS) {
            foundIndex[healthy] = node;
            ++healthy;
        } else if (probe == CM_ERROR) {
            ++unhealthy;
        }
        /* Stop as soon as the outcome is decided either way. */
        if (healthy > g_etcdNum / 2) {
            majorityHealthy = true;
            break;
        }
        if (unhealthy > g_etcdNum / 2) {
            break;
        }
    }

    (void)pthread_rwlock_wrlock(&g_healthEtcdRwlock);
    g_healthEtcdCount = 0;
    for (uint32 k = 0; k < healthy; ++k) {
        g_healthEtcdIndex[k] = foundIndex[k];
    }
    g_healthEtcdFlag = majorityHealthy;
    if (majorityHealthy) {
        /* The cached indexes are only advertised when they form a majority. */
        g_healthEtcdCount = healthy;
    }
    (void)pthread_rwlock_unlock(&g_healthEtcdRwlock);

    return healthy;
}
/*
 * Return a count of healthy etcd nodes, using the cached set of healthy nodes first.
 *
 * If the cache is marked valid, only the cached nodes are re-probed. As soon as more
 * than half of all nodes are confirmed healthy that count is returned. If a cached node
 * fails, or the cache cannot confirm a majority, a full scan is done through
 * GetHealthEtcdFromAllEtcd (which also refreshes the cache).
 *
 * The cache is copied under the read lock, because GetHealthEtcdFromAllEtcd updates it
 * under the write lock. The probes run after the lock is released, so network calls
 * never hold the lock.
 */
static uint32 GetHealthEtcdNodeCount(int timeOut)
{
    uint32 cachedIndex[MAX_ETCD_NODE_NUM] = {0};
    uint32 cachedCount = 0;

    (void)pthread_rwlock_rdlock(&g_healthEtcdRwlock);
    if (g_healthEtcdFlag) {
        cachedCount = g_healthEtcdCount;
        /* Never copy more than the cache array can hold. */
        if (cachedCount > (uint32)MAX_ETCD_NODE_NUM) {
            cachedCount = (uint32)MAX_ETCD_NODE_NUM;
        }
        for (uint32 i = 0; i < cachedCount; ++i) {
            cachedIndex[i] = g_healthEtcdIndex[i];
        }
    }
    (void)pthread_rwlock_unlock(&g_healthEtcdRwlock);

    uint32 healthCount = 0;
    for (uint32 i = 0; i < cachedCount; i++) {
        uint32 etcdIndex = cachedIndex[i];
        if (etcdIndex >= g_etcdNum) {
            break;
        }
        if (EtcdNodeIsHealth(etcdIndex, timeOut) != CM_SUCCESS) {
            break;
        }
        ++healthCount;
        if (healthCount > g_etcdNum / 2) {
            return healthCount;
        }
    }
    /* Cache missing, stale or insufficient: fall back to scanning every node. */
    return GetHealthEtcdFromAllEtcd(timeOut);
}
/*
 * Driver callback: tell whether the etcd cluster currently has a healthy majority.
 *
 * With no etcd nodes configured the answer is always true. For DDB_HEAL_COUNT the nodes
 * are probed now. For DDB_PRE_CONN the externally maintained counter
 * g_healthEtcdCountForPreConn is compared instead. Any other mode reports true.
 */
bool IsEtcdHealth(DDB_CHECK_MOD checkMod, int timeOut)
{
    if (g_etcdNum == 0) {
        return true;
    }

    if (checkMod == DDB_HEAL_COUNT) {
        const uint32 healthyNow = GetHealthEtcdNodeCount(timeOut);
        return healthyNow > g_etcdNum / 2;
    }

    if (checkMod == DDB_PRE_CONN) {
        return g_healthEtcdCountForPreConn > g_etcdNum / 2;
    }

    return true;
}
/*
 * Driver callback: release the global etcd node table.
 * Only done when the driver was loaded by MOD_CMCTL; other modules keep the table.
 */
static void DrvEtcdFreeInfo(void)
{
    if (g_modId == MOD_CMCTL) {
        FREE_AND_RESET(g_etcdInfo);
    }
}
/*
 * Copy the CA, client certificate and client key paths from tlsPath into the global
 * g_etcdTlsPath that is later handed to etcd_open, and log the paths before and after
 * the copy.
 * Returns CM_ERROR when tlsPath is NULL, CM_SUCCESS otherwise.
 */
static status_t InitEtcdTlsPath(const TlsAuthPath *tlsPath)
{
    const bool fromCtl = (g_modId == MOD_CMCTL);

    if (tlsPath == NULL) {
        const int32 errLevel = fromCtl ? DEBUG1 : ERROR;
        write_runlog(errLevel, "line %s: %d tlsPath is null.\n", __FUNCTION__, __LINE__);
        return CM_ERROR;
    }

    const int32 traceLevel = fromCtl ? DEBUG5 : DEBUG1;
    write_runlog(traceLevel, "init: ca: %s, crt: %s, key: %s.\n", tlsPath->caFile, tlsPath->crtFile,
        tlsPath->keyFile);

    errno_t ret = memcpy_s(g_etcdTlsPath.etcd_ca_path, ETCD_MAX_PATH_LEN - 1, tlsPath->caFile,
        DDB_MAX_PATH_LEN - 1);
    securec_check_errno(ret, (void)ret);

    ret = memcpy_s(g_etcdTlsPath.client_crt_path, ETCD_MAX_PATH_LEN - 1, tlsPath->crtFile,
        DDB_MAX_PATH_LEN - 1);
    securec_check_errno(ret, (void)ret);

    ret = memcpy_s(g_etcdTlsPath.client_key_path, ETCD_MAX_PATH_LEN - 1, tlsPath->keyFile,
        DDB_MAX_PATH_LEN - 1);
    securec_check_errno(ret, (void)ret);

    write_runlog(traceLevel, "end: ca: %s, crt: %s, key: %s.\n", g_etcdTlsPath.etcd_ca_path,
        g_etcdTlsPath.client_crt_path, g_etcdTlsPath.client_key_path);
    return CM_SUCCESS;
}
/*
 * Log every entry of g_etcdInfo on one line as
 * "host:port:nodeName:nodeId:instd:azName; ".
 * Formatting stops early once the local buffer is full.
 */
static void PrintEtcdInfo()
{
    int32 logLevel = (g_modId == MOD_CMCTL) ? DEBUG1 : LOG;
    char etcdStr[DDB_MAX_KEY_VALUE_LEN] = {0};
    size_t etcdSize = 0;
    errno_t rc = 0;
    for (uint32 i = 0; i < g_etcdNum; ++i) {
        etcdSize = strlen(etcdStr);
        /*
         * A full buffer holds DDB_MAX_KEY_VALUE_LEN - 1 characters plus the terminator,
         * so the guard must compare against LEN - 1 (same as PrintEtcdServerList).
         * Comparing against LEN can never be true and would let a zero-capacity
         * format call through.
         */
        if (etcdSize >= (DDB_MAX_KEY_VALUE_LEN - 1)) {
            break;
        }
        rc = snprintf_s(etcdStr + etcdSize, DDB_MAX_KEY_VALUE_LEN - etcdSize, DDB_MAX_KEY_VALUE_LEN - 1 - etcdSize,
            "%s:%u:%s:%u:%u:%s; ", g_etcdInfo[i].host, g_etcdInfo[i].port, g_etcdInfo[i].nodeInfo.nodeName,
            g_etcdInfo[i].nodeIdInfo.nodeId, g_etcdInfo[i].nodeIdInfo.instd, g_etcdInfo[i].nodeIdInfo.azName);
        securec_check_intval(rc, (void)rc);
    }
    write_runlog(logLevel, "etcdStr is %s.\n", etcdStr);
}
/*
 * Build the global etcd node table g_etcdInfo from apiInfo.
 *
 * g_etcdNum is taken from apiInfo->nodeNum. The table is filled with the servers from
 * apiInfo->serverList that have a host, a port and a non-empty node name. Other entries
 * are skipped.
 * Returns CM_ERROR when nodeNum is 0, when allocation fails, or when the number of
 * usable servers differs from nodeNum (the table is freed in that case).
 */
static status_t InitEtcdInfoEx(const DrvApiInfo *apiInfo)
{
    const int32 logLevel = (apiInfo->modId == MOD_CMCTL) ? DEBUG1 : LOG;

    g_etcdNum = apiInfo->nodeNum;
    if (g_etcdNum == 0) {
        write_runlog(ERROR, "g_etcdNum is 0.\n");
        return CM_ERROR;
    }

    const size_t tableBytes = g_etcdNum * sizeof(ServerSocket);
    g_etcdInfo = (ServerSocket *)malloc(tableBytes);
    if (g_etcdInfo == NULL) {
        write_runlog(logLevel, "g_etcdInfo malloc failed.\n");
        return CM_ERROR;
    }
    errno_t ret = memset_s(g_etcdInfo, tableBytes, 0, tableBytes);
    securec_check_errno(ret, FREE_AND_RESET(g_etcdInfo));

    uint32 filled = 0;
    for (uint32 src = 0; src < apiInfo->serverLen && filled < g_etcdNum; ++src) {
        ServerSocket *candidate = &apiInfo->serverList[src];
        const bool usable = (candidate->host != NULL) && (candidate->port != 0) &&
            (candidate->nodeInfo.nodeName != NULL) && (candidate->nodeInfo.len != 0);
        if (!usable) {
            continue;
        }
        g_etcdInfo[filled] = *candidate;
        ++filled;
    }

    if (filled != 0 && filled == g_etcdNum) {
        PrintEtcdInfo();
        return CM_SUCCESS;
    }

    write_runlog(logLevel, "%s: %d, failed to init etcd info, idx: %u, g_etcdNum is %u.\n",
        __FUNCTION__, __LINE__, filled, g_etcdNum);
    FREE_AND_RESET(g_etcdInfo);
    return CM_ERROR;
}
/*
 * Driver callback: find the etcd leader and report its AZ name and node id in idInfo.
 *
 * When azName is non-NULL only nodes in that AZ are queried. Nodes whose state cannot
 * be fetched are skipped.
 * Returns CM_ERROR if any node has no AZ name recorded or if no leader is found.
 */
static status_t DrvEtcdLeaderNodeId(NodeIdInfo *idInfo, const char *azName)
{
    DdbNodeState state = {DDB_STATE_HEALTH};

    for (uint32 node = 0; node < g_etcdNum; ++node) {
        ServerSocket *entry = &g_etcdInfo[node];
        if (entry->nodeIdInfo.azName == NULL) {
            write_runlog(ERROR, "[DrvEtcdLeaderNodeId]: i=%u, azName is NULL.\n", node);
            return CM_ERROR;
        }

        const bool azMatches = (azName == NULL) || (strcmp(entry->nodeIdInfo.azName, azName) == 0);
        if (!azMatches) {
            continue;
        }

        if (GetEtcdNodeState(node, &state) == CM_SUCCESS && state.role == DDB_ROLE_LEADER) {
            idInfo->azName = entry->nodeIdInfo.azName;
            idInfo->nodeId = entry->nodeIdInfo.nodeId;
            return CM_SUCCESS;
        }
    }

    return CM_ERROR;
}
/*
 * Record the calling module and timeout from apiInfo, then initialise the global TLS
 * paths and the global etcd node table. Stops at the first failing step.
 */
status_t InitEtcdInfo(const DrvApiInfo *apiInfo)
{
    g_modId = apiInfo->modId;
    g_timeOut = apiInfo->timeOut;

    status_t tlsStatus = InitEtcdTlsPath(apiInfo->client_t.tlsPath);
    CM_RETURN_IFERR(tlsStatus);

    return InitEtcdInfoEx(apiInfo);
}
/*
 * Driver callback: return the alarm item stored at alarmIndex in g_etcdAlarmList,
 * or NULL when the index is outside [0, ABNORMAL_ETCD_ALARM_LIST_SIZE).
 */
Alarm *DrvEtcdGetAlarm(int alarmIndex)
{
    const bool inRange = (alarmIndex >= 0) && (alarmIndex < ABNORMAL_ETCD_ALARM_LIST_SIZE);
    if (inRange) {
        return &g_etcdAlarmList[alarmIndex];
    }
    return NULL;
}
/*
 * Driver callback: validate a parameter update. Nothing is stored for etcd; the call
 * only checks that both key and value are present.
 * Returns CM_ERROR (with a log entry) when key or value is NULL, CM_SUCCESS otherwise.
 */
static status_t DrvEtcdSetParam(const char *key, const char *value)
{
    if (key == NULL || value == NULL) {
        /*
         * At least one of the two pointers is NULL here, and passing NULL for a %s
         * conversion is undefined behaviour, so substitute a printable placeholder.
         */
        write_runlog(ERROR, "failed to set dcc param, because key(%s) or value(%s) is null.\n",
            (key == NULL) ? "null" : key, (value == NULL) ? "null" : value);
        return CM_ERROR;
    }
    return CM_SUCCESS;
}
/*
 * Driver callback: unconditionally report the driver as stopped by storing true in
 * *ddbStop. Always returns CM_SUCCESS.
 */
static status_t DrvEtcdStop(bool *ddbStop)
{
    const bool stopped = true;
    *ddbStop = stopped;
    return CM_SUCCESS;
}
/*
 * Driver load hook: initialise the etcd alarm items, install every etcd callback into
 * the driver descriptor, initialise the global etcd information from apiInfo and
 * finally start the etcd thread(s) through CreateEtcdThread.
 * Returns CM_ERROR if the global information cannot be initialised, otherwise the
 * status of CreateEtcdThread.
 */
static status_t EtcdLoadApi(const DrvApiInfo *apiInfo)
{
    CmsEtcdAbnormalAlarmItemInitialize();

    DdbDriver *driver = DrvEtcdGet();

    /* Connection management. */
    driver->allocConn = DrvEtcdAllocConn;
    driver->freeConn = DrvEtcdFreeConn;
    driver->restConn = DrvEtcdRestConn;

    /* Key/value access. */
    driver->getValue = DrvEtcdGetValue;
    driver->getAllKV = DrvEtcdGetAllKV;
    driver->saveAllKV = DrvEtcdSaveAllKV;
    driver->setKV = DrvEtcdSetKV;
    driver->delKV = DrvEtcdDelKV;

    /* Cluster state and health. */
    driver->drvNodeState = DrvEtcdNodeState;
    driver->isHealth = IsEtcdHealth;
    driver->leaderNodeId = DrvEtcdLeaderNodeId;
    driver->setMinority = DrvEtcdSetMinority;

    /* Miscellaneous hooks. */
    driver->lastError = get_last_error;
    driver->freeNodeInfo = DrvEtcdFreeInfo;
    driver->notifyDdb = DrvNotifyEtcd;
    driver->getAlarm = DrvEtcdGetAlarm;
    driver->setParam = DrvEtcdSetParam;
    driver->stop = DrvEtcdStop;

    if (InitEtcdInfo(apiInfo) != CM_SUCCESS) {
        return CM_ERROR;
    }
    return CreateEtcdThread(apiInfo);
}
/* Return the process-wide etcd driver descriptor. */
DdbDriver *DrvEtcdGet(void)
{
    DdbDriver *driver = &g_drvEtcd;
    return driver;
}