* Copyright (c) 2021 Huawei Technologies Co.,Ltd.
*
* CM is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* cms_arbitrate_synclist.cpp
* DN synclist arbitrate
*
* IDENTIFICATION
* src/cm_server/cms_arbitrate_synclist.cpp
*
* -------------------------------------------------------------------------
*/
#include "cm/cm_elog.h"
#include "cms_ddb.h"
#include "cms_az.h"
#include "cms_alarm.h"
#include "cms_process_messages.h"
#include "cms_common.h"
static void SyncCurrentWithExceptSyncList(uint32 groupIndex, bool isCurSameWithExpect);
static bool CompareDnOnlineWithExpectSyncList(const DatanodeDynamicStatus *statusDn, const DatanodeSyncList *syncList);
static int SetCurSyncListStatusValue(uint32 groupIndex, char *value, size_t len);
static void SetInitWaitReduceOrIncreaseTime();
static bool CompareMemberSyncWithExceptSyncList(
const DatanodeSyncList *memberSyncList, const DatanodeSyncList *expectSyncList);
static bool GetHistoryClusterSyncListFromDdb();
static void DoReduceSyncList(uint32 groupIndex, const CurrentInstanceStatus *statusInstance, int printTime,
cm_instance_report_status *reportStatus);
static void DoIncreaseSyncList(
uint32 groupIndex, const CurrentInstanceStatus *statusInstance, cm_instance_report_status *reportStatus);
static bool IsDoReduceOrIncreaseSyncList(uint32 groupIndex, const DatanodeDynamicStatus *statusDnOnline,
cm_instance_report_status *reportStatus, DatanodeDynamicStatus *historyDnOnline, bool isCurSameWithExpect);
static void PrintLogMsg(uint32 groupIndex, const CurrentInstanceStatus *statusInstance, const char *value);
static void PrintReduceOrIncreaseMsg(uint32 groupIndex, const cm_instance_report_status *reportStatus,
const char *value, bool isReduce, const CurrentInstanceStatus *statusInstance);
static bool IsDoGsGucFlag(uint32 groupIndex);
const int DELAY_TIME_TO_INCREASE_STANDBY = 300;
const int DELAY_TIME_TO_REDUCE_STANDBY = 12;
const int ONE_PRIMARY_ONE_SLAVE = 2;
const int SYNC_LIST_TIMES = 1;
void GetSyncListString(const DatanodeSyncList *syncList, char *syncListString, size_t maxLen)
{
errno_t rc = 0;
size_t strLen = 0;
if (syncList->count == 0) {
rc = strcpy_s(syncListString, maxLen, "sync list is empty");
securec_check_errno(rc, (void)rc);
return;
}
if (maxLen <= 1) {
write_runlog(ERROR, "maxLen is 1 or 0.\n");
return;
}
for (int index = 0; index < syncList->count; ++index) {
strLen = strlen(syncListString);
if (strLen >= (maxLen - 1)) {
return;
}
if (index == syncList->count - 1) {
rc = snprintf_s(
syncListString + strLen, maxLen - strLen, (maxLen - strLen) - 1, "%u", syncList->dnSyncList[index]);
} else {
rc = snprintf_s(
syncListString + strLen, maxLen - strLen, (maxLen - strLen) - 1, "%u, ", syncList->dnSyncList[index]);
}
securec_check_intval(rc, (void)rc);
}
}
void GetDnDynamicStatus(uint32 groupIndex, CurrentInstanceStatus *statusInstance, char *value, int32 valueLen)
{
cm_instance_role_group *dnRoleGroup = &g_instance_role_group_ptr[groupIndex];
cm_instance_datanode_report_status *dnReport =
g_instance_group_report_status_ptr[groupIndex].instance_status.data_node_member;
int32 dnFailCount = 0;
int32 dnOnlineCount = 0;
int32 dnPrimaryCount = 0;
int32 dnVoteAzCount = 0;
int32 normalPriCount = 0;
bool result = false;
for (int32 i = 0; i < dnRoleGroup->count; i++) {
result = IsCurInstanceInVoteAz(groupIndex, i);
if (result) {
if (i < valueLen) {
value[i] = INSTANCE_DATA_IN_VOTE + '0';
}
statusInstance->statusDnVoteAz.dnStatus[dnVoteAzCount++] = dnRoleGroup->instanceMember[i].instanceId;
continue;
}
cm_local_replconninfo *localSta = &(dnReport[i].local_status);
if (localSta->local_role == INSTANCE_ROLE_UNKNOWN) {
statusInstance->statusDnFail.dnStatus[dnFailCount++] = dnRoleGroup->instanceMember[i].instanceId;
if (i < valueLen) {
value[i] = INSTANCE_DATA_NO_REDUCED + '0';
}
continue;
}
statusInstance->statusDnOnline.dnStatus[dnOnlineCount++] = dnRoleGroup->instanceMember[i].instanceId;
if (i < valueLen) {
value[i] = INSTANCE_DATA_REDUCED + '0';
}
if (localSta->local_role == INSTANCE_ROLE_PRIMARY) {
statusInstance->statusPrimary.dnStatus[dnPrimaryCount++] = dnRoleGroup->instanceMember[i].instanceId;
bool res = (localSta->db_state == INSTANCE_HA_STATE_NORMAL && localSta->term != InvalidTerm);
if (res) {
statusInstance->norPrimary.dnStatus[normalPriCount++] = dnRoleGroup->instanceMember[i].instanceId;
}
}
}
statusInstance->statusDnFail.count = dnFailCount;
statusInstance->statusDnOnline.count = dnOnlineCount;
statusInstance->statusPrimary.count = dnPrimaryCount;
statusInstance->statusDnVoteAz.count = dnVoteAzCount;
statusInstance->norPrimary.count = normalPriCount;
write_runlog(DEBUG1, "line %d: instanceId(%u), statusDnOnline=%d, statusDnFail=%d, statusPrimary=%d, "
"normalPrimary=%d statusDnVoteAz=%d, value=%s.\n",
__LINE__, dnRoleGroup->instanceMember[0].instanceId, statusInstance->statusDnOnline.count,
statusInstance->statusDnFail.count, statusInstance->statusPrimary.count, statusInstance->norPrimary.count,
statusInstance->statusDnVoteAz.count, value);
}
void MemsetDnStatus(CurrentInstanceStatus *statusInstance, char *value, size_t len)
{
errno_t rc = memset_s(statusInstance, sizeof(CurrentInstanceStatus), 0, sizeof(CurrentInstanceStatus));
securec_check_errno(rc, (void)rc);
rc = memset_s(value, len, '\0', len);
securec_check_errno(rc, (void)rc);
}
bool CompareHistorywithCurrOnline(
const DatanodeDynamicStatus *statusDnOnline, const DatanodeDynamicStatus *historyDnOnline)
{
if (statusDnOnline->count != historyDnOnline->count) {
return false;
}
for (int i = 0; i < statusDnOnline->count; ++i) {
if (statusDnOnline->dnStatus[i] != historyDnOnline->dnStatus[i]) {
return false;
}
}
return true;
}
bool IsInstanceIdInSyncList(uint32 instanceId, const DatanodeSyncList *syncList)
{
if (syncList->count == 0) {
write_runlog(DEBUG1, "The sync list is empty.\n");
return true;
}
for (int i = 0; i < syncList->count; ++i) {
if (instanceId == syncList->dnSyncList[i]) {
return true;
}
}
return false;
}
void ReportAlarmSyncList(uint32 groupIndex, bool isIncrease)
{
DatanodeSyncList *currentSyncList =
&(g_instance_group_report_status_ptr[groupIndex].instance_status.currentSyncList);
for (int i = 0; i < currentSyncList->count; ++i) {
ReportIncreaseOrReduceAlarm(ALM_AT_Event, currentSyncList->dnSyncList[i], isIncrease);
}
}
static void SyncCurrentWithExceptSyncList(uint32 groupIndex, bool isCurSameWithExpect)
{
if (isCurSameWithExpect) {
return;
}
errno_t rc = 0;
char statusKey[MAX_PATH_LEN] = {0};
char statusValue[MAX_PATH_LEN] = {0};
bool isIncrease = false;
uint32 instanceId = g_instance_role_group_ptr[groupIndex].instanceMember[0].instanceId;
bool hasDoGsGucFlag = IsDoGsGucFlag(groupIndex);
cm_instance_report_status *reportGrp = &(g_instance_group_report_status_ptr[groupIndex].instance_status);
if (reportGrp->currentSyncList.count < reportGrp->exceptSyncList.count) {
isIncrease = true;
}
if (hasDoGsGucFlag) {
reportGrp->waitSyncTime++;
if (reportGrp->waitSyncTime <= SYNC_LIST_TIMES) {
write_runlog(LOG, "instd(%u) time is [%d/%d] delay to sync curSyncList.\n",
instanceId, reportGrp->waitSyncTime, SYNC_LIST_TIMES);
return;
}
int doResult = SetCurSyncListStatusValue(groupIndex, statusValue, sizeof(statusValue));
if (doResult == -1) {
return;
}
rc = snprintf_s(statusKey, MAX_PATH_LEN, MAX_PATH_LEN - 1, "/%s/CMServer/DnCurSyncList", pw->pw_name);
securec_check_intval(rc, (void)rc);
status_t st = SetKV2Ddb(statusKey, MAX_PATH_LEN, statusValue, MAX_PATH_LEN, NULL);
if (st != CM_SUCCESS) {
write_runlog(ERROR, "%u:ddb set failed. key=%s,value=%s.\n", instanceId, statusKey, statusValue);
return;
}
write_runlog(LOG, "%u: ddb set status DnCurSyncList success, key=%s, value=%s.\n", instanceId, statusKey,
statusValue);
rc = memcpy_s(&(reportGrp->currentSyncList), sizeof(DatanodeSyncList), &(reportGrp->exceptSyncList),
sizeof(DatanodeSyncList));
securec_check_errno(rc, (void)rc);
current_cluster_az_status = AnyFirstNo;
ReportAlarmSyncList(groupIndex, isIncrease);
reportGrp->waitSyncTime = 0;
}
}
static bool PrimaryDnSyncDone(uint32 groupIdx, int32 memIdx)
{
cm_instance_datanode_report_status *dnReport =
&(g_instance_group_report_status_ptr[groupIdx].instance_status.data_node_member[memIdx]);
if (dnReport->local_status.local_role != INSTANCE_ROLE_PRIMARY) {
return true;
}
uint32 instId = g_instance_role_group_ptr[groupIdx].instanceMember[memIdx].instanceId;
if (dnReport->syncDone == SUCCESS_SYNC_DATA) {
write_runlog(LOG, "dn primary instance %u, it has finished sync.\n", instId);
return true;
}
static int32 times = 0;
const int32 printLogTimes = 20;
if (times >= printLogTimes || log_min_messages <= DEBUG1) {
write_runlog(LOG, "dn primary instance %u, it has not finished sync yet.\n", instId);
times = 0;
}
++times;
return false;
}
static bool IsDoGsGucFlag(uint32 groupIndex)
{
char doGsGuc[MAX_PATH_LEN] = {0};
char expectSyncListStr[MAX_PATH_LEN] = {0};
bool hasDoGsGucFlag = true;
for (int32 i = 0; i < g_instance_role_group_ptr[groupIndex].count; ++i) {
if (!PrimaryDnSyncDone(groupIndex, i)) {
return false;
}
if (CompareMemberSyncWithExceptSyncList(
&(g_instance_group_report_status_ptr[groupIndex].instance_status.data_node_member[i].dnSyncList),
&(g_instance_group_report_status_ptr[groupIndex].instance_status.exceptSyncList))) {
doGsGuc[i] = '1';
continue;
}
doGsGuc[i] = '0';
if (!IsInstanceIdInSyncList(g_instance_role_group_ptr[groupIndex].instanceMember[i].instanceId,
&(g_instance_group_report_status_ptr[groupIndex].instance_status.exceptSyncList))) {
continue;
}
hasDoGsGucFlag = false;
break;
}
GetSyncListString(&(g_instance_group_report_status_ptr[groupIndex].instance_status.exceptSyncList),
expectSyncListStr, sizeof(expectSyncListStr));
write_runlog(LOG, "instanceId(%u) doGsGuc is [%s], expectSyncList is [%s].\n",
g_instance_role_group_ptr[groupIndex].instanceMember[0].instanceId, doGsGuc, expectSyncListStr);
return hasDoGsGucFlag;
}
static bool CompareDnOnlineWithExpectSyncList(const DatanodeDynamicStatus *statusDn, const DatanodeSyncList *syncList)
{
if (statusDn == NULL || syncList == NULL) {
return false;
}
if (statusDn->count != syncList->count) {
return false;
}
for (int i = 0; i < statusDn->count; ++i) {
if (statusDn->dnStatus[i] != syncList->dnSyncList[i]) {
return false;
}
}
return true;
}
static int SetCurSyncListStatusValue(uint32 groupIndex, char *value, size_t len)
{
uint32 instanceId = 6001;
uint32 curInstanceId = g_instance_role_group_ptr[groupIndex].instanceMember[0].instanceId;
uint32 tempInstanceId = 0;
errno_t rc = memset_s(value, len, '0', len - 1);
securec_check_errno(rc, (void)rc);
value[len - 1] = '\0';
for (uint32 i = 0; i < g_dynamic_header->relationCount; ++i) {
if (g_instance_role_group_ptr[i].instanceMember[0].instanceType != INSTANCE_TYPE_DATANODE) {
continue;
}
tempInstanceId = g_instance_role_group_ptr[i].instanceMember[0].instanceId;
cm_instance_report_status *reportStatus = &(g_instance_group_report_status_ptr[i].instance_status);
if (groupIndex == i) {
if (reportStatus->exceptSyncList.count <= 0) {
write_runlog(ERROR, "line %d: curInstanceId(%u), instanceId(%u) expectSyncList is empty, cannot set "
"currentSyncList.\n", __LINE__, curInstanceId, tempInstanceId);
return -1;
}
for (int index = 0; index < reportStatus->exceptSyncList.count; ++index) {
value[reportStatus->exceptSyncList.dnSyncList[index] - instanceId] = INSTANCE_DATA_REDUCED + '0';
}
for (int j = 0; j < reportStatus->voteAzInstance.count; ++j) {
value[reportStatus->voteAzInstance.dnStatus[j] - instanceId] = INSTANCE_DATA_IN_VOTE + '0';
}
} else {
if (reportStatus->currentSyncList.count <= 0) {
write_runlog(ERROR, "line %d: curInstanceId(%u), instanceId(%u) currentSyncList is empty, cannot set "
"currentSyncList.\n", __LINE__, curInstanceId, tempInstanceId);
return -1;
}
for (int index = 0; index < reportStatus->currentSyncList.count; ++index) {
value[reportStatus->currentSyncList.dnSyncList[index] - instanceId] = INSTANCE_DATA_REDUCED + '0';
}
for (int j = 0; j < reportStatus->voteAzInstance.count; ++j) {
value[reportStatus->voteAzInstance.dnStatus[j] - instanceId] = INSTANCE_DATA_IN_VOTE + '0';
}
}
}
return 0;
}
static void SetInitWaitReduceOrIncreaseTime()
{
for (uint32 i = 0; i < g_dynamic_header->relationCount; ++i) {
if (g_instance_role_group_ptr[i].instanceMember[0].instanceType != INSTANCE_TYPE_DATANODE) {
continue;
}
g_instance_group_report_status_ptr[i].instance_status.waitReduceTimes = DELAY_TIME_TO_REDUCE_STANDBY;
g_instance_group_report_status_ptr[i].instance_status.waitIncreaseTimes = DELAY_TIME_TO_INCREASE_STANDBY;
}
}
void ComputeTimeForArbitrate(struct timeval checkBegin, struct timeval checkEnd, int *printTime)
{
int arbitrateTime = 2;
uint32 sleepInterval = 1000000;
(void)gettimeofday(&checkEnd, NULL);
++(*printTime);
if (*printTime > MAX_VALUE_OF_PRINT) {
*printTime = 0;
}
uint32 usedTime = (uint32)GetTimeMinus(checkEnd, checkBegin);
if ((checkEnd.tv_sec - checkBegin.tv_sec) > arbitrateTime) {
write_runlog(LOG, "it take %u us for group arbitrate.\n", usedTime);
}
if (sleepInterval > usedTime) {
CmUsleep(sleepInterval - usedTime);
}
}
static void UpdateSyncListStat(maintenance_mode upgradeMode)
{
if (upgradeMode != MAINTENANCE_MODE_NONE) {
g_isEnableUpdateSyncList = SYNCLIST_THREADS_IN_MAINTENANCE;
return;
}
if (!IsDdbHealth(DDB_PRE_CONN)) {
g_isEnableUpdateSyncList = SYNCLIST_THREADS_IN_DDB_BAD;
return;
}
g_isEnableUpdateSyncList = SYNCLIST_THREADS_IN_SLEEP;
}
static bool IsSyncListNumZero()
{
for (uint32 i = 0; i < g_dynamic_header->relationCount; ++i) {
if (g_instance_role_group_ptr[i].instanceMember[0].instanceType != INSTANCE_TYPE_DATANODE) {
continue;
}
if (g_instance_group_report_status_ptr[i].instance_status.currentSyncList.count <= 0 ||
g_instance_group_report_status_ptr[i].instance_status.exceptSyncList.count <= 0) {
write_runlog(LOG, "group(%u) curr sync list num is %d, except sync list num is %d.\n",
i, g_instance_group_report_status_ptr[i].instance_status.currentSyncList.count,
g_instance_group_report_status_ptr[i].instance_status.exceptSyncList.count);
return true;
}
}
return false;
}
static void GetHistoryClusterSyncListWhenEmptySyncList()
{
static int32 cmsRole = CM_SERVER_UNKNOWN;
if (cmsRole == g_HA_status->local_role) {
if (cmsRole != CM_SERVER_PRIMARY) {
return;
}
} else {
write_runlog(LOG, "last cmsRole is %d, and current cms role is %d.\n", cmsRole, g_HA_status->local_role);
}
cmsRole = g_HA_status->local_role;
if (cmsRole != CM_SERVER_PRIMARY) {
return;
}
if (IsSyncListNumZero()) {
if (!GetHistoryClusterSyncListFromDdb()) {
write_runlog(LOG, "cannot get history syncList, and change cmsRole to %d.\n", CM_SERVER_UNKNOWN);
cmsRole = CM_SERVER_UNKNOWN;
}
}
return;
}
static bool CheckDnPendingCmd(uint32 groupIdx)
{
cm_instance_command_status *cmd = g_instance_group_report_status_ptr[groupIdx].instance_status.command_member;
for (int32 i = 0; i < g_instance_role_group_ptr[groupIdx].count; ++i) {
if (cmd[i].pengding_command == (int32)MSG_CM_AGENT_SWITCHOVER) {
write_runlog(LOG, "instd(%u) is doing(%d: %d), cannot modify synclist.\n",
GetInstanceIdInGroup(groupIdx, i), cmd[i].command_status, cmd[i].pengding_command);
return false;
}
}
return true;
}
static bool CheckCurDnModifySyncList(uint32 groupIdx, const CurrentInstanceStatus *statusInstance, const char *value)
{
const int dnPrimaryNum = 1;
if (statusInstance->statusPrimary.count != dnPrimaryNum || (statusInstance->norPrimary.count != dnPrimaryNum) ||
(statusInstance->statusDnOnline.count + statusInstance->statusDnFail.count !=
(g_instance_role_group_ptr[groupIdx].count - statusInstance->statusDnVoteAz.count))) {
write_runlog(ERROR, "instanceId(%u), primary count is (%d: %d), statusDnOnline is %d, statusDnFail is %d, "
"value is %s, can not arbitrate.\n", GetInstanceIdInGroup(groupIdx, 0), statusInstance->statusPrimary.count,
statusInstance->norPrimary.count, statusInstance->statusDnOnline.count, statusInstance->statusDnFail.count,
value);
return false;
}
return CheckDnPendingCmd(groupIdx);
}
void *DnGroupStatusCheckAndArbitrateMain(void *arg)
{
uint32 sleepInterval = 1;
thread_name = "GpArbitrate";
bool hasHistory = false;
DatanodeDynamicStatus *historyDnOnline =
(DatanodeDynamicStatus *)malloc(sizeof(DatanodeDynamicStatus) * MAX_INSTANCE_NUM);
if (historyDnOnline == NULL) {
write_runlog(FATAL, "Out of memory: historyDnOnline failed.\n");
g_isEnableUpdateSyncList = CANNOT_START_SYNCLIST_THREADS;
FreeNotifyMsg();
exit(-1);
}
errno_t rc = memset_s(historyDnOnline, sizeof(DatanodeDynamicStatus) * MAX_INSTANCE_NUM, 0,
sizeof(DatanodeDynamicStatus) * MAX_INSTANCE_NUM);
securec_check_errno(rc, (void)rc);
CurrentInstanceStatus statusInstance = {{0}};
char value[DDB_MIN_VALUE_LEN] = {0};
SetInitWaitReduceOrIncreaseTime();
struct timeval checkBegin = {0, 0};
struct timeval checkEnd = {0, 0};
bool isResult = false;
int printTime = 0;
bool isCurSameWithExpect = false;
int logLevel = 0;
bool isNeedReduce = false;
bool isNeedIncrease = false;
int count = 0;
CmsArbitrateStatus cmsSt = {false, CM_SERVER_UNKNOWN, MAINTENANCE_MODE_NONE};
for (;;) {
(void)gettimeofday(&checkBegin, NULL);
if (got_stop == 1) {
write_runlog(LOG, "receive exit request in DnGroupStatusCheckAndArbitrateMain.\n");
cm_sleep(sleepInterval);
continue;
}
if (CmsCanArbitrate(&cmsSt, "[DnGroupStatusCheckAndArbitrateMain]") != CM_SUCCESS) {
hasHistory = false;
logLevel = (g_HA_status->local_role != CM_SERVER_PRIMARY) ? DEBUG1 : LOG;
write_runlog(logLevel, "cannot arbitrate reduce or increase, in the condition that ddb is health is %d "
"or upgradeMode is %u.\n", cmsSt.isDdbHealth, (uint32)cmsSt.upgradeMode);
UpdateSyncListStat(cmsSt.upgradeMode);
GetHistoryClusterSyncListWhenEmptySyncList();
cm_sleep(sleepInterval);
continue;
}
g_isEnableUpdateSyncList = SYNCLIST_THREADS_IN_PROCESS;
if (!hasHistory) {
if (!GetHistoryClusterSyncListFromDdb()) {
cm_sleep(sleepInterval);
continue;
}
hasHistory = true;
}
for (uint32 groupIndex = 0; groupIndex < g_dynamic_header->relationCount; ++groupIndex) {
if (g_instance_role_group_ptr[groupIndex].instanceMember[0].instanceType != INSTANCE_TYPE_DATANODE) {
continue;
}
cm_instance_report_status *reportStatus = &g_instance_group_report_status_ptr[groupIndex].instance_status;
MemsetDnStatus(&statusInstance, value, DDB_MIN_VALUE_LEN);
GetDnDynamicStatus(groupIndex, &statusInstance, value, DDB_MIN_VALUE_LEN);
if (!CheckCurDnModifySyncList(groupIndex, &statusInstance, value)) {
continue;
}
isCurSameWithExpect = CompareCurWithExceptSyncList(groupIndex);
SyncCurrentWithExceptSyncList(groupIndex, isCurSameWithExpect);
PrintLogMsg(groupIndex, &statusInstance, value);
isResult = IsDoReduceOrIncreaseSyncList(groupIndex, &(statusInstance.statusDnOnline), reportStatus,
historyDnOnline, isCurSameWithExpect);
if (!isResult) {
continue;
}
count = (g_instance_role_group_ptr[groupIndex].count - statusInstance.statusDnVoteAz.count) / 2;
isNeedReduce = statusInstance.statusDnOnline.count <= reportStatus->currentSyncList.count &&
statusInstance.statusDnOnline.count <= count;
if (isNeedReduce) {
PrintReduceOrIncreaseMsg(groupIndex, reportStatus, value, true, &statusInstance);
DoReduceSyncList(groupIndex, &statusInstance, printTime, reportStatus);
continue;
}
isNeedIncrease = statusInstance.statusDnOnline.count > reportStatus->currentSyncList.count &&
statusInstance.statusDnOnline.count >= count;
if (isNeedIncrease) {
PrintReduceOrIncreaseMsg(groupIndex, reportStatus, value, false, &statusInstance);
DoIncreaseSyncList(groupIndex, &statusInstance, reportStatus);
}
}
ComputeTimeForArbitrate(checkBegin, checkEnd, &printTime);
}
FREE_AND_RESET(historyDnOnline);
return NULL;
}
static void PrintReduceOrIncreaseMsg(uint32 groupIndex, const cm_instance_report_status *reportStatus,
const char *value, bool isReduce, const CurrentInstanceStatus *statusInstance)
{
char curSyncListStr[MAX_PATH_LEN] = {0};
char expectSyncListStr[MAX_PATH_LEN] = {0};
char onlineStr[MAX_PATH_LEN] = {0};
char primaryStr[MAX_PATH_LEN] = {0};
char failStr[MAX_PATH_LEN] = {0};
char voteAzStr[MAX_PATH_LEN] = {0};
GetSyncListString(&(reportStatus->currentSyncList), curSyncListStr, sizeof(curSyncListStr));
GetSyncListString(&(reportStatus->exceptSyncList), expectSyncListStr, sizeof(expectSyncListStr));
GetDnStatusString(&(statusInstance->statusDnOnline), onlineStr, sizeof(onlineStr));
GetDnStatusString(&(statusInstance->statusPrimary), primaryStr, sizeof(primaryStr));
GetDnStatusString(&(statusInstance->statusDnFail), failStr, sizeof(failStr));
GetDnStatusString(&(statusInstance->statusDnVoteAz), voteAzStr, sizeof(voteAzStr));
if (isReduce) {
write_runlog(LOG,
"instanceId(%u) begin to reduce sync list, primary is [%s], online is [%s], fail is [%s], voteAz is [%s], "
"value is [%s], currentSyncList is [%s], exceptSyncList is [%s], time is %d.\n",
g_instance_role_group_ptr[groupIndex].instanceMember[0].instanceId, primaryStr, onlineStr, failStr,
voteAzStr, value, curSyncListStr, expectSyncListStr, reportStatus->waitReduceTimes);
} else {
write_runlog(LOG,
"instanceId(%u) begin to increase sync list, primary is [%s], online is [%s], fail is [%s], voteAz is [%s],"
" value is [%s], currentSyncList is [%s], exceptSyncList is [%s], time is %d.\n",
g_instance_role_group_ptr[groupIndex].instanceMember[0].instanceId, primaryStr, onlineStr, failStr,
voteAzStr, value, curSyncListStr, expectSyncListStr, reportStatus->waitIncreaseTimes);
}
}
static void PrintLogMsg(uint32 groupIndex, const CurrentInstanceStatus *statusInstance, const char *value)
{
if (log_min_messages > DEBUG1) {
return;
}
cm_instance_report_status *reportStatus = &g_instance_group_report_status_ptr[groupIndex].instance_status;
char curSyncListStr[MAX_PATH_LEN] = {0};
char expectSyncListStr[MAX_PATH_LEN] = {0};
char onlineStr[MAX_PATH_LEN] = {0};
char primaryStr[MAX_PATH_LEN] = {0};
char failStr[MAX_PATH_LEN] = {0};
char voteAzStr[MAX_PATH_LEN] = {0};
GetSyncListString(&(reportStatus->currentSyncList), curSyncListStr, sizeof(curSyncListStr));
GetSyncListString(&(reportStatus->exceptSyncList), expectSyncListStr, sizeof(expectSyncListStr));
GetDnStatusString(&(statusInstance->statusDnOnline), onlineStr, sizeof(onlineStr));
GetDnStatusString(&(statusInstance->statusPrimary), primaryStr, sizeof(primaryStr));
GetDnStatusString(&(statusInstance->statusDnFail), failStr, sizeof(failStr));
GetDnStatusString(&(statusInstance->statusDnVoteAz), voteAzStr, sizeof(voteAzStr));
write_runlog(DEBUG1,
"line %d: instanceId(%u), primary is [%s], online is [%s], fail is [%s], voteAz is [%s], "
"value is [%s], currentSyncList is [%s], exceptSyncList is [%s].\n",
__LINE__, g_instance_role_group_ptr[groupIndex].instanceMember[0].instanceId,
primaryStr, onlineStr, failStr, voteAzStr, value, curSyncListStr, expectSyncListStr);
}
bool IsDnSyncListVaild(uint32 groupIndex, uint32 *instanceId)
{
if (g_isEnableUpdateSyncList == CANNOT_START_SYNCLIST_THREADS) {
return true;
}
for (int i = 0; i < g_instance_role_group_ptr[groupIndex].count; ++i) {
if (g_instance_group_report_status_ptr[groupIndex].instance_status.data_node_member[i].dnSyncList.count == -1) {
if (instanceId != NULL) {
*instanceId = g_instance_role_group_ptr[groupIndex].instanceMember[i].instanceId;
}
return false;
}
}
return true;
}
void GetDnStatusString(const DatanodeDynamicStatus *dnDynamicStatus, char *dnStatusStr, size_t maxLen)
{
errno_t rc = 0;
size_t strLen = 0;
if (maxLen < 1) {
return;
}
if (dnDynamicStatus->count == 0) {
rc = strcpy_s(dnStatusStr, maxLen, "dynamic status is empty");
securec_check_errno(rc, (void)rc);
return;
}
for (int index = 0; index < dnDynamicStatus->count; ++index) {
strLen = strlen(dnStatusStr);
if (strLen >= (maxLen - 1)) {
return;
}
if (index == dnDynamicStatus->count - 1) {
rc = snprintf_s(
dnStatusStr + strLen, maxLen - strLen, (maxLen - strLen) - 1, "%u", dnDynamicStatus->dnStatus[index]);
} else {
rc = snprintf_s(dnStatusStr + strLen,
maxLen - strLen, (maxLen - strLen) - 1, "%u, ", dnDynamicStatus->dnStatus[index]);
}
securec_check_intval(rc, (void)rc);
}
}
static bool IsDoReduceOrIncreaseSyncList(uint32 groupIndex, const DatanodeDynamicStatus *statusDnOnline,
cm_instance_report_status *reportStatus, DatanodeDynamicStatus *historyDnOnline, bool isCurSameWithExpect)
{
errno_t rc = 0;
uint32 instanceId = 0;
if (!isCurSameWithExpect) {
return false;
}
if (CompareDnOnlineWithExpectSyncList(statusDnOnline, &(reportStatus->exceptSyncList))) {
return false;
}
if (!CompareHistorywithCurrOnline(statusDnOnline, &(historyDnOnline[groupIndex]))) {
char onlineStr[MAX_PATH_LEN] = {0};
char historyStr[MAX_PATH_LEN] = {0};
GetDnStatusString(statusDnOnline, onlineStr, sizeof(onlineStr));
GetDnStatusString(&(historyDnOnline[groupIndex]), historyStr, sizeof(historyStr));
write_runlog(LOG, "instanceId(%u): statusDnOnline[%s] is different from historyDnOnline[%s], "
"and reset the time.\n", g_instance_role_group_ptr[groupIndex].instanceMember[0].instanceId,
onlineStr, historyStr);
rc = memcpy_s(&(historyDnOnline[groupIndex]), sizeof(DatanodeDynamicStatus),
statusDnOnline, sizeof(DatanodeDynamicStatus));
securec_check_errno(rc, (void)rc);
reportStatus->waitReduceTimes = DELAY_TIME_TO_REDUCE_STANDBY;
reportStatus->waitIncreaseTimes = DELAY_TIME_TO_INCREASE_STANDBY;
return false;
}
if (!IsDnSyncListVaild(groupIndex, &instanceId)) {
write_runlog(ERROR, "syncList in instanceId(%u) is invalid, can't do reduce or increase operf.\n", instanceId);
return false;
}
return true;
}
static void DoIncreaseSyncList(
uint32 groupIndex, const CurrentInstanceStatus *statusInstance, cm_instance_report_status *reportStatus)
{
if ((reportStatus->waitIncreaseTimes--) > 0) {
return;
}
bool doResult = SetGroupExpectSyncList(groupIndex, statusInstance);
if (doResult) {
reportStatus->waitIncreaseTimes = DELAY_TIME_TO_INCREASE_STANDBY;
}
}
static void DoReduceSyncList(uint32 groupIndex, const CurrentInstanceStatus *statusInstance, int printTime,
cm_instance_report_status *reportStatus)
{
if (statusInstance->statusDnOnline.count < ONE_PRIMARY_ONE_SLAVE) {
if (printTime % 10 == 0) {
write_runlog(ERROR,
"dn instance online count(%d) less than one primary one slave, cannot to do reduce.\n",
statusInstance->statusDnOnline.count);
}
return;
}
if ((reportStatus->waitReduceTimes--) > 0) {
return;
}
bool doResult = SetGroupExpectSyncList(groupIndex, statusInstance);
if (doResult) {
reportStatus->waitReduceTimes = DELAY_TIME_TO_REDUCE_STANDBY;
}
}
static bool GetHistoryClusterSyncListFromDdb()
{
if (GetHistoryClusterCurSyncListFromDdb() == FAILED_GET_VALUE) {
write_runlog(LOG, "can't get the value from ddb.\n");
return false;
}
if (GetHistoryClusterExceptSyncListFromDdb() == FAILED_GET_VALUE) {
write_runlog(LOG, "can't get the value from ddb.\n");
return false;
}
return true;
}
bool CompareCurWithExceptSyncList(uint32 groupIndex)
{
cm_instance_report_status *dnReportStatus = &g_instance_group_report_status_ptr[groupIndex].instance_status;
if (dnReportStatus->currentSyncList.count != dnReportStatus->exceptSyncList.count) {
return false;
}
for (int i = 0; i < dnReportStatus->currentSyncList.count; ++i) {
if (dnReportStatus->currentSyncList.dnSyncList[i] != dnReportStatus->exceptSyncList.dnSyncList[i]) {
return false;
}
}
return true;
}
static bool CompareMemberSyncWithExceptSyncList(const DatanodeSyncList *memberSyncList,
const DatanodeSyncList *expectSyncList)
{
if (memberSyncList->count != expectSyncList->count) {
return false;
}
for (int i = 0; i < memberSyncList->count; ++i) {
if (memberSyncList->dnSyncList[i] != expectSyncList->dnSyncList[i]) {
return false;
}
}
return true;
}