/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* dcf_flowcontrol.cpp
*
* IDENTIFICATION
* src/gausskernel/storage/replication/dcf/dcf_flowcontrol.cpp
*
* ---------------------------------------------------------------------------------------
*/
#include "c.h"
#include "postgres.h"
#include "knl/knl_variable.h"
#include "storage/shmem.h"
#include "utils/timestamp.h"
#include "cjson/cJSON.h"
#include "dcf_interface.h"
#include "replication/dcf_flowcontrol.h"
#include "pgstat.h"
#ifndef ENABLE_MULTIPLE_NODES
#ifdef ENABLE_UT
#define static
#endif
/* Unit-conversion factors used by the flow-control arithmetic in this file. */
static const int MICROSECONDS_PER_SECONDS = 1000000;
static const int MILLISECONDS_PER_SECONDS = 1000;
/* NOTE: despite the name, this is used as the divisor that turns microseconds into milliseconds. */
static const int MILLISECONDS_PER_MICROSECONDS = 1000;
/* Multiplied by MICROSECONDS_PER_SECONDS to derive sleep_count_limit; also the fallback limit. */
static const int INIT_CONTROL_REPLY = 3;
/* Default and upper bound of sleep_count_limit. */
static const int MAX_CONTROL_REPLY = 1000;
/* Step (microseconds) added to the sleep time when the gap is above the balance range. */
static const int SLEEP_MORE = 200;
/* Step (microseconds) removed from the sleep time when the gap is below the balance range. */
static const int SLEEP_LESS = 400;
/* pre_rate1/pre_rate2 are stored left-shifted by this many bits to keep extra precision. */
static const int SHIFT_SPEED = 3;
inline static void SetNodeInfo(int nodeIndex, uint32 nodeID, DCFStandbyReplyMessage reply)
{
DCFStandbyInfo *standbyNodeInfo = &(t_thrd.dcf_cxt.dcfCtxInfo->nodes_info[nodeIndex]);
standbyNodeInfo->nodeID = nodeID;
standbyNodeInfo->isMember = true;
standbyNodeInfo->isActive = true;
standbyNodeInfo->receive = reply.receive;
standbyNodeInfo->write = reply.write;
standbyNodeInfo->flush = reply.flush;
standbyNodeInfo->apply = reply.apply;
standbyNodeInfo->peer_role = reply.peer_role;
standbyNodeInfo->peer_state = reply.peer_state;
standbyNodeInfo->sendTime = reply.sendTime;
}
static void ResetDCFNodeInfo(int index)
{
DCFStandbyInfo *standbyNodeInfo = &(t_thrd.dcf_cxt.dcfCtxInfo->nodes_info[index]);
standbyNodeInfo->isMember = false;
standbyNodeInfo->isActive = false;
standbyNodeInfo->nodeID = 0;
standbyNodeInfo->receive = InvalidXLogRecPtr;
standbyNodeInfo->write = InvalidXLogRecPtr;
standbyNodeInfo->flush = InvalidXLogRecPtr;
standbyNodeInfo->apply = InvalidXLogRecPtr;
standbyNodeInfo->applyRead = InvalidXLogRecPtr;
standbyNodeInfo->peer_role = UNKNOWN_MODE;
standbyNodeInfo->peer_state = UNKNOWN_STATE;
standbyNodeInfo->sendTime = 0;
}
static void ResetDCFNodeLogCtl(int index)
{
DCFLogCtrlData *logCtrl = &(t_thrd.dcf_cxt.dcfCtxInfo->log_ctrl[index]);
logCtrl->prev_sleep_time = -1;
logCtrl->sleep_time = 0;
logCtrl->balance_sleep_time = 0;
logCtrl->prev_RTO = -1;
logCtrl->current_RTO = -1;
logCtrl->sleep_count = 0;
logCtrl->sleep_count_limit = MAX_CONTROL_REPLY;
logCtrl->prev_flush = 0;
logCtrl->prev_apply = 0;
logCtrl->prev_reply_time = 0;
logCtrl->pre_rate1 = 0;
logCtrl->pre_rate2 = 0;
logCtrl->prev_RPO = -1;
logCtrl->current_RPO = -1;
}
/*
 * Reset the first slot whose nodeID equals the given one (both the node info
 * and its flow-control data).
 * Returns true when such a slot was found and reset, false otherwise.
 */
bool ResetDCFNodeInfoWithNodeID(uint32 nodeID)
{
    int slot = 0;

    /* linear search for the first slot owned by nodeID */
    while (slot < DCF_MAX_NODES && t_thrd.dcf_cxt.dcfCtxInfo->nodes_info[slot].nodeID != nodeID) {
        slot++;
    }
    if (slot >= DCF_MAX_NODES) {
        return false;
    }

    ResetDCFNodeInfo(slot);
    ResetDCFNodeLogCtl(slot);
    return true;
}
/*
 * Reset the node info and the flow-control data of every slot
 * (DCF_MAX_NODES of them).
 */
void ResetDCFNodesInfo(void)
{
    int slot = 0;
    while (slot < DCF_MAX_NODES) {
        ResetDCFNodeInfo(slot);
        ResetDCFNodeLogCtl(slot);
        ++slot;
    }
}
/*
 * Query the DCF stream info and parse it as JSON.
 *
 * nodeInfos: out parameter; on success receives the parsed cJSON tree, which
 *            the caller must release with cJSON_Delete().
 * Returns true on success. Returns false (after a WARNING) when the query
 * yields a length of 0 or when the text cannot be parsed; *nodeInfos is left
 * untouched when the query fails and is nullptr when parsing fails.
 */
bool GetNodeInfos(cJSON **nodeInfos)
{
    char replicInfo[DCF_MAX_STREAM_INFO_LEN] = {0};
    int len = dcf_query_stream_info(1, replicInfo, DCF_MAX_STREAM_INFO_LEN * sizeof(char));
    if (len == 0) {
        ereport(WARNING, (errmodule(MOD_DCF), errmsg("Failed to query dcf config!")));
        return false;
    }
    *nodeInfos = cJSON_Parse(replicInfo);
    if (*nodeInfos == nullptr) {
        /* cJSON may or may not be able to point at the offending position. */
        const char* errorPtr = cJSON_GetErrorPtr();
        if (errorPtr != nullptr) {
            ereport(WARNING, (errmodule(MOD_DCF), errmsg("Failed to parse dcf config: %s!", errorPtr)));
        } else {
            ereport(WARNING, (errmodule(MOD_DCF), errmsg("Failed to parse dcf config: unknown error.")));
        }
        return false;
    }
    return true;
}
/*
 * Look up the entry with "node_id" == nodeID in a JSON array of node objects
 * and copy out its "role", "ip" and "port" items.
 *
 * nodeJsons:      JSON array of node objects.
 * role / roleLen: destination buffer and its size for the "role" string.
 * ip / ipLen:     destination buffer and its size for the "ip" string.
 * port:           receives the integer value of the "port" item.
 *
 * Returns true when the node was found and all three items were copied.
 * Returns false (after a WARNING) when nodeJsons is not an array, when an
 * element has no numeric "node_id", when the matching element lacks a string
 * "role"/"ip" or a numeric "port", or when no element matches.
 *
 * All three items are validated before anything is copied, so the output
 * buffers are only written on success. Checking the JSON types also keeps a
 * NULL valuestring from being handed to strcpy_s.
 */
bool GetDCFNodeInfo(const cJSON *nodeJsons, int nodeID, char *role, int roleLen, char *ip, int ipLen, int *port)
{
    if (!cJSON_IsArray(nodeJsons)) {
        ereport(WARNING, (errmodule(MOD_DCF), errmsg("Must exist array format in the json file.")));
        return false;
    }
    const cJSON* nodeJson = nullptr;
    cJSON_ArrayForEach(nodeJson, nodeJsons)
    {
        /* cJSON_IsNumber() is false for a missing (NULL) item as well. */
        const cJSON *idJson = cJSON_GetObjectItem(nodeJson, "node_id");
        if (!cJSON_IsNumber(idJson)) {
            ereport(WARNING, (errmodule(MOD_DCF), errmsg("No items with node id %d!", nodeID)));
            return false;
        }
        if (idJson->valueint != nodeID) {
            continue;
        }

        const cJSON *roleJson = cJSON_GetObjectItem(nodeJson, "role");
        if (!cJSON_IsString(roleJson) || roleJson->valuestring == nullptr) {
            ereport(WARNING, (errmodule(MOD_DCF), errmsg("No role item with node id %d!", nodeID)));
            return false;
        }
        const cJSON *ipJson = cJSON_GetObjectItem(nodeJson, "ip");
        if (!cJSON_IsString(ipJson) || ipJson->valuestring == nullptr) {
            ereport(WARNING, (errmodule(MOD_DCF), errmsg("No ip item with node id %d!", nodeID)));
            return false;
        }
        const cJSON *portJson = cJSON_GetObjectItem(nodeJson, "port");
        if (!cJSON_IsNumber(portJson)) {
            ereport(WARNING, (errmodule(MOD_DCF), errmsg("No port item with node id %d!", nodeID)));
            return false;
        }

        /* Everything is present and well typed: publish the results. */
        errno_t rc = strcpy_s(role, roleLen, roleJson->valuestring);
        securec_check(rc, "\0", "\0");
        rc = strcpy_s(ip, ipLen, ipJson->valuestring);
        securec_check(rc, "\0", "\0");
        *port = portJson->valueint;
        return true;
    }
    ereport(WARNING, (errmodule(MOD_DCF), errmsg("No node item with node id %d found!", nodeID)));
    return false;
}
bool GetNodeInfo(const uint32 nodeID, char *nodeIP, uint32 nodeIPLen, uint32 *nodePort)
{
cJSON *nodeInfos = nullptr;
if (!GetNodeInfos(&nodeInfos)) {
return false;
}
const cJSON *nodeJsons = cJSON_GetObjectItem(nodeInfos, "nodes");
if (nodeJsons == nullptr) {
cJSON_Delete(nodeInfos);
ereport(ERROR, (errmodule(MOD_DCF), errmsg("Get nodes info failed from DCF!")));
return false;
}
const int DCF_ROLE_LEN = 64;
char localDCFRole[DCF_ROLE_LEN] = {0};
if (!GetDCFNodeInfo(nodeJsons, nodeID, localDCFRole, DCF_ROLE_LEN, nodeIP, nodeIPLen, (int*)nodePort)) {
cJSON_Delete(nodeInfos);
return false;
}
cJSON_Delete(nodeInfos);
return true;
}
/*
 * Store `reply` in the slot that belongs to nodeID.
 * The first slot whose nodeID matches is used; if there is none, the first
 * slot that is not a member is claimed instead.
 *
 * nodeIndex: out parameter; receives the chosen slot, or -1 when the table
 *            has neither a matching nor a free slot.
 * Returns true when the reply was stored, false (after a WARNING) otherwise.
 */
bool SetNodeInfoByNodeID(uint32 nodeID, DCFStandbyReplyMessage reply, int *nodeIndex)
{
    int matchedSlot = -1;
    int firstFreeSlot = -1;

    /* One pass: stop at the first match, remember the first free slot on the way. */
    for (int i = 0; i < DCF_MAX_NODES; i++) {
        const DCFStandbyInfo *slot = &(t_thrd.dcf_cxt.dcfCtxInfo->nodes_info[i]);
        if (slot->nodeID == nodeID) {
            matchedSlot = i;
            break;
        }
        if (firstFreeSlot == -1 && !slot->isMember) {
            firstFreeSlot = i;
        }
    }

    *nodeIndex = (matchedSlot != -1) ? matchedSlot : firstFreeSlot;
    if (*nodeIndex == -1) {
        ereport(WARNING,
            (errmsg("Can't add the node info with node id %u for no space in dcf nodes info array", nodeID)));
        return false;
    }

    SetNodeInfo(*nodeIndex, nodeID, reply);
    return true;
}
/*
 * Tell whether more than two seconds elapsed between two send times.
 *
 * preSendTime: send time of the previously processed reply.
 * curSendTime: send time of the current reply.
 * Returns true when the difference, in milliseconds, is greater than 2000.
 */
bool IsForceUpdate(TimestampTz preSendTime, TimestampTz curSendTime)
{
    long secToTime;
    int microsecToTime;
    long millisecTimeDiff = 0;
    TimestampDifference(preSendTime, curSendTime, &secToTime, &microsecToTime);
    /* MILLISECONDS_PER_MICROSECONDS is the microsecond -> millisecond divisor here. */
    millisecTimeDiff = secToTime * MILLISECONDS_PER_SECONDS
        + microsecToTime / MILLISECONDS_PER_MICROSECONDS;
    ereport(DEBUG1, (errmsg("The millisec_time_diff is %ld", millisecTimeDiff)));
    int secondsNum = 2;
    return (millisecTimeDiff > secondsNum * MILLISECONDS_PER_SECONDS);
}
static bool IsUpdateRto(int nodeIndex, TimestampTz sendTime)
{
bool forceUpdate = false;
uint64 curSleepCount = t_thrd.dcf_cxt.dcfCtxInfo->log_ctrl[nodeIndex].sleep_count;
uint64 sleepCountLimit = t_thrd.dcf_cxt.dcfCtxInfo->log_ctrl[nodeIndex].sleep_count_limit;
if (t_thrd.dcf_cxt.dcfCtxInfo->log_ctrl[nodeIndex].prev_reply_time > 0) {
forceUpdate = IsForceUpdate(t_thrd.dcf_cxt.dcfCtxInfo->log_ctrl[nodeIndex].prev_reply_time, sendTime);
}
return ((sleepCountLimit > 0 && curSleepCount % sleepCountLimit == 0) || forceUpdate);
}
/*
 * Ask DCF for the current leader.
 *
 * leaderID:   out, receives the leader's node id (set to 0 before the query).
 * leaderIP:   optional out buffer for the leader IP; may be NULL.
 * ipLen:      size of leaderIP; when 0, DCF_MAX_IP_LEN is assumed.
 * leaderPort: optional out parameter for the leader port; may be NULL.
 *
 * Returns true when the query succeeds and a leader exists (id != 0);
 * otherwise emits a WARNING and returns false.
 *
 * ipLen is only honoured when leaderIP is supplied. When the internal scratch
 * buffer is used, its own size is passed, so a large ipLen combined with a
 * NULL leaderIP can no longer overrun the scratch buffer.
 */
bool QueryLeaderNodeInfo(uint32* leaderID, char* leaderIP, uint32 ipLen, uint32 *leaderPort)
{
    Assert((t_thrd.dcf_cxt.is_dcf_thread && t_thrd.dcf_cxt.isDcfShmemInited) ||
           !t_thrd.dcf_cxt.is_dcf_thread);
    *leaderID = 0;

    /* Scratch outputs for the pieces the caller is not interested in. */
    uint32 tmpPort = 0;
    char tmpIP[DCF_MAX_IP_LEN] = {0};

    uint32 *port = (leaderPort != NULL) ? leaderPort : &tmpPort;
    char *ip = tmpIP;
    uint32 leaderIPLen = DCF_MAX_IP_LEN;
    if (leaderIP != NULL) {
        ip = leaderIP;
        if (ipLen != 0) {
            leaderIPLen = ipLen;
        }
    }

    bool success = (dcf_query_leader_info(1, ip, leaderIPLen, port, leaderID) == 0);
    if (!success) {
        ereport(WARNING, (errmsg("DCF failed to query leader info.")));
        return false;
    }
    if (*leaderID == 0) {
        ereport(WARNING, (errmsg("DCF leader does not exist.")));
        return false;
    }
    return true;
}
static void SetGlobalRtoData(int nodeIndex, int srcNodeID, char *nodename)
{
RTOStandbyData *standbyData = &(g_instance.rto_cxt.dcf_rto_standby_data[nodeIndex]);
int rc = strncpy_s(standbyData->id, DCF_STANDBY_NAME_SIZE, nodename, strlen(nodename));
securec_check(rc, "\0", "\0");
standbyData->current_rto = t_thrd.dcf_cxt.dcfCtxInfo->log_ctrl[nodeIndex].current_RTO;
standbyData->current_sleep_time = t_thrd.dcf_cxt.dcfCtxInfo->log_ctrl[nodeIndex].sleep_time;
standbyData->target_rto = t_thrd.dcf_cxt.dcfCtxInfo->targetRTO;
char *remoteIP = standbyData->dest_ip;
uint32 remotePort = 0;
char *localIP = standbyData->source_ip;
uint32 localPort = 0;
uint32 leaderID = -1;
if (GetNodeInfo(srcNodeID, remoteIP, IP_LEN, &remotePort)) {
standbyData->dest_port = (int)remotePort;
} else {
ereport(WARNING, (errmsg("Get ip and port of node with nodeID %u failed", srcNodeID)));
}
if (QueryLeaderNodeInfo(&leaderID, localIP, IP_LEN, &localPort)) {
standbyData->source_port = (int)localPort;
} else {
ereport(WARNING, (errmsg("Get ip and port of leader failed")));
}
}
/*
 * Blend a new speed sample into a stored rate.
 * originSpeed is the stored rate; the result is
 * (((originSpeed << SHIFT_SPEED) - originSpeed) >> SHIFT_SPEED) + curSpeed,
 * i.e. roughly 7/8 of the stored rate plus the new sample.
 */
static inline uint64 LogCtrlCountBigSpeed(uint64 originSpeed, uint64 curSpeed)
{
    uint64 widened = originSpeed << SHIFT_SPEED;
    uint64 decayed = (widened - originSpeed) >> SHIFT_SPEED;
    return decayed + curSpeed;
}
* Estimate the time standby need to flush and apply log.
*/
static void DCFLogCtrlCalculateCurrentRTO(const DCFStandbyReplyMessage *reply, const int nodeIndex)
{
volatile DCFLogCtrlData *logCtrl = &(t_thrd.dcf_cxt.dcfCtxInfo->log_ctrl[nodeIndex]);
long secToTime;
int microsecToTime;
if (XLByteLT(reply->receive, reply->flush) || XLByteLT(reply->flush, reply->apply) ||
XLByteLT(reply->flush, logCtrl->prev_flush) || XLByteLT(reply->apply, logCtrl->prev_apply)) {
return;
}
if (XLByteEQ(reply->receive, reply->apply)) {
logCtrl->current_RTO = 0;
return;
}
if (logCtrl->prev_reply_time == 0) {
return;
}
uint64 part1 = reply->receive - reply->flush;
uint64 part2 = reply->flush - reply->apply;
uint64 part1Diff = reply->flush - logCtrl->prev_flush;
uint64 part2Diff = reply->apply - logCtrl->prev_apply;
TimestampDifference(logCtrl->prev_reply_time, reply->sendTime, &secToTime, µsecToTime);
long millisecTimeDiff = secToTime * MILLISECONDS_PER_SECONDS + (microsecToTime / MILLISECONDS_PER_MICROSECONDS);
long timeThreshold = 10;
if (millisecTimeDiff <= timeThreshold) {
return;
}
* consumeRatePart1 and consumeRatePart2 is based on 7/8 previous_speed(walsnd->log_ctrl.pre_rate1) and 1/8
* speed_now(part1_diff / millisec_time_diff). To be more precise and keep more decimal point, we expand speed_now
* by multiply first then divide, which is (8 * previous_speed * 7/8 + speed_now) / 8.
*/
if (logCtrl->pre_rate1 != 0) {
logCtrl->pre_rate1 = LogCtrlCountBigSpeed(logCtrl->pre_rate1, (uint64)(part1Diff / millisecTimeDiff));
} else {
logCtrl->pre_rate1 = ((part1Diff / (uint64)millisecTimeDiff) << SHIFT_SPEED);
}
if (logCtrl->pre_rate2 != 0) {
logCtrl->pre_rate2 = LogCtrlCountBigSpeed(logCtrl->pre_rate2, (uint64)(part2Diff / millisecTimeDiff));
} else {
logCtrl->pre_rate2 = ((uint64)(part2Diff / millisecTimeDiff) << SHIFT_SPEED);
}
uint64 consumeRatePart1 = (logCtrl->pre_rate1 >> SHIFT_SPEED);
uint64 consumeRatePart2 = (logCtrl->pre_rate2 >> SHIFT_SPEED);
if (consumeRatePart1 == 0)
consumeRatePart1 = 1;
if (consumeRatePart2 == 0)
consumeRatePart2 = 1;
uint64 secRTOPart1 = (part1 / consumeRatePart1) / MILLISECONDS_PER_SECONDS;
uint64 secRTOPart2 = ((part1 + part2) / consumeRatePart2) / MILLISECONDS_PER_SECONDS;
uint64 secRTO = (secRTOPart1 > secRTOPart2) ? secRTOPart1 : secRTOPart2;
ereport(DEBUG4, (errmodule(MOD_RTO_RPO),
errmsg("The RTO estimated is = : %lu seconds. reply->reveive is %lu, reply->flush is %lu, "
"reply->apply is %lu, pre_flush is %lu, pre_apply is %lu, TimestampDifference is %ld, "
"consumeRatePart1 is %lu, consumeRatePart2 is %lu",
secRTO, reply->receive, reply->flush, reply->apply, logCtrl->prev_flush,
logCtrl->prev_apply, millisecTimeDiff, consumeRatePart1, consumeRatePart2)));
t_thrd.dcf_cxt.dcfCtxInfo->log_ctrl[nodeIndex].current_RTO = secRTO;
}
/*
 * Compute how far the standby is from its RTO and RPO targets and how that
 * distance changed since the previous round.
 *
 * logCtrl: flow-control data of the standby; prev_RTO / prev_RPO are seeded
 *          from the current values when they are still negative.
 * gapDiff: out, change of the selected gap since the previous round.
 * gap:     out, selected gap (current value minus target).
 *
 * The RTO gap is measured against half of targetRTO, the RPO gap against
 * targetRPO. An indicator whose target is not positive contributes zeros.
 * The indicator with the larger absolute change is reported; on a tie the
 * RTO values are reported.
 */
static void DCFLogCtrlCalculateIndicatorChange(DCFLogCtrlData *logCtrl, int64 *gapDiff, int64 *gap)
{
    int64 rtoGap = 0;
    int64 rtoGapDiff = 0;
    int64 rpoGap = 0;
    int64 rpoGapDiff = 0;

    if (t_thrd.dcf_cxt.dcfCtxInfo->targetRTO > 0) {
        if (logCtrl->prev_RTO < 0) {
            logCtrl->prev_RTO = logCtrl->current_RTO;
        }
        int balanceFactor = 2;
        int targetRTO = t_thrd.dcf_cxt.dcfCtxInfo->targetRTO / balanceFactor;
        int64 currentRTO = logCtrl->current_RTO;
        int64 rtoPrevGap = logCtrl->prev_RTO - targetRTO;
        rtoGap = currentRTO - targetRTO;
        rtoGapDiff = rtoGap - rtoPrevGap;
    }

    if (t_thrd.dcf_cxt.dcfCtxInfo->targetRPO > 0) {
        if (logCtrl->prev_RPO < 0) {
            logCtrl->prev_RPO = logCtrl->current_RPO;
        }
        int targetRPO = t_thrd.dcf_cxt.dcfCtxInfo->targetRPO;
        int64 currentRPO = logCtrl->current_RPO;
        int64 rpoPrevGap = logCtrl->prev_RPO - targetRPO;
        rpoGap = currentRPO - targetRPO;
        rpoGapDiff = rpoGap - rpoPrevGap;
    }

    /* report whichever indicator moved more */
    bool rpoMovedMore = abs(rpoGapDiff) > abs(rtoGapDiff);
    *gapDiff = rpoMovedMore ? rpoGapDiff : rtoGapDiff;
    *gap = rpoMovedMore ? rpoGap : rtoGap;

    ereport(DEBUG4, (errmodule(MOD_RTO_RPO),
        errmsg("[LogCtrlCalculateIndicatorChange] rto_gap=%d, rto_gap_diff=%d,"
               "rpo_gap=%d, rpo_gap_diff=%d, gap=%d, gap_diff=%d",
               (int)rtoGap, (int)rtoGapDiff, (int)rpoGap,
               (int)rpoGapDiff, (int)*gap, (int)*gapDiff)));
}
* If current RTO/RPO is less than target_rto/time_to_target_rpo, primary need less sleep.
* If current RTO/RPO is more than target_rto/time_to_target_rpo, primary need more sleep.
* If current RTO/RPO equals to target_rto/time_to_target_rpo, primary will sleep.
* according to balance_sleep to maintain rto.
*/
static void DCFLogCtrlCalculateSleepTime(int nodeIndex)
{
DCFLogCtrlData *logCtrl = &(t_thrd.dcf_cxt.dcfCtxInfo->log_ctrl[nodeIndex]);
if ((t_thrd.dcf_cxt.dcfCtxInfo->targetRTO == 0 && t_thrd.dcf_cxt.dcfCtxInfo->targetRPO == 0) ||
logCtrl->current_RTO == 0) {
logCtrl->sleep_time = 0;
return;
}
int64 gapDiff;
int64 gap;
DCFLogCtrlCalculateIndicatorChange(logCtrl, &gapDiff, &gap);
int64 sleepTime = logCtrl->sleep_time;
int64 preTime = logCtrl->sleep_time;
int balanceRange = 1;
if (abs(gapDiff) <= balanceRange) {
if (logCtrl->balance_sleep_time == 0) {
logCtrl->balance_sleep_time = sleepTime;
} else {
int64 balanceFactor = 2;
logCtrl->balance_sleep_time = (logCtrl->balance_sleep_time + sleepTime) / balanceFactor;
}
ereport(DEBUG4, (errmodule(MOD_RTO_RPO), errmsg("The balance time for log control is : %ld microseconds",
logCtrl->balance_sleep_time)));
}
if (abs(gap) <= balanceRange) {
if (logCtrl->balance_sleep_time != 0) {
sleepTime = logCtrl->balance_sleep_time;
} else {
sleepTime -= SLEEP_LESS;
}
} else if (gap > balanceRange) {
sleepTime += SLEEP_MORE;
} else if (gap < -balanceRange) {
sleepTime -= SLEEP_LESS;
}
sleepTime = (sleepTime >= 0) ? sleepTime : 0;
sleepTime = (sleepTime < MICROSECONDS_PER_SECONDS) ? sleepTime : MICROSECONDS_PER_SECONDS;
logCtrl->sleep_time = sleepTime;
int threshold = 500000;
if (logCtrl->sleep_time >= threshold) {
volatile DCFStandbyInfo *nodeInfo = &(t_thrd.dcf_cxt.dcfCtxInfo->nodes_info[nodeIndex]);
ereport(WARNING,
(errmodule(MOD_RTO_RPO),
errmsg("Flow control report for node %d: current sleep time is %ld micronseconds, "
"current rto is %ld, target rto is %d, standby receive lsn is %X/%X, "
"previous flush lsn is %X/%X, flush lsn is %X/%X, previous apply lsn is %X/%X, "
"apply lsn is %X/%X, previous send time is %ld, current send time is %ld, "
"flush speed is %ld B/ms, replay speed is %ld B/ms",
nodeInfo->nodeID, logCtrl->sleep_time,
logCtrl->current_RTO, t_thrd.dcf_cxt.dcfCtxInfo->targetRTO,
(uint32)(nodeInfo->receive >> 32), (uint32)(nodeInfo->receive),
(uint32)(logCtrl->prev_flush >> 32), (uint32)(logCtrl->prev_flush),
(uint32)(nodeInfo->flush >> 32), (uint32)(nodeInfo->flush),
(uint32)(logCtrl->prev_apply >> 32), (uint32)(logCtrl->prev_apply),
(uint32)(nodeInfo->apply >> 32), (uint32)(nodeInfo->apply),
logCtrl->prev_reply_time, nodeInfo->sendTime,
logCtrl->pre_rate1 >> SHIFT_SPEED, logCtrl->pre_rate2 >> SHIFT_SPEED)));
}
if (preTime == 0 && logCtrl->sleep_time != 0) {
ereport(LOG,
(errmodule(MOD_RTO_RPO),
errmsg("Log control take effect, target_rto is %d, "
"current_rto is %ld, current the sleep time is %ld microseconds",
t_thrd.dcf_cxt.dcfCtxInfo->targetRTO, logCtrl->current_RTO,
logCtrl->sleep_time)));
}
if (preTime != 0 && logCtrl->sleep_time == 0) {
ereport(LOG,
(errmodule(MOD_RTO_RPO),
errmsg("Log control does not take effect, target_rto is %d, "
"current_rto is %ld, current the sleep time is %ld microseconds",
t_thrd.dcf_cxt.dcfCtxInfo->targetRTO,
logCtrl->current_RTO, logCtrl->sleep_time)));
}
ereport(DEBUG4,
(errmodule(MOD_RTO_RPO),
errmsg("The sleep time for log control is : %ld microseconds",
logCtrl->sleep_time)));
}
* Count the limit for sleep_count, it is based on sleep time.
*/
static void DCFLogCtrlCountSleepLimit(const int nodeIndex)
{
volatile DCFLogCtrlData *logCtrl = &(t_thrd.dcf_cxt.dcfCtxInfo->log_ctrl[nodeIndex]);
int64 sleepCountLimitCount = 0;
if (logCtrl->sleep_time == 0) {
sleepCountLimitCount = MAX_CONTROL_REPLY;
} else {
sleepCountLimitCount = INIT_CONTROL_REPLY * MICROSECONDS_PER_SECONDS / logCtrl->sleep_time;
sleepCountLimitCount = (sleepCountLimitCount > MAX_CONTROL_REPLY) ?
MAX_CONTROL_REPLY : sleepCountLimitCount;
}
if (sleepCountLimitCount <= 0) {
sleepCountLimitCount = INIT_CONTROL_REPLY;
}
logCtrl->sleep_count_limit = sleepCountLimitCount;
ereport(DEBUG1, (errmsg("Sleep count limit is %ld.", logCtrl->sleep_count_limit)));
}
static void DoActualSleep(int nodeID, int nodeIndex)
{
volatile DCFLogCtrlData *logCtrl = &(t_thrd.dcf_cxt.dcfCtxInfo->log_ctrl[nodeIndex]);
if (logCtrl->sleep_time >= 0 && logCtrl->sleep_time != logCtrl->prev_sleep_time) {
dcf_pause_rep(1, nodeID, logCtrl->sleep_time);
}
logCtrl->sleep_count++;
}
/*
 * Refresh targetRTO and targetRPO from the DCF parameters
 * "DN_FLOW_CONTROL_RTO" and "DN_FLOW_CONTROL_RPO".
 *
 * Returns true when both parameters were read. Returns false (after a
 * WARNING) as soon as one read fails; a target that was already refreshed
 * before the failure keeps its new value.
 */
static bool GetFlowControlParam(void)
{
    const int rtoLen = 10;
    char tempRTO[rtoLen] = {0};
    const int rpoLen = 10;
    /* Sized with its own length constant so buffer and length argument cannot drift apart. */
    char tempRPO[rpoLen] = {0};
    if (dcf_get_param("DN_FLOW_CONTROL_RTO", tempRTO, rtoLen) != 0) {
        ereport(WARNING, (errmodule(MOD_RTO_RPO), errmsg("Get rto from dcf failed!")));
        return false;
    }
    t_thrd.dcf_cxt.dcfCtxInfo->targetRTO = atoi(tempRTO);
    ereport(DEBUG1, (errmodule(MOD_RTO_RPO),
        errmsg("target rto got from dcf is %d",
               t_thrd.dcf_cxt.dcfCtxInfo->targetRTO)));
    if (dcf_get_param("DN_FLOW_CONTROL_RPO", tempRPO, rpoLen) != 0) {
        ereport(WARNING, (errmodule(MOD_RTO_RPO), errmsg("Get rpo from dcf failed!")));
        return false;
    }
    t_thrd.dcf_cxt.dcfCtxInfo->targetRPO = atoi(tempRPO);
    ereport(DEBUG1, (errmodule(MOD_RTO_RPO),
        errmsg("target rpo got from dcf is %d",
               t_thrd.dcf_cxt.dcfCtxInfo->targetRPO)));
    return true;
}
static void ResetPreviousValue(int nodeIndex, DCFStandbyReplyMessage reply)
{
DCFLogCtrlData *logCtrl = &(t_thrd.dcf_cxt.dcfCtxInfo->log_ctrl[nodeIndex]);
logCtrl->prev_reply_time = reply.sendTime;
logCtrl->prev_flush = reply.flush;
logCtrl->prev_apply = reply.apply;
logCtrl->prev_RTO = logCtrl->current_RTO;
logCtrl->prev_sleep_time = logCtrl->sleep_time;
}
* Regular reply from standby advising of WAL positions on standby server.
*/
void DCFProcessStandbyReplyMessage(uint32 srcNodeID, const char* msg, uint32 msgSize)
{
DCFStandbyReplyMessage reply;
errno_t errorno = EOK;
char *buf = NULL;
int nodeIndex = -1;
if (msgSize < (sizeof(DCFStandbyReplyMessage) + 1)) {
ereport(WARNING, (errmsg("The size of msg didn't meet reply message and the size is %u\n", msgSize)));
return;
}
buf = const_cast<char*>(msg) + 1;
errorno = memcpy_s(&reply,
sizeof(DCFStandbyReplyMessage),
buf,
sizeof(DCFStandbyReplyMessage));
securec_check(errorno, "\0", "\0");
ereport(DEBUG1, (errmsg("The src node id is %u, standby node name is %s "
"and receive %X/%X write %X/%X flush %X/%X apply %X/%X",
srcNodeID, reply.id,
(uint32)(reply.receive >> 32), (uint32)reply.receive,
(uint32)(reply.write >> 32), (uint32)reply.write,
(uint32)(reply.flush >> 32), (uint32)reply.flush,
(uint32)(reply.apply >> 32), (uint32)reply.apply)));
bool isSet = SetNodeInfoByNodeID(srcNodeID, reply, &nodeIndex);
if (!isSet) {
ereport(WARNING, (errmsg("Set node info with node ID %u failed!", srcNodeID)));
return;
}
if (!GetFlowControlParam()) {
ereport(WARNING, (errmsg("Get flow control parameters from dcf failed!")));
return;
}
if (IsUpdateRto(nodeIndex, reply.sendTime)) {
DCFLogCtrlCalculateCurrentRTO(&reply, nodeIndex);
DCFLogCtrlCalculateSleepTime(nodeIndex);
DCFLogCtrlCountSleepLimit(nodeIndex);
DoActualSleep(srcNodeID, nodeIndex);
ResetPreviousValue(nodeIndex, reply);
}
SetGlobalRtoData(nodeIndex, srcNodeID, reply.id);
}
#endif