* Copyright (c) 2021 Huawei Technologies Co.,Ltd.
*
* CM is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* ctl_start.cpp
* cm_ctl start [-z AVAILABILITY_ZONE [--cm_arbitration_mode=ARBITRATION_MODE]]
* [-n NODEID] [-D DATADIR] [-m resume] [-t SECS] [-R]
*
* IDENTIFICATION
* src/cm_ctl/ctl_start.cpp
*
* -------------------------------------------------------------------------
*/
#include <signal.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include "common/config/cm_config.h"
#include "cm/libpq-fe.h"
#include "cm/cm_misc.h"
#include "ctl_common.h"
#include "cm_msg_version_convert.h"
#include "cm/libpq-int.h"
#include "cm_ddb_adapter.h"
#include "ctl_common_res.h"
#include "cm_ip.h"
#define EXPECTED_CLUSTER_START_TIME 120
#define ETCD_START_WAIT 90
#define INSTANCE_START_CONFIRM_TIME 3
#define START_AZ_TRY_HEARTBEAT 20
#define CLUSTER_STATE_CHECK_INTERVAL 10
#define LTRAN_CHECK_INTERVAL 2
#define LTRAN_CHECK_TIMES 30
#define ONLY_GR_RES_CLUSTER 1
static void start_and_check_etcd_cluster();
static void start_cluster(void);
static void start_etcd_cluster(void);
static uint32 get_alive_etcd_node_count();
static void start_and_check_etcd_az(const char* azName);
static void start_etcd_node(uint32 nodeid);
static void check_etcd_cluster_status();
static void start_az(char* azName);
static void start_and_check_etcd_node(uint32 nodeId);
static void start_node(uint32 nodeid);
static void start_datanode_instance_relation(uint32 node, const char *dataPath);
static void start_az_try_more_one(const char* azName);
static void* check_cluster_start_status(void* arg);
static void StartFailQueryAndReport();
static void* start_check(void* arg);
static int start_check_cluster();
static int start_check_az(const char* azName);
static int start_check_node(uint32 node_id_check);
static int start_check_dn_relation(uint32 node, const char *dataPath);
static int start_check_instance(uint32 node_id_check, const char* datapath);
static void ExecuteGsGuc(const char* allAzLists, bool isSingleRep, uint32 repNum, uint32 nodeIndex);
#ifndef ENABLE_MULTIPLE_NODES
static void StartLtranProcess();
static bool CheckLibosKniIsOk();
static int StartLtranProcessByNode(uint32 nodeid);
extern char g_ltranManualStartFile[MAXPGPATH];
extern char g_libnetManualStartFile[MAXPGPATH];
#else
#include "ctl_distribute.h"
extern bool cn_resumes_restart;
#endif
void RunCmdInStartAz(const char* command, uint32 nodeIndex);
static uint32 GetEtcdNumOfAz(const char* azName);
static uint32 GetAliveEtcdNumOfAz(const char* azName);
static int g_cluster_start_status = CM_STATUS_UNKNOWN;
static int g_az_start_status = CM_STATUS_UNKNOWN;
static int g_instance_start_status = CM_STATUS_UNKNOWN;
static int g_node_start_status = CM_STATUS_UNKNOWN;
static int g_dn_relation_start_status = CM_STATUS_UNKNOWN;
static int g_resStartStatus = CM_STATUS_UNKNOWN;
static int g_dn_status = INSTANCE_HA_STATE_UNKONWN;
static StartExitCode g_startExitCode = CM_START_EXIT_INIT;
static int startaz_try_heartbeat = START_AZ_TRY_HEARTBEAT;
static struct timespec g_startTime;
static struct timespec g_endTime;
extern char g_cmData[CM_PATH_LENGTH];
extern char manual_start_file[MAXPGPATH];
extern char instance_manual_start_file[MAXPGPATH];
extern char cluster_manual_starting_file[MAXPGPATH];
extern char etcd_manual_start_file[MAXPGPATH];
extern char minority_az_start_file[MAX_PATH_LEN];
extern char g_minorityAzArbitrateFile[MAX_PATH_LEN];
extern char mpp_env_separate_file[MAXPGPATH];
extern char hosts_path[MAXPGPATH];
extern char g_appPath[MAXPGPATH];
extern char pssh_out_path[MAXPGPATH];
extern char result_path[MAXPGPATH];
extern char sys_log_path[MAXPGPATH];
extern const char* prefix_name;
extern bool got_stop;
extern bool wait_seconds_set;
extern int g_waitSeconds;
extern passwd* pw;
extern bool g_commandRelationship;
extern char* g_command_operation_azName;
extern uint32 g_commandOperationNodeId;
extern uint32 g_nodeId;
extern char* cm_arbitration_mode_set;
extern const char* g_progname;
extern CM_Conn* CmServer_conn;
extern uint32 g_commandOperationInstanceId;
extern char manual_pause_file[MAXPGPATH];
extern char manual_walrecord_file[MAXPGPATH];
static int StartResInstCheck(uint32 instId)
{
if (GetResInstStatus(instId) == CM_RES_STAT_ONLINE) {
return CM_STATUS_NORMAL;
}
return CM_STATUS_UNKNOWN;
}
static void StartResInst(uint32 nodeId, uint32 instId)
{
char instStartFile[MAX_PATH_LEN] = {0};
int ret = snprintf_s(instStartFile, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/bin/instance_manual_start_%u",
g_appPath, instId);
securec_check_intval(ret, (void)ret);
char command[MAX_PATH_LEN] = {0};
if (g_isPauseArbitration) {
ret = snprintf_s(command, MAX_PATH_LEN, MAX_PATH_LEN - 1,
SYSTEMQUOTE "rm -f %s; touch %s < \"%s\" 2>&1" SYSTEMQUOTE,
instStartFile, cluster_manual_starting_file, DEVNULL);
} else {
ret = snprintf_s(command, MAX_PATH_LEN, MAX_PATH_LEN - 1, SYSTEMQUOTE "rm -f %s < \"%s\" 2>&1" SYSTEMQUOTE,
instStartFile, DEVNULL);
}
securec_check_intval(ret, (void)ret);
ret = runCmdByNodeId(command, nodeId);
if (ret != 0) {
write_runlog(DEBUG1, "Failed to start the resource instance with executing the command: command=\"%s\", "
"nodeId=%u, instId=%u, systemReturn=%d, shellReturn=%d, errno=%d.\n",
command, nodeId, instId, ret, SHELL_RETURN_CODE(ret), errno);
}
}
static void StartAllResInstByNode(uint32 nodeId)
{
for (uint32 i = 0; i < CusResCount(); ++i) {
for (uint32 k = 0; k < g_resStatus[i].status.instanceCount; ++k) {
if (g_resStatus[i].status.resStat[k].nodeId != nodeId) {
continue;
}
StartResInst(nodeId, g_resStatus[i].status.resStat[k].cmInstanceId);
}
}
}
status_t StartWholeCluster()
{
int ret;
#ifndef ENABLE_MULTIPLE_NODES
struct stat libnetManualStat = {0};
if (stat(g_libnetManualStartFile, &libnetManualStat) == 0) {
StartLtranProcess();
if (!CheckLibosKniIsOk()) {
write_runlog(ERROR, "start the ltran cluster failed. \n");
return CM_ERROR;
}
write_runlog(LOG, "start the ltran cluster successfully.\n");
}
#endif
if (g_etcd_num > 0) {
write_runlog(LOG, "starting the ETCD cluster.\n");
start_and_check_etcd_cluster();
}
write_runlog(LOG, "checking cluster status.\n");
ret = g_single_node_cluster ? CheckSingleClusterRunningStatus() : CheckClusterRunningStatus();
if (ret != 0) {
return CM_ERROR;
}
write_runlog(LOG, "start cluster. \n");
start_cluster();
return CM_SUCCESS;
}
static status_t WaitCmsPrimaryNormal(CM_Conn **pCmsConn)
{
int times = 0;
while (*pCmsConn == NULL) {
if (times++ > SHARED_STORAGE_MODE_TIMEOUT) {
write_runlog(ERROR, "connect primary timeout.\n");
return CM_ERROR;
}
do_conn_cmserver(false, 0, false, pCmsConn);
cm_sleep(1);
}
return CM_SUCCESS;
}
static bool CheckOfflineInstance(uint32 node)
{
uint32 nodeid = 0;
CM_Conn *pCmsCon = NULL;
if (!IsCmSharedStorageMode()) {
return false;
}
if ((node < 1) || (get_node_index(node) >= g_node_num)) {
write_runlog(ERROR, "node(%u) is invalid, max node num(%u) \n", node, g_node_num);
return false;
}
for (uint32 j = 0; j < g_node_num; j++) {
if (g_node[j].node != node) {
continue;
}
nodeid = j;
break;
}
if (WaitCmsPrimaryNormal(&pCmsCon) != CM_SUCCESS) {
return false;
}
bool result = SetOfflineNode(nodeid, pCmsCon);
ReleaseConn(pCmsCon);
return result;
}
status_t do_start(void)
{
CtlGetCmJsonConf();
int ret;
pthread_t checkStatusThrId;
pthread_t startCheckThrId;
g_startExitCode = CM_START_EXIT_INIT;
#ifndef ENABLE_MULTIPLE_NODES
int nodeNumsInAz = 0;
struct stat libnetManualStat = {0};
int ltranCheckTimes = 0;
(void)atexit(RemoveStartingFile);
#endif
if (g_commandOperationNodeId > 0 && get_node_index(g_commandOperationNodeId) >= g_node_num) {
write_runlog(FATAL, "node_id specified is illegal. \n");
return CM_ERROR;
}
(void)clock_gettime(CLOCK_MONOTONIC, &g_startTime);
(void)getPauseStatus();
(void)getWalrecordMode();
if (g_commandOperationInstanceId == 0 && g_command_operation_azName == NULL && g_commandOperationNodeId == 0) {
#ifdef ENABLE_MULTIPLE_NODES
if (cn_resumes_restart) {
write_stderr(_("%s: enable resuming the fault CN.\n"), g_progname);
remove_stop_resuming_cn_file();
return CM_SUCCESS;
}
#endif
CM_RETURN_IFERR(StartWholeCluster());
} else if (g_command_operation_azName != NULL) {
if (cm_arbitration_mode_set != NULL) {
if (isMinority(cm_arbitration_mode_set)) {
if (g_currentNode->cmServerLevel != 1) {
write_runlog(
FATAL, "The minority AZ starting command can only be executed on nodes with CM Server. \n");
return CM_ERROR;
}
write_runlog(LOG, "Minority cluster! The cluster data may be lost: RPO !=0.\n");
} else if (isMajority(cm_arbitration_mode_set)) {
write_runlog(LOG, "Majority cluster! The minority cluster settings will be clear if they exist.\n");
} else {
write_runlog(FATAL, "invalid cm server arbitration mode.\n");
DoAdvice();
return CM_ERROR;
}
}
#ifndef ENABLE_MULTIPLE_NODES
if (stat(g_libnetManualStartFile, &libnetManualStat) == 0) {
write_runlog(LOG, "start ltran processes. \n");
for (uint32 ii = 0; ii < g_node_num; ii++) {
if (strcmp(g_node[ii].azName, g_command_operation_azName) == 0) {
if (g_node[ii].datanodeCount == 0) {
continue;
}
nodeNumsInAz++;
if (StartLtranProcessByNode(g_node[ii].node) != 0) {
write_runlog(ERROR, "start the ltran processes failed. \n");
return CM_ERROR;
}
}
}
int aliveLtranNum = 0;
ltranCheckTimes = 0;
while (aliveLtranNum < nodeNumsInAz && ltranCheckTimes < LTRAN_CHECK_TIMES) {
aliveLtranNum = 0;
ltranCheckTimes++;
for (uint32 ii = 0; ii < g_node_num; ii++) {
if (strcmp(g_node[ii].azName, g_command_operation_azName) == 0 && g_node[ii].datanodeCount > 0) {
if (CM_STATUS_NORMAL == start_check_instance(ii, "ltran")) {
aliveLtranNum++;
}
}
}
write_runlog(DEBUG1, "ltran processes check %d times\n", ltranCheckTimes);
cm_sleep(LTRAN_CHECK_INTERVAL);
}
if (!CheckLibosKniIsOk()) {
write_runlog(ERROR, "start the ltran processes failed. \n");
return CM_ERROR;
}
write_runlog(LOG, "start the ltran processes successfully. \n");
}
#endif
if (g_etcd_num > 0) {
start_and_check_etcd_az(g_command_operation_azName);
write_runlog(LOG, "checking the ETCD cluster status.\n");
check_etcd_cluster_status();
}
write_runlog(LOG, "start the availability zone: %s. \n", g_command_operation_azName);
start_az(g_command_operation_azName);
} else if (g_commandOperationInstanceId > 0) {
if (CheckResInstInfo(&g_commandOperationNodeId, g_commandOperationInstanceId) != CM_SUCCESS) {
write_runlog(ERROR, "can't do start resource instance, instId:%u.\n", g_commandOperationInstanceId);
return CM_ERROR;
}
write_runlog(LOG, "start resource instance, instId:%u.\n", g_commandOperationInstanceId);
StartAllResInstByNode(g_commandOperationNodeId);
} else if (g_cmData[0] == '\0') {
#ifndef ENABLE_MULTIPLE_NODES
if (stat(g_libnetManualStartFile, &libnetManualStat) == 0) {
write_runlog(LOG, "start ltran process. \n");
uint32 ii = 0;
for (ii = 0; ii < g_node_num; ii++) {
if (g_node[ii].node == g_commandOperationNodeId) {
break;
}
}
if (ii >= g_node_num) {
write_runlog(FATAL, "can't find the nodeid: %u\n", g_commandOperationNodeId);
return CM_ERROR;
}
if (g_node[ii].datanodeCount > 0 && StartLtranProcessByNode(g_commandOperationNodeId) != 0) {
write_runlog(ERROR, "start the ltran process failed. \n");
return CM_ERROR;
}
ltranCheckTimes = 0;
while (CM_STATUS_NORMAL != start_check_instance(ii, "ltran") && ltranCheckTimes < LTRAN_CHECK_TIMES) {
ltranCheckTimes++;
cm_sleep(LTRAN_CHECK_INTERVAL);
write_runlog(DEBUG1, "ltran process checks %d times\n", ltranCheckTimes);
}
if (!CheckLibosKniIsOk()) {
write_runlog(ERROR, "start the ltran process failed. \n");
return CM_ERROR;
}
write_runlog(LOG, "start the ltran process successfully. \n");
}
#endif
start_and_check_etcd_node(g_commandOperationNodeId);
if (g_etcd_num > 0) {
write_runlog(LOG, "checking the ETCD cluster status\n");
check_etcd_cluster_status();
}
write_runlog(LOG, "start the node:%u. \n", g_commandOperationNodeId);
start_node(g_commandOperationNodeId);
} else if (g_commandRelationship) {
if (g_commandOperationNodeId == 0) {
write_runlog(FATAL, "node_id specified is illegal. \n");
return CM_ERROR;
}
if (g_cmData[0] == '\0') {
write_runlog(FATAL, "data path specified is illegal. \n");
return CM_ERROR;
}
write_runlog(LOG, "start relation datanode.\n");
start_datanode_instance_relation(g_commandOperationNodeId, g_cmData);
} else {
if (CheckOfflineInstance(g_commandOperationNodeId)) {
write_runlog(LOG, "the instance(node:%u) is Offline, no need to start.\n", g_commandOperationNodeId);
exit(0);
}
write_runlog(LOG, "start the node:%u,datapath:%s. \n", g_commandOperationNodeId, g_cmData);
start_instance(g_commandOperationNodeId, g_cmData);
}
ret = pthread_create(&checkStatusThrId, NULL, &check_cluster_start_status, NULL);
if (ret != 0) {
write_runlog(FATAL, "failed to create thread to check if cluster started.\n");
return CM_ERROR;
}
ret = pthread_create(&startCheckThrId, NULL, &start_check, NULL);
if (ret != 0) {
write_runlog(FATAL, "failed to create start check thread.\n");
return CM_ERROR;
}
(void)pthread_join(startCheckThrId, NULL);
exit((int)g_startExitCode);
}
* @Description: start ETCD cluster.
* if failed in 30 sec, ETCD cluster should stopped forcely and MPPDB cluster can not be started.
*/
static void start_and_check_etcd_cluster()
{
if (g_etcd_num == 0) {
return;
}
uint32 ii = 0;
start_etcd_cluster();
while (ii < ETCD_START_WAIT) {
(void)sleep(1);
write_runlog(LOG, ".");
if (get_alive_etcd_node_count() > g_etcd_num / 2 && CheckDdbHealth()) {
write_runlog(LOG, "the ETCD cluster starts successfully.\n");
if (g_multi_az_cluster && isMinority(cm_arbitration_mode_set)) {
write_runlog(
FATAL, "The minority AZ starting command can only be executed while ETCD cluster is unhealthy. \n");
exit(1);
}
break;
}
ii++;
}
if (ii == ETCD_START_WAIT) {
write_runlog(ERROR, "failed to start the ETCD cluster.\n");
if (g_multi_az_cluster && isMinority(cm_arbitration_mode_set)) {
az3 has only one etcd/dn/cn/gtm/cms/cma), when etcd start failed, continue to start other instance */
} else {
stop_etcd_cluster();
exit(1);
}
}
}
static void start_cluster(void)
{
if (got_stop) {
return;
}
char command[MAXPGPATH] = {0};
int ret = 0;
init_hosts();
for (uint32 ii = 0; ii < g_node_num; ii++) {
write_runlog(LOG, "start nodeid: %u\n", g_node[ii].node);
}
* in case that cm_ctl can't set start command to etcd or om_monitor can't get start command from etcd, cm_ctl
* need also start cluster by removing cluster_manual_start file through pssh, which has better performance than
* ssh.
*/
if (g_single_node_cluster) {
if (mpp_env_separate_file[0] == '\0') {
ret = snprintf_s(command,
MAXPGPATH,
MAXPGPATH - 1,
SYSTEMQUOTE "source /etc/profile; rm -f %s %s_*" SYSTEMQUOTE,
manual_start_file,
instance_manual_start_file);
} else {
ret = snprintf_s(command,
MAXPGPATH,
MAXPGPATH - 1,
SYSTEMQUOTE "source /etc/profile; source %s; rm -f %s %s_*" SYSTEMQUOTE,
mpp_env_separate_file,
manual_start_file,
instance_manual_start_file);
}
} else {
if (mpp_env_separate_file[0] == '\0') {
ret = snprintf_s(command,
MAXPGPATH,
MAXPGPATH - 1,
SYSTEMQUOTE "source /etc/profile; "
"pssh -i %s -h %s \"rm -f %s %s_*; if [ -f %s ]; then touch %s; fi\" > %s; "
"if [ $? -ne 0 ]; then cat %s; fi; rm -f %s" SYSTEMQUOTE,
PSSH_TIMEOUT_OPTION,
hosts_path,
manual_start_file,
instance_manual_start_file,
manual_pause_file,
cluster_manual_starting_file,
pssh_out_path,
pssh_out_path,
pssh_out_path);
} else {
ret = snprintf_s(command,
MAXPGPATH,
MAXPGPATH - 1,
SYSTEMQUOTE "source /etc/profile;"
"pssh -i %s -h %s \"source %s; rm -f %s %s_*; if [ -f %s ]; then touch %s; fi\" > %s; "
"if [ $? -ne 0 ]; then cat %s; fi; rm -f %s" SYSTEMQUOTE,
PSSH_TIMEOUT_OPTION,
hosts_path,
mpp_env_separate_file,
manual_start_file,
instance_manual_start_file,
manual_pause_file,
cluster_manual_starting_file,
pssh_out_path,
pssh_out_path,
pssh_out_path);
}
}
securec_check_intval(ret, (void)ret);
ret = system(command);
if (ret != 0) {
write_runlog(DEBUG1,
"Failed to start the cluster with executing the command: command=\"%s\","
" nodeId=%u, systemReturn=%d, shellReturn=%d, errno=%d.\n",
command,
g_currentNode->node,
ret,
SHELL_RETURN_CODE(ret),
errno);
}
(void)unlink(hosts_path);
}
static void start_etcd_cluster(void)
{
char command[MAXPGPATH];
int ret = 0;
for (uint32 ii = 0; ii < g_node_num; ii++) {
if (!g_node[ii].etcd) {
continue;
}
ret = snprintf_s(command, MAXPGPATH, MAXPGPATH - 1, SYSTEMQUOTE "rm -f %s < \"%s\" 2>&1" SYSTEMQUOTE,
etcd_manual_start_file, DEVNULL);
securec_check_intval(ret, (void)ret);
ret = RunEtcdCmd(command, ii);
if (ret != 0) {
write_runlog(DEBUG1, "Failed to start the etcd node with executing the command: command=\"%s\","
" nodeId=%u, systemReturn=%d, shellReturn=%d, errno=%d.\n",
command, g_node[ii].node, ret, SHELL_RETURN_CODE(ret), errno);
}
}
}
* @Description: get the count of etcd which is running.
*
* @out: count of etcd which is running
*/
static uint32 get_alive_etcd_node_count()
{
uint32 alive_count = 0;
for (uint32 i = 0; i < g_node_num; i++) {
if (g_node[i].etcd && start_check_instance(i, "etcd") == CM_STATUS_NORMAL) {
alive_count++;
}
}
return alive_count;
}
#ifndef ENABLE_MULTIPLE_NODES
static void StartLtranProcess()
{
char command[MAXPGPATH];
int ret = 0;
uint32 ltranAliveNum = 0;
uint32 dnNodeNums = 0;
int ltranCheckTimes = 0;
write_runlog(LOG, "start the ltran cluster.\n");
for (uint32 ii = 0; ii < g_node_num; ii++) {
if (g_node[ii].datanodeCount > 0) {
dnNodeNums++;
ret = snprintf_s(command,
MAXPGPATH,
MAXPGPATH - 1,
SYSTEMQUOTE "rm -f %s < \"%s\" 2>&1" SYSTEMQUOTE,
g_ltranManualStartFile,
DEVNULL);
securec_check_intval(ret, (void)ret);
if (g_node[ii].node == g_currentNode->node) {
ret = system(command);
} else {
ret = ssh_exec(&g_node[ii], command);
}
if (ret != 0) {
write_runlog(DEBUG1,
"Failed to start the ltran process with executing the command: command=\"%s\","
" nodeId=%u, systemReturn=%d, shellReturn=%d, errno=%d.\n",
command,
g_node[ii].node,
ret,
SHELL_RETURN_CODE(ret),
errno);
}
}
}
while (ltranAliveNum < dnNodeNums && ltranCheckTimes < LTRAN_CHECK_TIMES) {
ltranCheckTimes++;
ltranAliveNum = 0;
for (uint32 ii = 0; ii < g_node_num; ii++) {
if (g_node[ii].datanodeCount > 0 && CM_STATUS_NORMAL == start_check_instance(ii, "ltran")) {
ltranAliveNum++;
}
}
cm_sleep(LTRAN_CHECK_INTERVAL);
write_runlog(LOG, ".");
write_runlog(DEBUG1, "ltran cluster checks %d times\n", ltranCheckTimes);
}
}
static int StartLtranProcessByNode(uint32 nodeid)
{
char command[MAXPGPATH];
int ret;
ret = snprintf_s(command,
MAXPGPATH,
MAXPGPATH - 1,
SYSTEMQUOTE "rm -f %s < \"%s\" 2>&1 &" SYSTEMQUOTE,
g_ltranManualStartFile,
DEVNULL);
securec_check_intval(ret, (void)ret);
ret = runCmdByNodeId(command, nodeid);
if (ret != 0) {
write_runlog(DEBUG1,
"Failed to start the ltran process with executing the command: command=\"%s\","
" nodeId=%u, systemReturn=%d, shellReturn=%d, errno=%d.\n",
command,
nodeid,
ret,
SHELL_RETURN_CODE(ret),
errno);
}
return ret;
}
static bool ExecCheckCmd(const char* printStr, const char* cmd, const uint32 cmdCount)
{
char checkCmd[MAXPGPATH] = {0};
char buf[MAXPGPATH];
if (cmd != NULL) {
int rcs = memcpy_s(checkCmd, MAXPGPATH, cmd, cmdCount);
securec_check_c(rcs, "", "");
write_runlog(DEBUG1, "cheak_libnet %s command is %s.\n", printStr, checkCmd);
}
FILE* fp = popen(checkCmd, "r");
if (fp == NULL) {
write_runlog(ERROR, "popen failed\n");
return false;
}
if (fgets(buf, sizeof(buf), fp) != NULL) {
if (strstr(buf, "success") != NULL) {
(void)pclose(fp);
} else {
(void)pclose(fp);
write_runlog(DEBUG1, "%s cheack failed.\n", printStr);
return false;
}
}
write_runlog(DEBUG1, "%s cheack OK by exec command:%s.\n", printStr, cmd);
return true;
}
static bool CheckLibosKniIsOk()
{
char checkKniCmd[MAXPGPATH] = {0};
char checkIpCmd[MAXPGPATH] = {0};
char checPingCmd[MAXPGPATH] = {0};
const int items = 3;
char* checkItem[items] = {"libnet_kni", "libnet_ip", "libnet_ping"};
uint32 cmdCount = 0;
int idx = 0;
bool checkRes = true;
int rc = strcpy_s(checkKniCmd, MAXPGPATH,
"ifconfig | grep libos_kni > /dev/null;if [ $? == 0 ];then echo success;else echo fail;fi;");
securec_check_errno(rc, (void)rc);
write_runlog(DEBUG1, "cheak_libnet kni command is %s.\n", checkKniCmd);
cmdCount = strlen(checkKniCmd);
checkRes = ExecCheckCmd(checkItem[idx++], checkKniCmd, cmdCount);
if (checkRes == false) {
write_runlog(DEBUG1, "%s cheack failed by exec command:%s.\n", checkItem[idx++], checkKniCmd);
return false;
}
rc = snprintf_s(checkIpCmd,
MAXPGPATH,
MAXPGPATH - 1,
"ifconfig | grep %s > /dev/null;if [ $? == 0 ];then echo success;else echo fail;fi;",
g_currentNode->cmAgentIP);
securec_check_intval(rc, (void)rc);
write_runlog(DEBUG1, "cheak_libnet ip command is %s.\n", checkIpCmd);
cmdCount = strlen(checkIpCmd);
checkRes = ExecCheckCmd(checkItem[idx++], checkIpCmd, cmdCount);
if (checkRes == false) {
write_runlog(DEBUG1, "%s cheack failed by exec command:%s.\n", checkItem[idx++], checkIpCmd);
return false;
}
const char *ping_ip = *g_currentNode->cmAgentIP;
const char *pingStr = GetPingStr(GetIpVersion(ping_ip));
rc = snprintf_s(checPingCmd,
MAXPGPATH,
MAXPGPATH - 1,
"%s -c 1 -w 1 %s > /dev/null;if [ $? == 0 ];then echo success;else echo fail;fi;",
pingStr, ping_ip);
securec_check_intval(rc, (void)rc);
write_runlog(DEBUG1, "cheak_libnet ping command is %s.\n", checPingCmd);
cmdCount = strlen(checPingCmd);
checkRes = ExecCheckCmd(checkItem[idx], checPingCmd, cmdCount);
if (checkRes == false) {
write_runlog(DEBUG1, "%s cheack failed by exec command:%s.\n", checkItem[idx], checPingCmd);
return false;
}
write_runlog(DEBUG1, "libos_kni and libos_ip cheack OK.\n");
return true;
}
#endif
* @Description: start ETCD node in AZ.
* if failed in 30 sec, ETCD node should stopped forcely.
*/
static void start_and_check_etcd_az(const char* azName)
{
uint32 ii;
uint32 etcd_count = 0;
uint32 etcd_alive_count;
uint32 wait_count = 0;
write_runlog(LOG, "start ETCD in availability zone, availability zone name: %s.\n", azName);
for (ii = 0; ii < g_node_num; ii++) {
if (strcmp(g_node[ii].azName, azName) == 0) {
if (g_node[ii].etcd) {
start_etcd_node(g_node[ii].node);
etcd_count++;
}
}
}
while (wait_count < ETCD_START_WAIT) {
(void)sleep(1);
write_runlog(LOG, ".");
etcd_alive_count = 0;
for (ii = 0; ii < g_node_num; ii++) {
if (strcmp(g_node[ii].azName, azName) == 0) {
if (g_node[ii].etcd) {
if (start_check_instance(ii, "etcd") == CM_STATUS_NORMAL) {
etcd_alive_count++;
}
}
}
}
if (etcd_alive_count == etcd_count) {
write_runlog(LOG, "the ETCD instances in this availability zone start successfully.\n");
break;
}
wait_count++;
}
if (wait_count == ETCD_START_WAIT) {
write_runlog(LOG, "failed to start all ETCD in this availability zone.\n");
for (ii = 0; ii < g_node_num; ii++) {
if (strcmp(g_node[ii].azName, azName) == 0) {
if (g_node[ii].etcd) {
stop_etcd_node(g_node[ii].node);
}
}
}
}
}
* start etcd of specified node
*/
static void start_etcd_node(uint32 nodeid)
{
char command[MAXPGPATH] = {0};
int ret;
write_runlog(LOG, "start ETCD in node, nodeid: %u\n", nodeid);
ret = snprintf_s(command, MAXPGPATH, MAXPGPATH - 1, SYSTEMQUOTE "rm -f %s < \"%s\" 2>&1 &" SYSTEMQUOTE,
etcd_manual_start_file, DEVNULL);
securec_check_intval(ret, (void)ret);
ret = runCmdByNodeId(command, nodeid);
if (ret != 0) {
write_runlog(DEBUG1, "Failed to start the etcd node with executing the command: command=\"%s\","
" nodeId=%u, systemReturn=%d, shellReturn=%d, errno=%d.\n", command, nodeid, ret, SHELL_RETURN_CODE(ret),
errno);
}
}
#ifndef ENABLE_MULTIPLE_NODES
static int StartCheckLtranProcess(uint32 nodeIdCheck, const char* datapath)
{
int ret;
int result = -1;
char resultPath[MAXPGPATH] = {0};
int fd;
bool flag = false;
char checkLtranProcessResultPath[MAX_PATH_LEN] = {0};
if (strcmp(datapath, "ltran") == 0) {
ret = GetHomePath(resultPath, sizeof(resultPath));
if (ret != EOK) {
return CM_STATUS_UNKNOWN;
}
ret = snprintf_s(checkLtranProcessResultPath, MAX_PATH_LEN, MAX_PATH_LEN - 1,
"%s/bin/checkLtranProcessResult-XXXXXX", resultPath);
securec_check_intval(ret, (void)ret);
fd = mkstemp(checkLtranProcessResultPath);
if (fd <= 0) {
write_runlog(ERROR, "failed to create the ltran process check result file: errno=%d.\n", errno);
flag = true;
}
char command[MAXPGPATH] = {0};
if (nodeIdCheck == g_nodeId && strstr(g_appPath, "/var/chroot") == NULL) {
ret = snprintf_s(command,
MAXPGPATH,
MAXPGPATH - 1,
"cm_ctl check -B ltran -T ltran \n echo -e $? > %s",
flag ? result_path : checkLtranProcessResultPath);
securec_check_intval(ret, (void)ret);
exec_system(command, &result, flag ? result_path : checkLtranProcessResultPath);
} else {
ret = snprintf_s(command,
MAXPGPATH,
MAXPGPATH - 1,
"cm_ctl check -B ltran -T ltran\" > /dev/null 2>&1; echo -e $? > %s",
flag ? result_path : checkLtranProcessResultPath);
securec_check_intval(ret, (void)ret);
exec_system_ssh(
nodeIdCheck, command, &result, flag ? result_path : checkLtranProcessResultPath, mpp_env_separate_file);
}
if (fd > 0) {
(void)close(fd);
}
(void)unlink(flag ? result_path : checkLtranProcessResultPath);
return (result == PROCESS_RUNNING) ? CM_STATUS_NORMAL : CM_STATUS_UNKNOWN;
}
return CM_STATUS_UNKNOWN;
}
#endif
static int start_check_instance(uint32 node_id_check, const char* datapath)
{
int result = -1;
CM_Conn *pCmsCon = NULL;
if (checkStaticConfigExist(node_id_check) != 0) {
write_runlog(
ERROR, "the cluster static config file does not exist on the node: %u.\n", g_node[node_id_check].node);
write_runlog(FATAL, "failed to check the instance running status: %s.\n", datapath);
return CM_STATUS_UNKNOWN;
}
if (g_node[node_id_check].coordinate == 1 && strncmp(datapath, g_node[node_id_check].DataPath, MAX_PATH_LEN) == 0) {
CheckCnNodeStatusById(node_id_check, &result);
return (result == PROCESS_RUNNING) ? CM_STATUS_NORMAL : CM_STATUS_UNKNOWN;
}
for (uint32 ii = 0; ii < g_node[node_id_check].datanodeCount; ii++) {
char* local_data_path = g_node[node_id_check].datanode[ii].datanodeLocalDataPath;
if (strncmp(datapath, local_data_path, MAX_PATH_LEN) != 0) {
continue;
}
if (IsCmSharedStorageMode()) {
if (WaitCmsPrimaryNormal(&pCmsCon) != CM_SUCCESS) {
return CM_STATUS_UNKNOWN;
}
if (SetOfflineNode(node_id_check, pCmsCon)) {
ReleaseConn(pCmsCon);
return CM_STATUS_NORMAL;
}
ReleaseConn(pCmsCon);
}
CheckDnNodeStatusById(node_id_check, &result, ii);
return (result == PROCESS_RUNNING) ? CM_STATUS_NORMAL : CM_STATUS_UNKNOWN;
}
if (g_node[node_id_check].gtm == 1 &&
strncmp(datapath, g_node[node_id_check].gtmLocalDataPath, MAX_PATH_LEN) == 0) {
CheckGtmNodeStatusById(node_id_check, &result);
return (result == PROCESS_RUNNING) ? CM_STATUS_NORMAL : CM_STATUS_UNKNOWN;
}
if (strcmp(datapath, "etcd") == 0) {
int ret;
char resultPath[MAXPGPATH] = {0};
char checkEtcdProcessResultPath[MAX_PATH_LEN] = {0};
int fd;
bool flag = false;
ret = GetHomePath(resultPath, sizeof(resultPath));
if (ret != EOK) {
return CM_STATUS_UNKNOWN;
}
ret = snprintf_s(checkEtcdProcessResultPath, MAX_PATH_LEN, MAX_PATH_LEN - 1,
"%s/bin/checkEtcdProcessResult-XXXXXX", resultPath);
securec_check_intval(ret, (void)ret);
fd = mkstemp(checkEtcdProcessResultPath);
if (fd <= 0) {
write_runlog(ERROR, "failed to create the etcd process check result file: errno=%d.\n", errno);
flag = true;
}
if (g_node[node_id_check].etcd == 1) {
char command[MAXPGPATH] = {0};
if (node_id_check == g_nodeId && strstr(g_appPath, "/var/chroot") == NULL) {
ret = snprintf_s(command, MAXPGPATH, MAXPGPATH - 1,
"cm_ctl check -B etcd -T %s/bin/etcd \n echo -e $? > %s",
g_appPath, flag ? result_path : checkEtcdProcessResultPath);
securec_check_intval(ret, (void)ret);
exec_system(command, &result, flag ? result_path : checkEtcdProcessResultPath);
} else {
ret = snprintf_s(command, MAXPGPATH, MAXPGPATH - 1,
"cm_ctl check -B etcd -T %s/bin/etcd\" > /dev/null 2>&1; echo -e $? > %s",
g_appPath, flag ? result_path : checkEtcdProcessResultPath);
securec_check_intval(ret, (void)ret);
exec_system_ssh(node_id_check, command, &result, flag ? result_path : checkEtcdProcessResultPath,
mpp_env_separate_file);
}
if (fd > 0) {
(void)close(fd);
}
(void)unlink(flag ? result_path : checkEtcdProcessResultPath);
return (result == PROCESS_RUNNING) ? CM_STATUS_NORMAL : CM_STATUS_UNKNOWN;
}
if (fd > 0) {
(void)close(fd);
}
(void)unlink(flag ? result_path : checkEtcdProcessResultPath);
}
#ifndef ENABLE_MULTIPLE_NODES
return StartCheckLtranProcess(node_id_check, datapath);
#endif
return CM_STATUS_UNKNOWN;
}
* @Description: check ETCD cluster status.
* if failed in 30 sec, MPPDB node can not be started.
*/
static void check_etcd_cluster_status()
{
if (g_etcd_num == 0) {
return;
}
uint32 ii;
* etcd node just started up and etcd cluster can't be healthy right now.
* so cm_ctl check etcd cluster status 30 times.
*/
for (ii = 0; ii < ETCD_START_WAIT; ii++) {
(void)sleep(1);
write_runlog(LOG, ".");
if (isMinority(cm_arbitration_mode_set) &&
GetAliveEtcdNumOfAz(g_command_operation_azName) == GetEtcdNumOfAz(g_command_operation_azName)) {
write_runlog(DEBUG1, "The minority AZ starting check ETCD cluster finished.\n");
break;
}
if (CheckDdbHealth()) {
write_runlog(LOG, "check ETCD cluster finished.\n");
if (g_multi_az_cluster && isMinority(cm_arbitration_mode_set)) {
write_runlog(
FATAL, "The minority AZ starting command can only be executed while ETCD cluster is unhealthy. \n");
exit(1);
}
break;
}
}
if (ii == ETCD_START_WAIT) {
write_runlog(ERROR, "ETCD cluster is unhealthy. \n");
if (g_multi_az_cluster && !isMinority(cm_arbitration_mode_set)) {
exit(1);
}
}
}
static void MinorityStartPreSetGTM(uint32 nodeIdx)
{
char command[MAXPGPATH];
char command_opts[MAXPGPATH];
int ret;
for (unsigned int i = 0; i < g_node_num; i++) {
if (strcmp(g_node[i].azName, g_node[nodeIdx].azName) != 0 || i == nodeIdx) {
continue;
}
if (g_node[i].gtm == 1) {
if (g_currentNode->node == g_node[nodeIdx].node) {
ret = snprintf_s(command_opts,
MAXPGPATH,
MAXPGPATH - 1,
"gs_guc reload -Z gtm -D %s -c \"active_host = '%s'\" -c \"active_port = '%u'\"",
g_node[nodeIdx].gtmLocalDataPath,
g_node[i].gtmLocalHAIP,
g_node[i].gtmLocalHAPort);
} else {
ret = snprintf_s(command_opts,
MAXPGPATH,
MAXPGPATH - 1,
"gs_guc reload -Z gtm -D %s -c \\\"active_host = '%s'\\\" -c \\\"active_port = '%u'\\\"",
g_node[nodeIdx].gtmLocalDataPath,
g_node[i].gtmLocalHAIP,
g_node[i].gtmLocalHAPort);
}
securec_check_intval(ret, (void)ret);
ret = snprintf_s(command,
MAXPGPATH,
MAXPGPATH - 1,
SYSTEMQUOTE "%s > %s 2>&1 &" SYSTEMQUOTE,
command_opts,
DEVNULL);
securec_check_intval(ret, (void)ret);
RunCmdInStartAz(command, nodeIdx);
break;
}
}
if (g_currentNode->node == g_node[nodeIdx].node) {
ret = snprintf_s(command_opts, MAXPGPATH, MAXPGPATH - 1, "gs_guc reload -Z gtm -D %s -c \"standby_only = 1\"",
g_node[nodeIdx].gtmLocalDataPath);
} else {
ret = snprintf_s(command_opts, MAXPGPATH, MAXPGPATH - 1,
"gs_guc reload -Z gtm -D %s -c \\\"standby_only = 1\\\"", g_node[nodeIdx].gtmLocalDataPath);
}
securec_check_intval(ret, (void)ret);
ret = snprintf_s(command, MAXPGPATH, MAXPGPATH - 1, SYSTEMQUOTE "%s > %s 2>&1 &" SYSTEMQUOTE,
command_opts, DEVNULL);
securec_check_intval(ret, (void)ret);
RunCmdInStartAz(command, nodeIdx);
}
static void MinorityStartPreSetServer(uint32 nodeIdx)
{
char command[MAXPGPATH];
char command_opts[MAXPGPATH];
cm_start_mode mode;
int ret;
mode = (g_currentNode->node == g_node[nodeIdx].node) ? MINORITY_START : OTHER_MINORITY_START;
ret = snprintf_s(command_opts, MAXPGPATH, MAXPGPATH - 1, "echo -e \'%d\' > %s; chmod 600 %s",
mode, minority_az_start_file, minority_az_start_file);
securec_check_intval(ret, (void)ret);
ret = snprintf_s(
command, MAXPGPATH, MAXPGPATH - 1, SYSTEMQUOTE "%s > %s 2>&1 &" SYSTEMQUOTE, command_opts, DEVNULL);
securec_check_intval(ret, (void)ret);
RunCmdInStartAz(command, nodeIdx);
if (mode == MINORITY_START) {
ret = snprintf_s(command_opts, MAXPGPATH, MAXPGPATH - 1, "touch %s; chmod 600 %s",
g_minorityAzArbitrateFile, g_minorityAzArbitrateFile);
securec_check_intval(ret, (void)ret);
ret = snprintf_s(
command, MAXPGPATH, MAXPGPATH - 1, SYSTEMQUOTE "%s > %s 2>&1 &" SYSTEMQUOTE, command_opts, DEVNULL);
securec_check_intval(ret, (void)ret);
RunCmdInStartAz(command, nodeIdx);
}
}
static void MinorityStartPreSet(uint32 nodeIdx, uint32 repNumInAZ)
{
if (g_node[nodeIdx].cmServerLevel == 1) {
MinorityStartPreSetServer(nodeIdx);
}
if (g_node[nodeIdx].gtm == 1) {
MinorityStartPreSetGTM(nodeIdx);
}
ExecuteGsGuc(NULL, (repNumInAZ == 1), repNumInAZ, nodeIdx);
}
static void MajorityStartPreSetServer(uint32 nodeIdx)
{
char command[MAXPGPATH];
char command_opts[MAXPGPATH];
int ret;
ret = snprintf_s(command_opts, MAXPGPATH, MAXPGPATH - 1, "rm -f %s;", minority_az_start_file);
securec_check_intval(ret, (void)ret);
ret = snprintf_s(command, MAXPGPATH, MAXPGPATH - 1, SYSTEMQUOTE "%s > %s 2>&1 &" SYSTEMQUOTE,
command_opts, DEVNULL);
securec_check_intval(ret, (void)ret);
RunCmdInStartAz(command, nodeIdx);
}
static void MajorityStartPreSetGTM(uint32 nodeIdx)
{
char command[MAXPGPATH];
char command_opts[MAXPGPATH];
int ret;
if (g_currentNode->node == g_node[nodeIdx].node) {
ret = snprintf_s(command_opts,
MAXPGPATH,
MAXPGPATH - 1,
"gs_guc reload -Z gtm -D %s -c \"standby_only = 0\" -c \"active_host = ''\" "
"-c \"active_port = '1'\"",
g_node[nodeIdx].gtmLocalDataPath);
} else {
ret = snprintf_s(command_opts,
MAXPGPATH,
MAXPGPATH - 1,
"gs_guc reload -Z gtm -D %s -c \\\"standby_only = 0\\\" -c \\\"active_host = ''\\\" "
"-c \\\"active_port = '1'\\\"",
g_node[nodeIdx].gtmLocalDataPath);
}
securec_check_intval(ret, (void)ret);
ret = snprintf_s(
command, MAXPGPATH, MAXPGPATH - 1, SYSTEMQUOTE "%s > %s 2>&1 &" SYSTEMQUOTE, command_opts, DEVNULL);
securec_check_intval(ret, (void)ret);
RunCmdInStartAz(command, nodeIdx);
}
static void MajorityStartPreSet(uint32 nodeIdx, uint32 repNumInAZ, const char *azAllStr)
{
if (g_node[nodeIdx].cmServerLevel == 1) {
MajorityStartPreSetServer(nodeIdx);
}
if (g_node[nodeIdx].gtm == 1) {
MajorityStartPreSetGTM(nodeIdx);
}
ExecuteGsGuc(azAllStr, (repNumInAZ == 1), g_dn_replication_num, nodeIdx);
}
static void StartAz4Force(const char* azName)
{
const uint32 maxAzNum = 5;
char *azLists[maxAzNum];
int rc = memset_s(azLists, sizeof(char *) * maxAzNum, 0, sizeof(char *) * maxAzNum);
securec_check_errno(rc, (void)rc);
uint32 mirrorId, repNumInAZ = 0;
for (uint32 i = 0; i < g_node_num; i++) {
if (g_node[i].datanodeCount == 0) {
continue;
}
for (uint32 j = 0; j < maxAzNum; j++) {
if (azLists[j] == NULL) {
azLists[j] = g_node[i].azName;
break;
} else if (strcmp(g_node[i].azName, azLists[j]) == 0) {
break;
}
}
if (strcmp(g_node[i].azName, azName) == 0) {
for (uint32 j = 0; j < g_node[i].datanodeCount; j++) {
if (repNumInAZ == 0) {
mirrorId = g_node[i].datanode[j].datanodeMirrorId;
repNumInAZ++;
} else if (mirrorId == g_node[i].datanode[j].datanodeMirrorId) {
repNumInAZ++;
}
}
}
}
char azAllStr[maxAzNum * CM_AZ_NAME] = {0};
for (uint32 i = 0; azLists[i] != NULL; i++) {
if (i > 0) {
rc = strncat_s(azAllStr, (maxAzNum * CM_AZ_NAME), ", ", strlen(", "));
securec_check_errno(rc, (void)rc);
}
rc = strncat_s(azAllStr, (maxAzNum * CM_AZ_NAME), azLists[i], strlen(azLists[i]));
securec_check_errno(rc, (void)rc);
}
for (uint32 ii = 0; ii < g_node_num; ii++) {
if (strcmp(g_node[ii].azName, azName) == 0) {
if (isMinority(cm_arbitration_mode_set)) {
MinorityStartPreSet(ii, repNumInAZ);
} else if (isMajority(cm_arbitration_mode_set)) {
MajorityStartPreSet(ii, repNumInAZ, azAllStr);
}
write_runlog(LOG, "start node, nodeid: %u\n", g_node[ii].node);
start_node(g_node[ii].node);
}
}
}
* find the node info according to availability zone name
*/
static void start_az(char *azName)
{
int rc = 0;
write_runlog(LOG, "start instances excepts ETCD in availability zone, availability zone name: %s.\n", azName);
if (!isMinority(cm_arbitration_mode_set)) {
char etcdStopNodesKey[MAXPGPATH] = {0};
char etcdStopNodesValue[MAXPGPATH] = {0};
status_t res;
rc = snprintf_s(etcdStopNodesKey, MAXPGPATH, MAXPGPATH - 1, "/%s/command/%d/az_stop_nodes", pw->pw_name, 0);
securec_check_intval(rc, (void)rc);
rc = snprintf_s(etcdStopNodesValue, MAXPGPATH, MAXPGPATH - 1, "%d", 0);
securec_check_intval(rc, (void)rc);
res = SendKVToCms(etcdStopNodesKey, etcdStopNodesValue, "startAz");
if (res != CM_SUCCESS) {
write_runlog(DEBUG1, "etcd clear stop_nodes failed, ETCD set error.\n\n");
} else {
write_runlog(DEBUG1, "etcd clear stop_nodes successfully.\n");
}
rc = snprintf_s(etcdStopNodesKey, MAXPGPATH, MAXPGPATH - 1, "/%s/command/az_stop_nodes_num", pw->pw_name);
securec_check_intval(rc, (void)rc);
rc = snprintf_s(etcdStopNodesValue, MAXPGPATH, MAXPGPATH - 1, "%d", 0);
securec_check_intval(rc, (void)rc);
res = SendKVToCms(etcdStopNodesKey, etcdStopNodesValue, "startAz");
if (res != CM_SUCCESS) {
write_runlog(DEBUG1, "etcd clear stop_nodes_num failed, ETCD set error.\n");
} else {
write_runlog(DEBUG1, "etcd clear stop_nodes_num successfully.\n");
}
}
StartAz4Force(azName);
}
* @Description: start ETCD node.
* if failed in 30 sec, ETCD node should stopped forcely.
*/
static void start_and_check_etcd_node(uint32 nodeId)
{
uint32 ii;
for (ii = 0; ii < g_node_num; ii++) {
if (g_node[ii].node == nodeId) {
break;
}
}
if (ii >= g_node_num) {
write_runlog(FATAL, "can't find the nodeid: %u\n", nodeId);
exit(1);
}
if (g_node[ii].etcd) {
start_etcd_node(nodeId);
time_t startTime = time(NULL);
while (time(NULL) - startTime < static_cast<time_t>(ETCD_START_WAIT)) {
(void)sleep(1);
write_runlog(LOG, ".");
if (start_check_instance(ii, "etcd") == CM_STATUS_NORMAL) {
write_runlog(LOG, "the ETCD instance in this node starts successfully done.\n");
return;
}
}
write_runlog(LOG, "failed to start the ETCD node.\n");
stop_etcd_node(nodeId);
}
}
* find the node info according to nodeid
*/
static void start_node(uint32 nodeid)
{
if (got_stop) {
return;
}
char command[MAXPGPATH];
uint32 ii;
errno_t rc;
if (g_isPauseArbitration) {
rc = snprintf_s(command, MAXPGPATH, MAXPGPATH - 1,
SYSTEMQUOTE "rm -f %s %s %s_*; touch %s < \"%s\" 2>&1 &" SYSTEMQUOTE,
manual_start_file, etcd_manual_start_file,
instance_manual_start_file, cluster_manual_starting_file, DEVNULL);
} else {
rc = snprintf_s(command, MAXPGPATH, MAXPGPATH - 1,
SYSTEMQUOTE "rm -f %s %s %s_* < \"%s\" 2>&1 &" SYSTEMQUOTE,
manual_start_file, etcd_manual_start_file,
instance_manual_start_file, DEVNULL);
}
securec_check_intval(rc, (void)rc);
if (nodeid == g_currentNode->node) {
rc = system(command);
} else {
for (ii = 0; ii < g_node_num; ii++) {
if (g_node[ii].node == nodeid) {
break;
}
}
if (ii < g_node_num) {
rc = ssh_exec(&g_node[ii], command);
} else {
write_runlog(ERROR, "Could not find the node in the cluster by the node id %u.\n", nodeid);
exit(1);
}
}
if (rc != 0) {
write_runlog(DEBUG1,
"Failed to start the node with executing the command: command=\"%s\","
" nodeId=%u, systemReturn=%d, shellReturn=%d, errno=%d.\n",
command,
nodeid,
rc,
SHELL_RETURN_CODE(rc),
errno);
}
}
static void StartInstanceByInstanceid(uint32 instanceId)
{
for (uint32 i = 0; i < g_node_num; i++) {
for (uint32 j = 0; j < g_node[i].datanodeCount; j++) {
if (instanceId == g_node[i].datanode[j].datanodeId) {
start_instance(g_node[i].node, g_node[i].datanode[j].datanodeLocalDataPath);
write_runlog(LOG,
"start the node:%u,datapath:%s. \n",
g_node[i].node,
g_node[i].datanode[j].datanodeLocalDataPath);
return;
}
}
}
}
static void start_datanode_instance_relation(uint32 node, const char* dataPath)
{
int ret;
uint32 instanceId;
cm_to_ctl_get_datanode_relation_ack getInstanceMsg = {0};
ret = GetDatanodeRelationInfo(node, dataPath, &getInstanceMsg);
if (ret == -1) {
write_runlog(ERROR, "can not get datanode information.\n");
exit(1);
}
for (int i = 0; i < CM_PRIMARY_STANDBY_MAX_NUM; i++) {
instanceId = getInstanceMsg.instanceMember[i].instanceId;
if (instanceId != 0) {
StartInstanceByInstanceid(instanceId);
}
}
}
void start_instance(uint32 nodeid, const char* datapath)
{
int instance_type;
uint32 instanceId;
char command[MAXPGPATH];
int nRet;
nRet = FindInstanceIdAndType(nodeid, datapath, &instanceId, &instance_type);
if (nRet != 0) {
write_runlog(FATAL, "can't find the node_id:%u, data_path:%s.\n", nodeid, datapath);
exit(1);
}
if (instance_type == INSTANCE_TYPE_DATANODE && CheckOfflineInstance(nodeid)) {
write_runlog(DEBUG1, "the instance(node:%u) is Offline, no need to start.\n", nodeid);
return;
}
if (g_isPauseArbitration) {
nRet = snprintf_s(command, MAXPGPATH, MAXPGPATH - 1,
SYSTEMQUOTE "rm -f %s_%u; touch %s < \"%s\" 2>&1 &" SYSTEMQUOTE,
instance_manual_start_file, instanceId, cluster_manual_starting_file, DEVNULL);
} else {
nRet = snprintf_s(command, MAXPGPATH, MAXPGPATH - 1, SYSTEMQUOTE "rm -f %s_%u < \"%s\" 2>&1 &" SYSTEMQUOTE,
instance_manual_start_file, instanceId, DEVNULL);
}
securec_check_intval(nRet, (void)nRet);
nRet = runCmdByNodeId(command, nodeid);
if (nRet != 0) {
write_runlog(DEBUG1, "Failed to start the instance with executing the command: command=\"%s\","
" nodeId=%u, dataPath=\"%s\", systemReturn=%d, shellReturn=%d, errno=%d.\n",
command, nodeid, datapath, nRet, SHELL_RETURN_CODE(nRet), errno);
}
}
void RemoveStartingFile()
{
int ret;
char command[MAX_COMMAND_LEN] = {0};
if (!g_isPauseArbitration) {
exit(0);
}
init_hosts();
if (mpp_env_separate_file[0] == '\0') {
ret = snprintf_s(command,
MAX_COMMAND_LEN,
MAX_COMMAND_LEN - 1,
SYSTEMQUOTE "source /etc/profile;pssh -i %s -h %s \"rm -f %s\" > %s; "
"if [ $? -ne 0 ]; then cat %s; fi; rm -f %s" SYSTEMQUOTE,
PSSH_TIMEOUT_OPTION,
hosts_path,
cluster_manual_starting_file,
pssh_out_path,
pssh_out_path,
pssh_out_path);
} else {
ret = snprintf_s(command,
MAX_COMMAND_LEN,
MAX_COMMAND_LEN - 1,
SYSTEMQUOTE "source /etc/profile;pssh -i %s -h %s \"source %s;rm -f %s\" > %s; "
"if [ $? -ne 0 ]; then cat %s; fi; rm -f %s" SYSTEMQUOTE,
PSSH_TIMEOUT_OPTION,
hosts_path,
mpp_env_separate_file,
cluster_manual_starting_file,
pssh_out_path,
pssh_out_path,
pssh_out_path);
}
securec_check_intval(ret, (void)ret);
ret = system(command);
if (ret != 0) {
write_runlog(DEBUG1,
"Failed to delete the startingFile with executing the command: command=\"%s\","
" nodeId=%u, systemReturn=%d, shellReturn=%d, errno=%d.\n",
command,
g_currentNode->node,
ret,
SHELL_RETURN_CODE(ret),
errno);
}
exit(0);
}
static void ContinueCheckClsStatus(long *startingTime)
{
(void)sleep(1);
(void)clock_gettime(CLOCK_MONOTONIC, &g_endTime);
*startingTime = (g_endTime.tv_sec - g_startTime.tv_sec);
if (*startingTime > EXPECTED_CLUSTER_START_TIME && *startingTime % CLUSTER_STATE_CHECK_INTERVAL == 0) {
write_runlog(DEBUG1, "starting exceeds 2 mins, instance status:g_cluster_start_status=%d,"
"g_az_start_status=%d, g_node_start_status=%d, g_instance_start_status=%d\n",
g_cluster_start_status, g_az_start_status, g_node_start_status, g_instance_start_status);
}
}
static void* check_cluster_start_status(void* arg)
{
int count = 0;
long startingTime = 0;
if (!wait_seconds_set) {
g_waitSeconds = caculate_default_timeout(START_COMMAND);
}
while (startingTime < g_waitSeconds) {
if (g_startExitCode != CM_START_EXIT_INIT) {
ContinueCheckClsStatus(&startingTime);
continue;
}
if (g_cluster_start_status == CM_STATUS_NORMAL) {
write_runlog(LOG, "start cluster successfully.\n");
g_startExitCode = CM_START_EXIT_SUCCESS;
continue;
} else if (g_cluster_start_status == CM_STATUS_NORMAL_WITH_CN_DELETED) {
write_runlog(LOG, "start cluster successfully. There is a coordinator that has been deleted. \n");
g_startExitCode = CM_START_EXIT_SUCCESS;
continue;
} else if (g_az_start_status == CM_STATUS_NORMAL) {
for (uint32 ii = 0; ii < g_node_num; ii++) {
if (g_command_operation_azName != NULL && strcmp(g_node[ii].azName, g_command_operation_azName) == 0) {
write_runlog(LOG, "start node successfully, nodeid: %u. \n", g_node[ii].node);
}
}
write_runlog(LOG, "start availability zone successfully.\n");
g_startExitCode = CM_START_EXIT_SUCCESS;
continue;
} else if (g_az_start_status == CM_STATUS_NORMAL_WITH_CN_DELETED) {
for (uint32 ii = 0; ii < g_node_num; ii++) {
if (g_command_operation_azName != NULL && strcmp(g_node[ii].azName, g_command_operation_azName) == 0) {
write_runlog(LOG, "start node successfully, nodeid: %u. \n", g_node[ii].node);
}
}
write_runlog(LOG, "start availability zone successfully. There is a coordinator that has been deleted. \n");
g_startExitCode = CM_START_EXIT_SUCCESS;
continue;
} else if (g_node_start_status == CM_STATUS_NORMAL) {
write_runlog(LOG, "start node successfully.\n");
g_startExitCode = CM_START_EXIT_SUCCESS;
continue;
} else if (g_node_start_status == CM_STATUS_NORMAL_WITH_CN_DELETED) {
write_runlog(LOG, "start node successfully. There is a coordinator that has been deleted. \n");
g_startExitCode = CM_START_EXIT_SUCCESS;
continue;
} else if (g_instance_start_status == CM_STATUS_NORMAL) {
* CM Client found the instance running, but maybe it quit immediately after startup.
* so only if CM Client found the instance running for 3 times, instance has started.
*/
count++;
if (count > INSTANCE_START_CONFIRM_TIME) {
write_runlog(LOG, "start instance successfully.\n");
g_startExitCode = CM_START_EXIT_SUCCESS;
continue;
}
} else if (g_dn_relation_start_status == CM_STATUS_NORMAL) {
write_runlog(LOG, "start relation datanodes successfully(node:%u, path:%s).\n",
g_commandOperationNodeId, g_cmData);
g_startExitCode = CM_START_EXIT_SUCCESS;
continue;
} else if (g_resStartStatus == CM_STATUS_NORMAL) {
write_runlog(LOG, "start resource instance successfully(nodeId:%u, instId:%u).\n",
g_commandOperationNodeId, g_commandOperationInstanceId);
g_startExitCode = CM_START_EXIT_SUCCESS;
continue;
} else if (g_dn_status == INSTANCE_HA_STATE_READ_ONLY) {
write_runlog(ERROR,
"start cluster failed!\n\n"
"HINT: Some nodes are in ReadOnly mode.\n"
"To identify which nodes are ReadOnly, use the following command:\n"
"\'cm_ctl query -Cvipdw\'\n");
g_startExitCode = CM_START_EXIT_SUCCESS;
continue;
} else {
count = 0;
}
ContinueCheckClsStatus(&startingTime);
write_runlog(LOG, ".");
}
StartFailQueryAndReport();
if ((g_command_operation_azName == NULL) && !g_commandOperationNodeId && (g_cmData[0] == '\0') &&
g_cluster_start_status != CM_STATUS_NORMAL && g_cluster_start_status != CM_STATUS_NORMAL_WITH_CN_DELETED) {
write_runlog(ERROR,
"start cluster failed in (%d)s!\n\n"
"HINT: Maybe the cluster is continually being started in the background.\n"
"You can wait for a while and check whether the cluster starts, or increase the value of parameter \"-t\", "
"e.g -t 600.\n",
g_waitSeconds);
} else if (!g_commandOperationNodeId && (g_cmData[0] == '\0') && g_az_start_status != CM_STATUS_NORMAL &&
g_az_start_status != CM_STATUS_NORMAL_WITH_CN_DELETED) {
write_runlog(ERROR,
"start availability zone failed in (%d)s!\n\n"
"HINT: Maybe the availability zone is continually being started in the background.\n"
"You can wait for a while and check whether the availability zone starts, or increase the value of "
"parameter \"-t\", e.g -t 600.\n",
g_waitSeconds);
} else if ((g_commandOperationNodeId > 0) && (g_commandOperationInstanceId > 0) &&
(g_resStartStatus != CM_STATUS_NORMAL)) {
write_runlog(ERROR,
"start resource instance failed in (%d)s!\n\n"
"HINT: Maybe the node is continually being started in the background.\n"
"You can wait for a while and check whether the node starts, or increase the value of parameter \"-t\", "
"e.g -t 600.\n",
g_waitSeconds);
} else if ((g_cmData[0] == '\0') && g_node_start_status != CM_STATUS_NORMAL &&
g_node_start_status != CM_STATUS_NORMAL_WITH_CN_DELETED) {
write_runlog(ERROR,
"start node failed in (%d)s!\n\n"
"HINT: Maybe the node is continually being started in the background.\n"
"You can wait for a while and check whether the node starts, or increase the value of parameter \"-t\", "
"e.g -t 600.\n",
g_waitSeconds);
} else if (g_instance_start_status != CM_STATUS_NORMAL) {
write_runlog(ERROR,
"start instance failed in (%d)s!\n\n"
"HINT: Maybe the instance is continually being started in the background.\n"
"You can wait for a while and check whether the instance starts, or increase the value of parameter "
"\"-t\", e.g -t 600.\n",
g_waitSeconds);
}
g_startExitCode = CM_START_EXIT_FAILED;
(void)sleep(6);
exit(-1);
}
static void StartFailQueryAndReport()
{
char queryCommand[MAXPGPATH] = {0};
char log_file_name[MAXPGPATH] = {0};
char* name_ptr = NULL;
bool is_exist = false;
DIR *dir = opendir(sys_log_path);
if (dir == NULL) {
write_runlog(DEBUG1, "start_query_and_report: log dir open failed!\n");
return;
}
struct dirent *de;
while ((de = readdir(dir)) != NULL) {
if (strstr(de->d_name, prefix_name) != NULL) {
name_ptr = strstr(de->d_name, "-current.log");
if (name_ptr != NULL) {
name_ptr += strlen("-current.log");
if ((*name_ptr) == '\0') {
is_exist = true;
break;
}
}
}
}
if (is_exist) {
int ret = snprintf_s(log_file_name, MAXPGPATH, MAXPGPATH - 1, "%s/%s", sys_log_path, de->d_name);
securec_check_intval(ret, (void)ret);
int rc = snprintf_s(queryCommand, MAXPGPATH, MAXPGPATH - 1,
"cm_ctl query -Cvdip >> %s 2>&1 &", log_file_name);
securec_check_intval(rc, (void)rc);
ret = system(queryCommand);
if (ret != 0) {
write_runlog(DEBUG1, "check_cluster_start_status: command cm_ctl query -Cv failed, errno=%d.\n", errno);
(void)closedir(dir);
return;
}
} else {
write_runlog(DEBUG1, "check_cluster_start_status: cm_ctl log file not exist.\n");
}
(void)closedir(dir);
return;
}
static void* start_check(void* arg)
{
uint32 ii;
for (;;) {
if (g_startExitCode != CM_START_EXIT_INIT) {
write_runlog(DEBUG1, "start_check thread exit:%d\n", (int)g_startExitCode);
break;
}
if (g_command_operation_azName == NULL && g_commandOperationNodeId == 0) {
g_cluster_start_status = start_check_cluster();
} else if (g_command_operation_azName != NULL) {
g_az_start_status = start_check_az(g_command_operation_azName);
startaz_try_heartbeat--;
} else if (g_commandOperationNodeId > 0 && g_commandOperationInstanceId > 0) {
g_resStartStatus = StartResInstCheck(g_commandOperationInstanceId);
} else if (g_cmData[0] == '\0' || (g_commandOperationNodeId > 0 && g_commandOperationInstanceId == 0)) {
for (ii = 0; ii < g_node_num; ii++) {
if (g_node[ii].node == g_commandOperationNodeId) {
break;
}
}
if (ii >= g_node_num) {
write_runlog(ERROR, "can't find the nodeid: %u\n", g_commandOperationNodeId);
g_startExitCode = CM_START_EXIT_FAILED;
break;
}
g_node_start_status = start_check_node(ii);
} else if (g_commandRelationship) {
g_dn_relation_start_status = start_check_dn_relation(g_commandOperationNodeId, g_cmData);
} else {
for (ii = 0; ii < g_node_num; ii++) {
if (g_node[ii].node == g_commandOperationNodeId) {
break;
}
}
if (ii >= g_node_num) {
write_runlog(ERROR, "can't find the nodeid: %u\n", g_commandOperationNodeId);
g_startExitCode = CM_START_EXIT_FAILED;
break;
}
g_instance_start_status = start_check_instance(ii, g_cmData);
}
(void)sleep(1);
}
return NULL;
}
static void NotifyCMSClusterStarting()
{
int ret;
ctl_to_cm_notify ctlToCmNotifyContent;
ctlToCmNotifyContent.msg_type = MSG_CTL_CM_NOTIFY;
ctlToCmNotifyContent.detail = CLUSTER_STARTING;
ret = cm_client_send_msg(CmServer_conn, 'C', (char*)&ctlToCmNotifyContent, sizeof(ctlToCmNotifyContent));
if (ret != 0) {
write_runlog(WARNING, "Notify Cluster Starting failed.\n");
}
return;
}
static int start_check_cluster()
{
ctl_to_cm_query cm_ctl_cm_query_content;
char* receive_msg = NULL;
cm_msg_type* cm_msg_type_ptr = NULL;
cm_to_ctl_cluster_status* cm_to_ctl_cluster_status_ptr = NULL;
int cnt_deleted = 0;
int cnt_abnormal = 0;
int ret;
int cluster_status = CM_STATUS_UNKNOWN;
cmTime_t tv;
(void)clock_gettime(CLOCK_MONOTONIC, &tv);
if (CmServer_conn != NULL) {
CMPQfinish(CmServer_conn);
CmServer_conn = NULL;
}
do_conn_cmserver(false, 0);
if (CmServer_conn == NULL) {
write_runlog(DEBUG1, "CmServer_conn is null.\n");
return -1;
} else {
if (CMPQstatus(CmServer_conn) != CONNECTION_OK) {
CMPQfinish(CmServer_conn);
CmServer_conn = NULL;
return -1;
}
}
NotifyCMSClusterStarting();
cm_ctl_cm_query_content.msg_type = (int)MSG_CTL_CM_QUERY;
cm_ctl_cm_query_content.node = INVALID_NODE_NUM;
cm_ctl_cm_query_content.instanceId = INVALID_INSTACNE_NUM;
cm_ctl_cm_query_content.wait_seconds = DEFAULT_WAIT;
cm_ctl_cm_query_content.detail = CLUSTER_DETAIL_STATUS_QUERY;
cm_ctl_cm_query_content.relation = 0;
ret = cm_client_send_msg(CmServer_conn, 'C', (char*)&cm_ctl_cm_query_content, sizeof(cm_ctl_cm_query_content));
if (ret != 0) {
CMPQfinish(CmServer_conn);
CmServer_conn = NULL;
return -1;
}
bool rec_data_end = false;
for (;;) {
ret = cm_client_flush_msg(CmServer_conn);
if (ret == TCP_SOCKET_ERROR_EPIPE) {
CMPQfinish(CmServer_conn);
CmServer_conn = NULL;
return -1;
}
CM_BREAK_IF_TRUE(IsTimeOut(&tv, "[start_check_cluster]"));
receive_msg = recv_cm_server_cmd(CmServer_conn);
while (receive_msg != NULL) {
cm_msg_type_ptr = (cm_msg_type*)receive_msg;
switch (cm_msg_type_ptr->msg_type) {
case MSG_CM_CTL_DATA_BEGIN:
cm_to_ctl_cluster_status_ptr = (cm_to_ctl_cluster_status*)receive_msg;
cluster_status = cm_to_ctl_cluster_status_ptr->cluster_status;
if (cluster_status == CM_STATUS_NORMAL) {
rec_data_end = true;
}
break;
case MSG_CM_CTL_DATA:
if (cluster_status == CM_STATUS_DEGRADE) {
cm_to_ctl_instance_status cm_to_ctl_instance_status_ptr = {0};
cm_to_ctl_instance_status_ipv4 *cm_to_ctl_instance_status_ptr_ipv4 = NULL;
if (undocumentedVersion != 0 && undocumentedVersion < SUPPORT_IPV6_VERSION) {
cm_to_ctl_instance_status_ptr_ipv4 = (cm_to_ctl_instance_status_ipv4 *)receive_msg;
CmToCtlInstanceStatusV1ToV2(
cm_to_ctl_instance_status_ptr_ipv4,
&cm_to_ctl_instance_status_ptr);
} else {
errno_t rc = memcpy_s(
&cm_to_ctl_instance_status_ptr,
sizeof(cm_to_ctl_instance_status_ptr),
receive_msg,
sizeof(cm_to_ctl_instance_status_ptr));
securec_check_errno(rc, (void)rc);
}
if (cm_to_ctl_instance_status_ptr.instance_type == INSTANCE_TYPE_COORDINATE) {
int status = cm_to_ctl_instance_status_ptr.coordinatemember.status;
if (status == INSTANCE_ROLE_DELETED) {
cnt_deleted++;
}
if (status != INSTANCE_ROLE_NORMAL && status != INSTANCE_ROLE_DELETED) {
cnt_abnormal++;
}
} else if (cm_to_ctl_instance_status_ptr.instance_type == INSTANCE_TYPE_GTM) {
int local_role = cm_to_ctl_instance_status_ptr.gtm_member.local_status.local_role;
if (local_role != INSTANCE_ROLE_PRIMARY && local_role != INSTANCE_ROLE_STANDBY) {
cnt_abnormal++;
}
} else if (cm_to_ctl_instance_status_ptr.instance_type == INSTANCE_TYPE_DATANODE) {
g_dn_status = cm_to_ctl_instance_status_ptr.data_node_member.local_status.db_state;
int local_role = cm_to_ctl_instance_status_ptr.data_node_member.local_status.local_role;
if (local_role != INSTANCE_ROLE_PRIMARY && local_role != INSTANCE_ROLE_STANDBY &&
local_role != INSTANCE_ROLE_DUMMY_STANDBY) {
cnt_abnormal++;
}
}
}
break;
case MSG_CM_CTL_NODE_END:
break;
case MSG_CM_CTL_DATA_END:
rec_data_end = true;
break;
default:
write_runlog(ERROR, "unknown the msg type is %d.\n", cm_msg_type_ptr->msg_type);
break;
}
receive_msg = recv_cm_server_cmd(CmServer_conn);
}
if (rec_data_end) {
break;
}
}
CMPQfinish(CmServer_conn);
CmServer_conn = NULL;
if (cluster_status == CM_STATUS_DEGRADE && cnt_abnormal == 0 && cnt_deleted >= 1) {
cluster_status = CM_STATUS_NORMAL_WITH_CN_DELETED;
}
return cluster_status;
}
static bool IsAllResInstStarted(uint32 nodeId)
{
if (g_enableWalRecord && CusResCount() > ONLY_GR_RES_CLUSTER) {
return IsNodeStatOnline(nodeId);
}
for (uint32 i = 0; i < CusResCount(); ++i) {
for (uint32 j = 0; j < g_resStatus[i].status.instanceCount; ++j) {
if (g_resStatus[i].status.resStat[j].nodeId != nodeId) {
continue;
}
if (GetResInstStatus(g_resStatus[i].status.resStat[j].cmInstanceId) != CM_RES_STAT_ONLINE) {
return false;
}
}
}
return true;
}
static int start_check_node(uint32 node_id_check)
{
uint32 ii;
int cnt = 0;
int cnt_base = 0;
int cnt_deleted = 0;
int result = -1;
int ret;
errno_t tnRet = 0;
CM_Conn *pCmsCon = NULL;
char command[MAXPGPATH] = {0};
char cmBinPath[MAXPGPATH] = {0};
if (checkStaticConfigExist(node_id_check) != 0) {
write_runlog(
ERROR, "the cluster static config file does not exist on the node: %u.\n", g_node[node_id_check].node);
write_runlog(FATAL, "failed to check the node instances start status.\n");
exit(-1);
}
if (g_node[node_id_check].coordinate == 1) {
cnt_base++;
CheckCnNodeStatusById(node_id_check, &result);
if (result == PROCESS_RUNNING) {
cnt++;
} else {
ctl_to_cm_query cm_ctl_cm_query_content;
char* receive_msg = NULL;
cm_msg_type* cm_msg_type_ptr = NULL;
bool rec_data_end = false;
if (CmServer_conn != NULL) {
CMPQfinish(CmServer_conn);
CmServer_conn = NULL;
}
do_conn_cmserver(false, 0);
if (CmServer_conn == NULL) {
return CM_STATUS_UNKNOWN;
} else {
if (CMPQstatus(CmServer_conn) != CONNECTION_OK) {
CMPQfinish(CmServer_conn);
CmServer_conn = NULL;
return CM_STATUS_UNKNOWN;
}
}
cm_ctl_cm_query_content.msg_type = (int)MSG_CTL_CM_QUERY;
cm_ctl_cm_query_content.node = g_node[node_id_check].node;
cm_ctl_cm_query_content.instanceId = g_node[node_id_check].coordinateId;
cm_ctl_cm_query_content.instance_type = INSTANCE_TYPE_COORDINATE;
cm_ctl_cm_query_content.wait_seconds = DEFAULT_WAIT;
cm_ctl_cm_query_content.detail = CLUSTER_DETAIL_STATUS_QUERY;
cm_ctl_cm_query_content.relation = 0;
ret = cm_client_send_msg(
CmServer_conn, 'C', (char*)&cm_ctl_cm_query_content, sizeof(cm_ctl_cm_query_content));
if (ret != 0) {
CMPQfinish(CmServer_conn);
CmServer_conn = NULL;
return CM_STATUS_UNKNOWN;
}
struct timespec timeBegin = {0, 0};
(void)clock_gettime(CLOCK_MONOTONIC, &timeBegin);
for (;;) {
ret = cm_client_flush_msg(CmServer_conn);
if (ret == TCP_SOCKET_ERROR_EPIPE) {
CMPQfinish(CmServer_conn);
CmServer_conn = NULL;
return CM_STATUS_UNKNOWN;
}
if (IsTimeOut(&timeBegin, "[start_check_node]")) {
CMPQfinish(CmServer_conn);
CmServer_conn = NULL;
return CM_STATUS_UNKNOWN;
}
receive_msg = recv_cm_server_cmd(CmServer_conn);
while (receive_msg != NULL) {
cm_msg_type_ptr = (cm_msg_type *)receive_msg;
switch (cm_msg_type_ptr->msg_type) {
case MSG_CM_CTL_DATA_BEGIN:
break;
case MSG_CM_CTL_DATA: {
cm_to_ctl_instance_status cm_to_ctl_instance_status_ptr = {0};
cm_to_ctl_instance_status_ipv4 *cm_to_ctl_instance_status_ptr_ipv4 = NULL;
if (undocumentedVersion != 0 && undocumentedVersion < SUPPORT_IPV6_VERSION) {
cm_to_ctl_instance_status_ptr_ipv4 = (cm_to_ctl_instance_status_ipv4 *)receive_msg;
CmToCtlInstanceStatusV1ToV2(
cm_to_ctl_instance_status_ptr_ipv4,
&cm_to_ctl_instance_status_ptr);
} else {
errno_t rc = memcpy_s(
&cm_to_ctl_instance_status_ptr,
sizeof(cm_to_ctl_instance_status_ptr),
receive_msg,
sizeof(cm_to_ctl_instance_status_ptr));
securec_check_errno(rc, (void)rc);
}
if (cm_to_ctl_instance_status_ptr.instance_type == INSTANCE_TYPE_COORDINATE &&
g_node[node_id_check].node == cm_to_ctl_instance_status_ptr.node &&
cm_to_ctl_instance_status_ptr.coordinatemember.status == INSTANCE_ROLE_DELETED) {
cnt_deleted++;
}
rec_data_end = true;
break;
}
case MSG_CM_CTL_NODE_END:
break;
case MSG_CM_CTL_DATA_END:
break;
case MSG_CM_BUILD_DOING:
case MSG_CM_BUILD_DOWN:
rec_data_end = true;
break;
default:
write_runlog(ERROR, "unknown the msg type is %d.\n", cm_msg_type_ptr->msg_type);
}
receive_msg = recv_cm_server_cmd(CmServer_conn);
}
if (rec_data_end) {
break;
}
}
CMPQfinish(CmServer_conn);
CmServer_conn = NULL;
}
}
if (IsCmSharedStorageMode()) {
if (WaitCmsPrimaryNormal(&pCmsCon) != CM_SUCCESS) {
return CM_STATUS_UNKNOWN;
}
}
for (ii = 0; ii < g_node[node_id_check].datanodeCount; ii++) {
if (g_enableWalRecord && IsCmSharedStorageMode() && SetOfflineNode(node_id_check, pCmsCon)) {
continue;
}
cnt_base++;
CheckDnNodeStatusById(node_id_check, &result, ii);
if (result == PROCESS_RUNNING) {
cnt++;
}
}
ReleaseConn(pCmsCon);
char resultPath[MAXPGPATH] = {0};
char checkCmserverProcessResultPath[MAX_PATH_LEN] = {0};
int fd;
bool flag = false;
ret = GetHomePath(resultPath, sizeof(resultPath));
if (ret != EOK) {
return CM_STATUS_UNKNOWN;
}
ret = snprintf_s(checkCmserverProcessResultPath, MAX_PATH_LEN, MAX_PATH_LEN - 1,
"%s/bin/checkCmserverProcessResult-XXXXXX", resultPath);
securec_check_intval(ret, (void)ret);
fd = mkstemp(checkCmserverProcessResultPath);
if (fd <= 0) {
write_runlog(ERROR, "failed to create the cmserver process check result file: errno=%d.\n", errno);
flag = true;
}
if (g_node[node_id_check].cmServerLevel == 1) {
cnt_base++;
tnRet = memset_s(command, MAXPGPATH, 0, MAXPGPATH);
securec_check_errno(tnRet, (void)tnRet);
tnRet = memset_s(cmBinPath, MAXPGPATH, 0, MAXPGPATH);
securec_check_errno(tnRet, (void)tnRet);
ret = snprintf_s(cmBinPath, MAXPGPATH, MAXPGPATH - 1, "%s/bin/%s", g_appPath, CM_SERVER_BIN_NAME);
securec_check_intval(ret, (void)ret);
if (node_id_check == g_nodeId && strstr(g_appPath, "/var/chroot") == NULL) {
ret = snprintf_s(command, MAXPGPATH, MAXPGPATH - 1, "cm_ctl check -B %s -T %s \n echo -e $? > %s",
CM_SERVER_BIN_NAME, cmBinPath, flag ? result_path : checkCmserverProcessResultPath);
securec_check_intval(ret, (void)ret);
exec_system(command, &result, flag ? result_path : checkCmserverProcessResultPath);
} else {
ret = snprintf_s(command, MAXPGPATH, MAXPGPATH - 1,
"cm_ctl check -B %s -T %s\" > /dev/null 2>&1; echo -e $? > %s",
CM_SERVER_BIN_NAME, cmBinPath, flag ? result_path : checkCmserverProcessResultPath);
securec_check_intval(ret, (void)ret);
exec_system_ssh(node_id_check, command, &result,
flag ? result_path : checkCmserverProcessResultPath, mpp_env_separate_file);
}
if (result == PROCESS_RUNNING) {
cnt++;
}
}
if (fd > 0) {
(void)close(fd);
}
(void)unlink(flag ? result_path : checkCmserverProcessResultPath);
if (g_node[node_id_check].gtm == 1) {
cnt_base++;
CheckGtmNodeStatusById(node_id_check, &result);
if (result == PROCESS_RUNNING) {
cnt++;
}
}
if (!g_enableWalRecord) {
++cnt_base;
}
if (IsAllResInstStarted(g_node[node_id_check].node)) {
++cnt;
}
if (cnt < cnt_base) {
if (cnt_base == cnt + cnt_deleted && cnt_deleted >= 1) {
return CM_STATUS_NORMAL_WITH_CN_DELETED;
} else {
return CM_STATUS_UNKNOWN;
}
}
return CM_STATUS_NORMAL;
}
static int start_check_dn_relation(uint32 node, const char *dataPath)
{
int ret;
uint32 instanceId;
int dbState;
cm_to_ctl_get_datanode_relation_ack getInstanceMsg = {0};
ret = GetDatanodeRelationInfo(node, dataPath, &getInstanceMsg);
if (ret == -1) {
write_runlog(ERROR, "can not get datanode information.\n");
exit(1);
}
for (int i = 0; i < CM_PRIMARY_STANDBY_MAX_NUM; i++) {
instanceId = getInstanceMsg.instanceMember[i].instanceId;
if (instanceId != 0) {
dbState = getInstanceMsg.data_node_member[i].local_status.db_state;
if (dbState != INSTANCE_HA_STATE_NORMAL) {
return CM_STATUS_UNKNOWN;
}
}
}
return CM_STATUS_NORMAL;
}
* @param allAzLists str of all az names like "AZ1,AZ2,AZ3". NULL means it's in minority
*
* @param repNum rep number in pointe az, 1 means we should change 'most_available_sync'
*
* @param nodeIndex which node.
*
*/
static void ExecuteGsGuc(const char *allAzLists, bool isSingleRep, uint32 repNum, uint32 nodeIndex)
{
char command[MAXPGPATH];
int ret;
const char* switchFlag = (allAzLists != NULL) ? "off" : "on";
const char* az = (allAzLists != NULL) ? allAzLists : g_node[nodeIndex].azName;
const uint32 half = 2;
for (uint32 kk = 0; kk < g_node[nodeIndex].datanodeCount; kk++) {
if (g_currentNode->node == g_node[nodeIndex].node) {
if (isSingleRep) {
ret = snprintf_s(command,
MAXPGPATH, MAXPGPATH - 1,
"gs_guc reload -Z datanode -D %s -c \"most_available_sync = '%s'\" > %s 2>&1 &",
g_node[nodeIndex].datanode[kk].datanodeLocalDataPath, switchFlag, DEVNULL);
} else {
ret = snprintf_s(command,
MAXPGPATH, MAXPGPATH - 1,
"gs_guc reload -Z datanode -D %s -c \"synchronous_standby_names = 'ANY %u(%s)'\" > %s 2>&1 &",
g_node[nodeIndex].datanode[kk].datanodeLocalDataPath,
repNum / half, az, DEVNULL);
}
} else {
if (isSingleRep) {
ret = snprintf_s(command,
MAXPGPATH, MAXPGPATH - 1,
"gs_guc reload -Z datanode -D %s -c \\\"most_available_sync = '%s'\\\" > %s 2>&1 &",
g_node[nodeIndex].datanode[kk].datanodeLocalDataPath, switchFlag, DEVNULL);
} else {
ret = snprintf_s(command,
MAXPGPATH, MAXPGPATH - 1,
"gs_guc reload -Z datanode -D %s -c \\\"synchronous_standby_names = 'ANY %u(%s)'\\\" > %s 2>&1 &",
g_node[nodeIndex].datanode[kk].datanodeLocalDataPath,
repNum / half, az, DEVNULL);
}
}
securec_check_intval(ret, (void)ret);
write_runlog(DEBUG1, "The node %s begins to execute the command: %s.\n", g_node[nodeIndex].nodeName, command);
if (g_currentNode->node == g_node[nodeIndex].node) {
ret = system(command);
} else {
ret = ssh_exec(&g_node[nodeIndex], command);
}
if (ret == 0) {
write_runlog(DEBUG1, "Successful exexution of the above command.\n");
} else {
write_runlog(DEBUG1, "Failed exexution of the above command, ignore it, errno=%d.\n", errno);
}
}
}
void RunCmdInStartAz(const char* command, uint32 nodeIndex)
{
int ret;
write_runlog(DEBUG1, "The node %s begins to execute the command: %s.\n", g_node[nodeIndex].nodeName, command);
if (g_currentNode->node == g_node[nodeIndex].node) {
ret = system(command);
} else {
ret = ssh_exec(&g_node[nodeIndex], command);
}
if (ret == 0) {
write_runlog(DEBUG1, "Successful exexution of the above command.\n");
} else {
write_runlog(DEBUG1, "Failed exexution of the above command, ignore it, errno=%d.\n", errno);
}
}
static uint32 GetEtcdNumOfAz(const char* azName)
{
uint32 etcdNumOfAz = 0;
for (uint32 ii = 0; ii < g_node_num; ii++) {
if (strcmp(g_node[ii].azName, azName) == 0) {
if (g_node[ii].etcd) {
etcdNumOfAz++;
}
}
}
write_runlog(DEBUG1, "The etcd num = %u of %s.\n", etcdNumOfAz, azName);
return etcdNumOfAz;
}
static uint32 GetAliveEtcdNumOfAz(const char* azName)
{
uint32 aliveEtcdNumOfAz = 0;
for (uint32 ii = 0; ii < g_node_num; ii++) {
if (strcmp(g_node[ii].azName, azName) == 0) {
if (g_node[ii].etcd) {
if (start_check_instance(ii, "etcd") == CM_STATUS_NORMAL) {
aliveEtcdNumOfAz++;
}
}
}
}
write_runlog(DEBUG1, "The alive etcd num = %u of %s.\n", aliveEtcdNumOfAz, azName);
return aliveEtcdNumOfAz;
}
static void start_az_try_more_one(const char* azName)
{
for (uint32 ii = 0; ii < g_node_num; ii++) {
if (strcmp(g_node[ii].azName, azName) == 0) {
start_node(g_node[ii].node);
}
}
}
static int start_check_az(const char* azName)
{
int ret = CM_STATUS_UNKNOWN;
for (uint32 ii = 0; ii < g_node_num; ii++) {
if (strcmp(g_node[ii].azName, azName) == 0) {
int node_status;
node_status = start_check_node(ii);
if (node_status == CM_STATUS_UNKNOWN) {
ret = CM_STATUS_UNKNOWN;
break;
} else if (node_status == CM_STATUS_NORMAL_WITH_CN_DELETED) {
ret = CM_STATUS_NORMAL_WITH_CN_DELETED;
} else if (node_status == CM_STATUS_NORMAL && ret != CM_STATUS_NORMAL_WITH_CN_DELETED) {
ret = CM_STATUS_NORMAL;
}
}
}
if (startaz_try_heartbeat <= 0) {
start_az_try_more_one(g_command_operation_azName);
startaz_try_heartbeat = START_AZ_TRY_HEARTBEAT;
}
return ret;
}
void getPauseStatus()
{
struct stat statBuf = { 0 };
if (stat(manual_pause_file, &statBuf) == 0) {
g_isPauseArbitration = true;
} else {
g_isPauseArbitration = false;
}
}
void getWalrecordMode()
{
struct stat statBuf = { 0 };
if (stat(manual_walrecord_file, &statBuf) == 0) {
g_enableWalRecord = true;
} else {
g_enableWalRecord = false;
}
}