* Copyright (c) 2021 Huawei Technologies Co.,Ltd.
*
* CM is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* cma_main.cpp
* cma main file
*
* IDENTIFICATION
* src/cm_agent/cma_main.cpp
*
* -------------------------------------------------------------------------
*/
#include <sys/wait.h>
#include <sys/procfs.h>
#include <sys/file.h>
#ifdef __aarch64__
#include <sys/sysinfo.h>
#endif
#include "cm_cipher.h"
#include "alarm/alarm_log.h"
#include "cm/pqsignal.h"
#include "cm_json_config.h"
#include "cm_ip.h"
#include "cma_global_params.h"
#include "cma_common.h"
#include "cma_threads.h"
#include "cma_client.h"
#include "cma_datanode_scaling.h"
#include "cma_disk_check.h"
#include "cma_log_management.h"
#include "cma_instance_management.h"
#include "cma_instance_management_res.h"
#include "config.h"
#include "cma_process_messages.h"
#include "cm_util.h"
#include "cma_connect.h"
#include "cma_status_check.h"
#include "cma_mes.h"
#ifdef ENABLE_MULTIPLE_NODES
#include "cma_gtm.h"
#include "cma_coordinator.h"
#include "cma_cn_gtm_work_threads_mgr.h"
#include "cma_instance_check.h"
#endif
#ifndef ENABLE_MULTIPLE_NODES
const char *g_libnetManualStart = "libnet_manual_start";
#endif
#define ACTIVITY_TIMEOUT 120
#define STACK_CAPTURE_TIMEOUT 120
cm_instance_central_node_msg g_ccnNotify;
char g_agentDataDir[CM_PATH_LENGTH] = {0};
static volatile sig_atomic_t g_gotParameterReload = 0;
int g_tcpKeepalivesIdle = 30;
int g_tcpKeepalivesInterval = 30;
int g_tcpKeepalivesCount = 3;
static const int diskUsageDefaultThreshold = 90;
bool g_poolerPingEnd = false;
bool *g_coordinatorsDrop;
datanode_failover *g_datanodesFailover = NULL;
gtm_failover *g_gtmsFailover = NULL;
uint32 *g_droppedCoordinatorId = NULL;
coordinator_status *g_cnStatus = NULL;
uint32 g_cancelCoordinatorId = 0;
bool g_coordinatorsCancel;
pthread_rwlock_t g_datanodesFailoverLock;
pthread_rwlock_t g_gtmsFailoverLock;
pthread_rwlock_t g_cnDropLock;
pthread_rwlock_t g_coordinatorsCancelLock;
pthread_t g_cmsConnThread = 0;
ThreadActivity *threadActivities;
int activities_index;
pthread_rwlock_t activitiesMutex;
time_t lastStackCaptureTime = 0;
bool g_poolerPingEndRequest = false;
int g_gtmConnFailTimes = 0;
int g_cnConnFailTimes = 0;
int g_dnConnFailTimes[CM_MAX_DATANODE_PER_NODE] = {0};
char *g_eventTriggers[EVENT_COUNT] = {NULL};
static const uint32 MAX_MSG_BUF_POOL_SIZE = 102400;
static const uint32 MAX_MSG_BUF_POOL_COUNT = 200;
static const int32 INVALID_ID = -1;
void create_system_call_log(void);
int check_one_instance_status(const char *processName, const char *cmdLine, int *isPhonyDead);
int get_agent_global_params_from_configfile();
void stop_flag(void)
{
g_exitFlag = true;
cm_sleep(6);
}
void report_conn_fail_alarm(AlarmType alarmType, InstanceTypes instance_type, uint32 instanceId)
{
int rc = 0;
uint32 alarmIndex = 0;
char instanceName[CM_NODE_NAME] = {0};
if (instance_type == INSTANCE_CN) {
if (alarmType == ALM_AT_Fault) {
g_cnConnFailTimes++;
if (g_cnConnFailTimes < CONN_FAIL_TIMES) {
return;
}
} else {
g_cnConnFailTimes = 0;
}
rc = check_one_instance_status(type_int_to_str_binname(instance_type), g_currentNode->DataPath, NULL);
if (rc != PROCESS_RUNNING) {
return;
}
alarmIndex = g_currentNode->datanodeCount;
rc = snprintf_s(instanceName, sizeof(instanceName), sizeof(instanceName) - 1, "cn_%u", instanceId);
} else if (instance_type == INSTANCE_DN) {
for (uint32 ii = 0; ii < g_currentNode->datanodeCount; ii++) {
if (g_currentNode->datanode[ii].datanodeId == instanceId) {
if (alarmType == ALM_AT_Fault) {
g_dnConnFailTimes[ii]++;
if (g_dnConnFailTimes[ii] < CONN_FAIL_TIMES) {
return;
}
} else {
g_dnConnFailTimes[ii] = 0;
}
rc = check_one_instance_status(
type_int_to_str_binname(instance_type), g_currentNode->datanode[ii].datanodeLocalDataPath, NULL);
alarmIndex = ii;
break;
}
}
if (rc != PROCESS_RUNNING) {
return;
}
rc = snprintf_s(instanceName, sizeof(instanceName), sizeof(instanceName) - 1, "dn_%u", instanceId);
} else if (instance_type == INSTANCE_GTM) {
if (alarmType == ALM_AT_Fault) {
g_gtmConnFailTimes++;
if (g_gtmConnFailTimes < CONN_FAIL_TIMES) {
return;
}
} else {
g_gtmConnFailTimes = 0;
}
rc = check_one_instance_status(type_int_to_str_binname(instance_type), g_currentNode->gtmLocalDataPath, NULL);
if (rc != PROCESS_RUNNING) {
return;
}
rc = snprintf_s(instanceName, sizeof(instanceName), sizeof(instanceName) - 1, "gtm_%u", instanceId);
alarmIndex = g_currentNode->datanodeCount + g_currentNode->coordinate;
} else {
}
securec_check_intval(rc, (void)rc);
AlarmAdditionalParam tempAdditionalParam;
WriteAlarmAdditionalInfo(&tempAdditionalParam, instanceName, "", "", "", &(g_abnormalCmaConnAlarmList[alarmIndex]),
alarmType, instanceName);
AlarmReporter(&(g_abnormalCmaConnAlarmList[alarmIndex]), alarmType, &tempAdditionalParam);
}
uint32 GetThreadDeadEffectiveTime(size_t threadIdx)
{
const int specialEffectiveTime = 10;
if (g_threadName[threadIdx] != NULL && strcmp(g_threadName[threadIdx], "SendCmsMsg") == 0) {
return specialEffectiveTime;
}
return g_threadDeadEffectiveTime;
}
void check_thread_state()
{
size_t length = sizeof(g_threadId) / sizeof(g_threadId[0]);
struct timespec now = {0};
(void)clock_gettime(CLOCK_MONOTONIC, &now);
for (size_t i = 0; i < length; i++) {
if (g_threadId[i] == 0) {
continue;
}
uint32 threadDeadEffectiveTime = GetThreadDeadEffectiveTime(i);
if ((now.tv_sec - g_thread_state[i] < 0) || (now.tv_sec - g_thread_state[i] > 4 * threadDeadEffectiveTime)) {
g_thread_state[i] = now.tv_sec;
continue;
}
if ((now.tv_sec - g_thread_state[i] > threadDeadEffectiveTime) && g_thread_state[i] != 0) {
write_runlog(FATAL, "the thread(%lu) is not execing for a long time(%ld).\n",
g_threadId[i], now.tv_sec - g_thread_state[i]);
exit(-1);
}
}
}
void reload_cmagent_parameters(int arg)
{
g_gotParameterReload = 1;
}
void RecvSigusrSingle(int arg)
{
return;
}
#ifdef ENABLE_MULTIPLE_NODES
void SetFlagToUpdatePortForCnDn(int arg)
{
cm_agent_need_check_libcomm_port = true;
}
#endif
void GetCmdlineOpt(int argc, char *argv[])
{
long logChoice = 0;
const int base = 10;
if (argc > 1) {
logChoice = strtol(argv[1], NULL, base);
switch (logChoice) {
case LOG_DESTION_STDERR:
log_destion_choice = LOG_DESTION_FILE;
break;
case LOG_DESTION_SYSLOG:
log_destion_choice = LOG_DESTION_SYSLOG;
break;
case LOG_DESTION_FILE:
log_destion_choice = LOG_DESTION_FILE;
break;
case LOG_DESTION_DEV_NULL:
log_destion_choice = LOG_DESTION_DEV_NULL;
break;
default:
log_destion_choice = LOG_DESTION_FILE;
break;
}
}
}
void create_system_call_log(void)
{
DIR *dir;
struct dirent *de;
bool is_exist = false;
char *name_ptr = NULL;
errno_t rc;
int rcs;
if ((dir = opendir(sys_log_path)) == NULL) {
write_runlog(ERROR, "%s: opendir %s failed! \n", prefix_name, sys_log_path);
rcs = snprintf_s(system_call_log, MAXPGPATH, MAXPGPATH - 1, "%s", "/dev/null");
securec_check_intval(rcs, (void)rcs);
return;
}
while ((de = readdir(dir)) != NULL) {
if (strstr(de->d_name, SYSTEM_CALL_LOG) == NULL) {
continue;
}
name_ptr = strstr(de->d_name, "-current.log");
if (name_ptr == NULL) {
continue;
}
name_ptr += strlen("-current.log");
if ((*name_ptr) == '\0') {
is_exist = true;
break;
}
}
rc = memset_s(g_systemCallLogName, MAXPGPATH, 0, MAXPGPATH);
securec_check_errno(rc, (void)rc);
rc = memset_s(system_call_log, MAXPGPATH, 0, MAXPGPATH);
securec_check_errno(rc, (void)rc);
rcs = snprintf_s(g_systemCallLogName, MAXPGPATH, MAXPGPATH - 1, "%s%s", SYSTEM_CALL_LOG, curLogFileMark);
securec_check_intval(rcs, (void)rcs);
rcs = snprintf_s(system_call_log, MAXPGPATH, MAXPGPATH - 1, "%s/%s", sys_log_path, g_systemCallLogName);
securec_check_intval(rcs, (void)rcs);
if (is_exist && strstr(de->d_name, "system_call-current") == NULL) {
char oldSystemCallLog[MAXPGPATH] = {0};
rcs = snprintf_s(oldSystemCallLog, MAXPGPATH, MAXPGPATH - 1, "%s/%s", sys_log_path, de->d_name);
securec_check_intval(rcs, (void)rcs);
rcs = rename(oldSystemCallLog, system_call_log);
if (rcs != 0) {
write_runlog(ERROR, "%s: rename log file %s failed! \n", prefix_name, oldSystemCallLog);
}
}
(void)closedir(dir);
(void)chmod(system_call_log, S_IRUSR | S_IWUSR);
}
status_t CreateSysLogFile(void)
{
if (syslogFile != NULL) {
(void)fclose(syslogFile);
syslogFile = NULL;
}
syslogFile = logfile_open(sys_log_path, "a");
if (syslogFile == NULL) {
(void)fprintf(stderr, "cma_main, open log file failed\n");
}
int fd = open("/dev/null", O_RDWR);
if (fd < 0) {
(void)fprintf(stderr, "FATAL cma_main, open /dev/null failed, cma will exit.\n");
return CM_ERROR;
}
(void)dup2(fd, STDOUT_FILENO);
(void)dup2(fd, STDERR_FILENO);
if (fd > STDERR_FILENO) {
(void)close(fd);
}
return CM_SUCCESS;
}
static void InitClientCrt(const char *appPath)
{
errno_t rcs =
snprintf_s(g_tlsPath.caFile, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/share/sslcert/etcd/etcdca.crt", appPath);
securec_check_intval(rcs, (void)rcs);
rcs = snprintf_s(
g_tlsPath.crtFile, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/share/sslcert/etcd/client.crt", appPath);
securec_check_intval(rcs, (void)rcs);
rcs = snprintf_s(
g_tlsPath.keyFile, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/share/sslcert/etcd/client.key", appPath);
securec_check_intval(rcs, (void)rcs);
}
int get_prog_path()
{
char exec_path[MAX_PATH_LEN] = {0};
errno_t rc;
int rcs;
rc = memset_s(g_cmAgentLogPath, MAX_PATH_LEN, 0, MAX_PATH_LEN);
securec_check_errno(rc, (void)rc);
rc = memset_s(g_cmStaticConfigurePath, MAX_PATH_LEN, 0, MAX_PATH_LEN);
securec_check_errno(rc, (void)rc);
rc = memset_s(g_cmManualStartPath, MAX_PATH_LEN, 0, MAX_PATH_LEN);
securec_check_errno(rc, (void)rc);
rc = memset_s(g_cmInstanceManualStartPath, MAX_PATH_LEN, 0, MAX_PATH_LEN);
securec_check_errno(rc, (void)rc);
rc = memset_s(g_cmEtcdManualStartPath, MAX_PATH_LEN, 0, MAX_PATH_LEN);
securec_check_errno(rc, (void)rc);
rc = memset_s(g_cmResumingCnStopPath, MAX_PATH_LEN, 0, MAX_PATH_LEN);
securec_check_errno(rc, (void)rc);
rc = memset_s(g_tlsPath.caFile, MAX_PATH_LEN, 0, MAX_PATH_LEN);
securec_check_errno(rc, (void)rc);
rc = memset_s(g_tlsPath.crtFile, MAX_PATH_LEN, 0, MAX_PATH_LEN);
securec_check_errno(rc, (void)rc);
rc = memset_s(g_tlsPath.keyFile, MAX_PATH_LEN, 0, MAX_PATH_LEN);
securec_check_errno(rc, (void)rc);
rc = memset_s(g_cmClusterResizePath, MAX_PATH_LEN, 0, MAX_PATH_LEN);
securec_check_errno(rc, (void)rc);
rc = memset_s(g_cmClusterReplacePath, MAX_PATH_LEN, 0, MAX_PATH_LEN);
securec_check_errno(rc, (void)rc);
rc = memset_s(instance_maintance_path, MAX_PATH_LEN, 0, MAX_PATH_LEN);
securec_check_errno(rc, (void)rc);
#ifndef ENABLE_MULTIPLE_NODES
rc = memset_s(g_cmLibnetManualStartPath, MAX_PATH_LEN, 0, MAX_PATH_LEN);
securec_check_errno(rc, (void)rc);
#endif
rc = memset_s(g_autoRepairPath, MAX_PATH_LEN, 0, MAX_PATH_LEN);
securec_check_errno(rc, (void)rc);
rc = memset_s(g_cmManualPausePath, MAX_PATH_LEN, 0, MAX_PATH_LEN);
securec_check_errno(rc, (void)rc);
rc = memset_s(g_cmManualStartingPath, MAX_PATH_LEN, 0, MAX_PATH_LEN);
securec_check_errno(rc, (void)rc);
rc = memset_s(g_cmManualWalRecordPath, MAX_PATH_LEN, 0, MAX_PATH_LEN);
securec_check_errno(rc, (void)rc);
if (GetHomePath(exec_path, sizeof(exec_path)) != 0) {
(void)fprintf(stderr, "Get GAUSSHOME failed, please check.\n");
return -1;
} else {
check_input_for_security(exec_path);
rcs = snprintf_s(
g_logicClusterListPath, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/bin/%s", exec_path, LOGIC_CLUSTER_LIST);
securec_check_intval(rcs, (void)rcs);
rcs = snprintf_s(result_path, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/bin/%s", exec_path, STOP_PRIMARY_RESULT);
securec_check_intval(rcs, (void)rcs);
canonicalize_path(result_path);
rcs = snprintf_s(
g_cmStaticConfigurePath, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/bin/%s", exec_path, CM_STATIC_CONFIG_FILE);
securec_check_intval(rcs, (void)rcs);
rcs = snprintf_s(
g_cmManualStartPath, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/bin/%s", exec_path, CM_CLUSTER_MANUAL_START);
securec_check_intval(rcs, (void)rcs);
rcs = snprintf_s(
g_cmResumingCnStopPath, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/bin/%s", exec_path, CM_RESUMING_CN_STOP);
securec_check_intval(rcs, (void)rcs);
rcs = snprintf_s(g_cmInstanceManualStartPath, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/bin/%s",
exec_path, CM_INSTANCE_MANUAL_START);
securec_check_intval(rcs, (void)rcs);
rcs = snprintf_s(
g_cmEtcdManualStartPath, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/bin/%s", exec_path, CM_ETCD_MANUAL_START);
securec_check_intval(rcs, (void)rcs);
rcs = snprintf_s(g_binPath, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/bin", exec_path);
securec_check_intval(rcs, (void)rcs);
rcs = snprintf_s(
g_cmClusterResizePath, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/bin/%s", exec_path, CM_CLUSTER_RESIZE);
securec_check_intval(rcs, (void)rcs);
rcs = snprintf_s(
g_cmClusterReplacePath, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/bin/%s", exec_path, CM_CLUSTER_REPLACE);
securec_check_intval(rcs, (void)rcs);
rc = snprintf_s(g_cmagentLockfile, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/bin/cm_agent.lock", exec_path);
securec_check_intval(rc, (void)rc);
canonicalize_path(g_cmagentLockfile);
rcs = snprintf_s(
instance_maintance_path, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/bin/%s", exec_path, INSTANCE_MAINTANCE);
securec_check_intval(rcs, (void)rcs);
canonicalize_path(instance_maintance_path);
#ifndef ENABLE_MULTIPLE_NODES
rcs = snprintf_s(
g_cmLibnetManualStartPath, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/bin/%s", exec_path, g_libnetManualStart);
securec_check_intval(rcs, (void)rcs);
#endif
rcs = snprintf_s(g_autoRepairPath, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/bin/stop_auto_repair", exec_path);
securec_check_intval(rcs, (void)rcs);
rcs = snprintf_s(
g_cmManualPausePath, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/bin/%s", exec_path, CM_CLUSTER_MANUAL_PAUSE);
securec_check_intval(rcs, (void)rcs);
rcs = snprintf_s(
g_cmManualStartingPath, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/bin/%s", exec_path, CM_CLUSTER_MANUAL_STARTING);
securec_check_intval(rcs, (void)rcs);
rcs = snprintf_s(
g_cmManualWalRecordPath, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/bin/%s", exec_path, CM_CLUSTER_MANUAL_WALRECORD);
securec_check_intval(rcs, (void)rcs);
InitClientCrt(exec_path);
}
return 0;
}
static void SetCmsIndexStr(char *cmServerIdxStr, uint32 strLen, uint32 cmServerIdx, uint32 nodeIdx)
{
char cmServerStr[MAX_PATH_LEN];
errno_t rc = snprintf_s(cmServerStr, MAX_PATH_LEN, MAX_PATH_LEN - 1,
"[%u node:%u, cmserverId:%u, cmServerIndex:%u], ",
cmServerIdx, g_node[nodeIdx].node, g_node[nodeIdx].cmServerId, nodeIdx);
securec_check_intval(rc, (void)rc);
rc = strcat_s(cmServerIdxStr, strLen, cmServerStr);
securec_check_errno(rc, (void)rc);
}
static void initialize_cm_server_node_index(void)
{
uint32 i = 0;
uint32 j = 0;
char cmServerIdxStr[MAX_PATH_LEN] = {0};
uint32 cm_instance_id[CM_PRIMARY_STANDBY_NUM] = {0};
uint32 cmServerNum = 0;
for (i = 0; i < g_node_num; i++) {
if (g_node[i].cmServerLevel == 1) {
cm_instance_id[j] = g_node[i].cmServerId;
j++;
cmServerNum++;
}
}
#undef qsort
qsort(cm_instance_id, cmServerNum, sizeof(uint32), node_index_Comparator);
j = 0;
for (uint32 k = 0; k < cmServerNum; k++) {
for (i = 0; i < g_node_num; i++) {
if (cm_instance_id[k] != g_node[i].cmServerId) {
continue;
}
g_nodeIndexForCmServer[j] = i;
SetCmsIndexStr(cmServerIdxStr, MAX_PATH_LEN, j, i);
j++;
break;
}
}
(void)fprintf(stderr, "[%s]: cmserverNum is %u, and cmserver info is %s.\n",
g_progname, cmServerNum, cmServerIdxStr);
}
int countCnAndDn()
{
uint32 j = 0;
uint32 cn = 0;
uint32 dnPairs = 0;
for (uint32 i = 0; i < g_node_num; i++) {
if (g_node[i].coordinate == 1) {
cn++;
}
if (g_multi_az_cluster) {
dnPairs = dnPairs + g_node[i].datanodeCount;
} else {
for (j = 0; j < g_node[i].datanodeCount; j++) {
if (g_node[i].datanode[j].datanodeRole == PRIMARY_DN) {
dnPairs++;
}
}
}
}
return (int)(cn + dnPairs);
}
int read_config_file_check(void)
{
int status;
int err_no = 0;
int rc;
if (!g_cmAgentFirstStart && (g_node != NULL)) {
return 0;
}
status = read_config_file(g_cmStaticConfigurePath, &err_no);
if (status == 0) {
if (g_nodeHeader.node == 0) {
(void)fprintf(stderr, "current node self is invalid node =%u\n", g_nodeHeader.node);
return -1;
}
g_cmAgentFirstStart = false;
rc = find_node_index_by_nodeid(g_nodeHeader.node, &g_nodeId);
if (rc != 0) {
(void)fprintf(stderr, "find_node_index_by_nodeid failed, nodeId=%u.\n", g_nodeHeader.node);
return -1;
}
rc = find_current_node_by_nodeid();
if (rc != 0) {
(void)fprintf(stderr, "find_current_node_by_nodeid failed, nodeId=%u.\n", g_nodeHeader.node);
return -1;
}
g_cmStaticConfigNeedVerifyToCn = true;
g_cnDnPairsCount = countCnAndDn();
initialize_cm_server_node_index();
int family = GetIpVersion(g_currentNode->sshChannel[0]);
if (family != AF_INET && family != AF_INET6) {
(void)fprintf(stderr, "ip(%s) is invalid, nodeId=%u.\n", g_currentNode->sshChannel[0], g_nodeHeader.node);
return -1;
}
rc = snprintf_s(g_cmAgentLogPath,
MAX_PATH_LEN,
MAX_PATH_LEN - 1,
"%s/%s/%s",
g_currentNode->cmDataPath,
CM_AGENT_DATA_DIR,
CM_AGENT_LOG_FILE);
securec_check_intval(rc, (void)rc);
g_datanodesFailover = (datanode_failover *)malloc(sizeof(datanode_failover) * g_node_num);
if (g_datanodesFailover == NULL) {
(void)fprintf(stderr, "g_datanodesFailover: out of memory\n");
return -1;
}
rc = memset_s(
g_datanodesFailover, sizeof(datanode_failover) * g_node_num, 0, sizeof(datanode_failover) * g_node_num);
securec_check_errno(rc, FREE_AND_RESET(g_datanodesFailover));
g_gtmsFailover = (gtm_failover *)malloc(sizeof(gtm_failover) * g_node_num);
if (g_gtmsFailover == NULL) {
(void)fprintf(stderr, "g_gtmsFailover: out of memory\n");
return -1;
}
rc = memset_s(g_gtmsFailover, sizeof(gtm_failover) * g_node_num, 0, sizeof(gtm_failover) * g_node_num);
securec_check_errno(rc, FREE_AND_RESET(g_gtmsFailover));
g_coordinatorsDrop = (bool *)malloc(sizeof(bool) * g_node_num);
if (g_coordinatorsDrop == NULL) {
(void)fprintf(stderr, "g_coordinatorsDrop: out of memory\n");
return -1;
}
rc = memset_s(g_coordinatorsDrop, sizeof(bool) * g_node_num, 0, sizeof(bool) * g_node_num);
securec_check_errno(rc, FREE_AND_RESET(g_coordinatorsDrop));
g_droppedCoordinatorId = (uint32 *)malloc(sizeof(uint32) * g_node_num);
if (g_droppedCoordinatorId == NULL) {
(void)fprintf(stderr, "g_droppedCoordinatorId: out of memory\n");
return -1;
}
rc = memset_s(g_droppedCoordinatorId, sizeof(uint32) * g_node_num, 0, sizeof(uint32) * g_node_num);
securec_check_errno(rc, FREE_AND_RESET(g_droppedCoordinatorId));
g_cnStatus = (coordinator_status *)malloc(sizeof(coordinator_status) * g_node_num);
if (g_cnStatus == NULL) {
(void)fprintf(stderr, "g_droppedCoordinatorId: out of memory\n");
return -1;
}
rc = memset_s(g_cnStatus, sizeof(coordinator_status) * g_node_num, 0, sizeof(coordinator_status) * g_node_num);
securec_check_errno(rc, FREE_AND_RESET(g_cnStatus));
(void)pthread_rwlock_init(&g_datanodesFailoverLock, NULL);
(void)pthread_rwlock_init(&g_cnDropLock, NULL);
(void)pthread_rwlock_init(&g_coordinatorsCancelLock, NULL);
(void)pthread_rwlock_init(&g_gtmsFailoverLock, NULL);
} else if (status == OUT_OF_MEMORY) {
(void)fprintf(stderr, "read staticNodeConfig failed! out of memory\n");
return -1;
} else {
(void)fprintf(stderr, "read staticNodeConfig failed! errno = %d\n", err_no);
return -1;
}
if (access(g_logicClusterListPath, F_OK) == 0) {
status = read_logic_cluster_config_files(g_logicClusterListPath, &err_no);
char errBuffer[ERROR_LIMIT_LEN] = {0};
switch (status) {
case OPEN_FILE_ERROR: {
write_runlog(FATAL,
"%s: could not open the logic cluster static config file: %s\n",
g_progname,
strerror_r(err_no, errBuffer, ERROR_LIMIT_LEN));
exit(1);
}
case READ_FILE_ERROR: {
char errBuff[ERROR_LIMIT_LEN];
write_runlog(FATAL,
"%s: could not read logic cluster static config files: %s\n",
g_progname,
strerror_r(err_no, errBuff, ERROR_LIMIT_LEN));
exit(1);
}
case OUT_OF_MEMORY:
write_runlog(FATAL, "%s: out of memory\n", g_progname);
exit(1);
default:
break;
}
}
return 0;
}
int node_match_find(const char *node_type, const char *node_port, const char *node_host, const char *node_port1,
const char *node_host1, int *node_index, int *instance_index, int *inode_type)
{
uint32 i;
uint32 j = 0;
*node_index = 0;
*instance_index = 0;
if (*node_type == 'C') {
for (i = 0; i < g_node_num; i++) {
if (g_node[i].coordinate == 1) {
if ((g_node[i].coordinatePort == (uint32)strtol(node_port, NULL, 10)) &&
(strncmp(g_node[i].coordinateListenIP[0], node_host, CM_IP_ALL_NUM_LENGTH) == 0)) {
*inode_type = CM_COORDINATENODE;
*node_index = (int)i;
*instance_index = (int)j;
return 0;
}
}
}
} else if (*node_type == 'D' || *node_type == 'S') {
for (i = 0; i < g_node_num; i++) {
for (j = 0; j < g_node[i].datanodeCount; j++) {
if ((g_node[i].datanode[j].datanodePort == (uint32)strtol(node_port, NULL, 10)) &&
(strncmp(g_node[i].datanode[j].datanodeListenIP[0], node_host, CM_IP_ALL_NUM_LENGTH) == 0)) {
*inode_type = CM_DATANODE;
*node_index = (int)i;
*instance_index = (int)j;
return 0;
}
}
}
} else {
write_runlog(ERROR, "node_type is invalid node_type =%s, node1 is %s:%s, node_host1\n",
node_type, node_host1, node_port1);
}
return -1;
}
static bool ModifyDatanodePort(const char *Keywords, uint32 value, const char *file_path)
{
char modify_cmd[MAXPGPATH * 2];
char fsync_cmd[MAXPGPATH * 2];
char check_cmd[MAXPGPATH * 2];
char result[NAMEDATALEN];
int ret;
int retry = 0;
struct timeval timeOut = {0};
timeOut.tv_sec = 10;
timeOut.tv_usec = 0;
ret = snprintf_s(modify_cmd,
sizeof(modify_cmd),
MAXPGPATH * 2 - 1,
"sed -i \"/^#%s =/c\\%s = %u\" %s/postgresql.conf",
Keywords,
Keywords,
value,
file_path);
securec_check_intval(ret, (void)ret);
ret = snprintf_s(fsync_cmd, sizeof(fsync_cmd), MAXPGPATH * 2 - 1, "fsync %s/postgresql.conf", file_path);
securec_check_intval(ret, (void)ret);
ret = snprintf_s(check_cmd,
sizeof(check_cmd),
MAXPGPATH * 2 - 1,
"grep \"^%s = %u\" %s/postgresql.conf|wc -l",
Keywords,
value,
file_path);
securec_check_intval(ret, (void)ret);
while (retry < MAX_RETRY_TIME) {
ret = ExecuteCmd(modify_cmd, timeOut);
write_runlog(LOG, "update %s/postgresql.conf command:%s\n", file_path, modify_cmd);
if (ret != 0) {
write_runlog(WARNING, "update %s/postgresql.conf failed!%d! %s \n", file_path, ret, modify_cmd);
retry++;
continue;
}
ret = ExecuteCmd(fsync_cmd, timeOut);
write_runlog(LOG, "fsync %s/postgresql.conf command:%s\n", file_path, fsync_cmd);
if (ret != 0) {
write_runlog(WARNING, "fsync %s/postgresql.conf failed!%d! %s \n", file_path, ret, fsync_cmd);
retry++;
continue;
}
if (!ExecuteCmdWithResult(check_cmd, result, NAMEDATALEN)) {
write_runlog(ERROR, "check %s failed, command:%s, errno[%d].\n", Keywords, check_cmd, errno);
retry++;
continue;
}
write_runlog(LOG, "check %s, command:%s\n", Keywords, check_cmd);
if (strtol(result, NULL, 10) != 1) {
write_runlog(WARNING, "update %s failed, retry it:%s\n", Keywords, modify_cmd);
retry++;
continue;
}
write_runlog(LOG, "update %s succeed:%s\n", Keywords, modify_cmd);
return true;
}
write_runlog(ERROR, "update %s failed final:%s\n", Keywords, modify_cmd);
return false;
}
static bool UpdateLibcommPort(const char *file_path, const char *port_name, uint32 port)
{
int ret;
char cmd_buf[MAXPGPATH * 2];
char result[NAMEDATALEN] = {0};
long need_update;
ret = snprintf_s(cmd_buf, sizeof(cmd_buf), MAXPGPATH * 2 - 1, "%s/postgresql.conf", file_path);
securec_check_intval(ret, (void)ret);
* if file not found, that means dn/cn was moved/lost,
* we must return true, otherwise, cm_agent will try to visit the file again and again.
*/
if (access(cmd_buf, F_OK) != 0) {
write_runlog(ERROR, "file not found, instance maybe lost, command: %s, errno[%d].\n", cmd_buf, errno);
return true;
}
ret =
snprintf_s(cmd_buf, sizeof(cmd_buf), MAXPGPATH * 2 - 1, "grep \"%s\" %s/postgresql.conf", port_name, file_path);
securec_check_intval(ret, (void)ret);
if (!ExecuteCmdWithResult(cmd_buf, result, NAMEDATALEN)) {
write_runlog(ERROR, "update failed, command: %s, errno[%d].\n", cmd_buf, errno);
return false;
}
write_runlog(LOG, "command: %s, result: %s\n", cmd_buf, result);
ret = snprintf_s(
cmd_buf, sizeof(cmd_buf), MAXPGPATH * 2 - 1, "grep \"#%s\" %s/postgresql.conf|wc -l", port_name, file_path);
securec_check_intval(ret, (void)ret);
if (!ExecuteCmdWithResult(cmd_buf, result, NAMEDATALEN)) {
write_runlog(ERROR, "update failed, command: %s, errno[%d].\n", cmd_buf, errno);
return false;
}
write_runlog(LOG, "command: %s, result: %s\n", cmd_buf, result);
need_update = strtol(result, NULL, 10);
if (need_update == 1) {
g_cmAgentNeedAlterPgxcNode = true;
return ModifyDatanodePort(port_name, port, file_path);
}
return true;
}
bool UpdateLibcommConfig(void)
{
uint32 j;
uint32 libcomm_sctp_port = 0;
uint32 libcomm_ctrl_port = 0;
bool re = false;
bool result = true;
if (g_currentNode->coordinate == 1) {
libcomm_sctp_port = GetLibcommPort(g_currentNode->DataPath, g_currentNode->coordinatePort, COMM_PORT_TYPE_DATA);
re = UpdateLibcommPort(g_currentNode->DataPath, "comm_sctp_port", libcomm_sctp_port);
if (!re) {
result = false;
}
libcomm_ctrl_port = GetLibcommPort(g_currentNode->DataPath, g_currentNode->coordinatePort, COMM_PORT_TYPE_CTRL);
re = UpdateLibcommPort(g_currentNode->DataPath, "comm_control_port", libcomm_ctrl_port);
if (!re) {
result = false;
}
}
for (j = 0; j < g_currentNode->datanodeCount; j++) {
if (g_multi_az_cluster) {
* In primary multiple standby cluster,
* the DN port comm_sctp_port and comm_comtrol_port
* settings are the same as CN.
*/
libcomm_sctp_port = (g_currentNode->datanode[j].datanodePort + 2);
libcomm_ctrl_port = (g_currentNode->datanode[j].datanodePort + 3);
} else {
libcomm_sctp_port = (g_currentNode->datanode[j].datanodePort +
GetDatanodeNumSort(g_currentNode, g_currentNode->datanode[j].datanodeRole) * 2);
libcomm_ctrl_port = (g_currentNode->datanode[j].datanodePort +
GetDatanodeNumSort(g_currentNode, g_currentNode->datanode[j].datanodeRole) * 2 + 1);
}
re = UpdateLibcommPort(g_currentNode->datanode[j].datanodeLocalDataPath, "comm_sctp_port", libcomm_sctp_port);
if (!re) {
result = false;
}
re =
UpdateLibcommPort(g_currentNode->datanode[j].datanodeLocalDataPath, "comm_control_port", libcomm_ctrl_port);
if (!re) {
result = false;
}
}
return result;
}
bool Is_cn_replacing()
{
struct stat stat_buf = {0};
char instance_replace[MAX_PATH_LEN] = {0};
int rc = snprintf_s(instance_replace,
MAX_PATH_LEN,
MAX_PATH_LEN - 1,
"%s/%s_%u",
g_binPath,
CM_INSTANCE_REPLACE,
g_currentNode->coordinateId);
securec_check_intval(rc, (void)rc);
int state_cn_replace = stat(instance_replace, &stat_buf);
return (state_cn_replace == 0) ? true : false;
}
uint32 cm_get_first_cn_node()
{
for (int32 nodeidx = 0; nodeidx < (int)g_node_num; nodeidx++) {
if (g_node[nodeidx].coordinate == 1) {
return g_node[nodeidx].node;
}
}
return 0;
}
#ifdef __aarch64__
* @Description: process bind cpu
* @IN: instance_index, primary_dn_index, pid
* @Return: void
*/
void process_bind_cpu(uint32 instance_index, uint32 primary_dn_index, pgpid_t pid)
{
int ret;
int rcs;
bool dn_need_bind = true;
bool datanode_is_primary = false;
uint8 physical_cpu_num = (uint8)PHYSICAL_CPU_NUM;
uint32 bind_start_cpu = 0;
uint32 bind_end_cpu = 0;
uint32 dn_res = g_datanode_primary_num % physical_cpu_num;
char command[MAX_PATH_LEN] = {0};
if (agent_process_cpu_affinity == 0) {
return;
}
if (dn_res) {
dn_need_bind = (primary_dn_index < (g_datanode_primary_num - dn_res));
}
datanode_is_primary =
g_dn_report_msg_ok
? g_dnReportMsg[instance_index].dnStatus.reportMsg.local_status.local_role == INSTANCE_ROLE_PRIMARY
: PRIMARY_DN == g_currentNode->datanode[instance_index].datanodeRole;
if (datanode_is_primary && dn_need_bind) {
* 1. calculate start_core and end_core,
* total_cpu_core_num / physical_cpu_num gets the cpu core num of a single physical cpu
* primary_dn_index % physical_cpu_num gets the index of "socket_group"
*/
bind_start_cpu = (primary_dn_index % physical_cpu_num) * (total_cpu_core_num / physical_cpu_num);
bind_end_cpu = (primary_dn_index % physical_cpu_num + 1) * (total_cpu_core_num / physical_cpu_num) - 1;
rcs = snprintf_s(command, sizeof(command), sizeof(command) - 1,
"taskset -pac %u-%u %ld 1>/dev/null",
bind_start_cpu,
bind_end_cpu,
pid);
} else {
* For those not belongs to primary dn, do reset affinity process
* 2. build taskset cammand to reset affinity
*/
bind_start_cpu = 0;
bind_end_cpu = total_cpu_core_num - 1;
rcs = snprintf_s(command,
sizeof(command),
sizeof(command) - 1,
"taskset -pac %u-%u %ld 1>/dev/null",
bind_start_cpu,
bind_end_cpu,
pid);
}
securec_check_intval(rcs, (void)rcs);
ret = system(command);
if (ret != 0) {
write_runlog(ERROR, "run system command failed %d! %s, errno=%d.\n", ret, command, errno);
}
}
#endif
void switch_system_call_log(const char *file_name)
{
#define MAX_SYSTEM_CALL_LOG_SIZE (16 * 1024 * 1024)
pg_time_t current_time;
struct tm *systm = NULL;
char currentTime[LOG_MAX_TIMELEN] = {0};
char command[MAXPGPATH] = {0};
char logFileBuff[MAXPGPATH] = {0};
char historyLogName[MAXPGPATH] = {0};
Assert(file_name != NULL);
long filesize;
struct stat statbuff;
int ret = stat(file_name, &statbuff);
if (ret == -1) {
write_runlog(WARNING, "stat system call log error, ret=%d, errno=%d.\n", ret, errno);
return;
} else {
filesize = statbuff.st_size;
}
if (filesize > MAX_SYSTEM_CALL_LOG_SIZE) {
current_time = time(NULL);
systm = localtime(¤t_time);
if (systm != NULL) {
(void)strftime(currentTime, LOG_MAX_TIMELEN, "-%Y-%m-%d_%H%M%S", systm);
} else {
write_runlog(WARNING, "switch_system_call_log get localtime failed.");
}
int rcs = snprintf_s(logFileBuff, MAXPGPATH, MAXPGPATH - 1, "%s%s.log", SYSTEM_CALL_LOG, currentTime);
securec_check_intval(rcs, (void)rcs);
rcs = snprintf_s(historyLogName, MAXPGPATH, MAXPGPATH - 1, "%s/%s", sys_log_path, logFileBuff);
securec_check_intval(rcs, (void)rcs);
rcs = snprintf_s(command, MAXPGPATH, MAXPGPATH - 1,
"cp %s %s;> %s", system_call_log, historyLogName, system_call_log);
securec_check_intval(rcs, (void)rcs);
rcs = system(command);
if (rcs != 0) {
write_runlog(ERROR, "failed to switch system_call logfile. cmd:%s. return:(%d,%d), erron=%d.\n",
command, rcs, WEXITSTATUS(rcs), errno);
} else {
write_runlog(LOG, "switch system_call logfile successfully. cmd:%s.\n", command);
}
}
(void)chmod(file_name, S_IRUSR | S_IWUSR);
return;
}
void CheckGDBAndCaptureStack()
{
if (system("which gdb > /dev/null 2>&1") == 0) {
char command[MAX_PATH_LEN];
char timestamp[MAX_PATH_LEN];
time_t now = time(NULL);
struct tm *tm_info = localtime(&now);
strftime(timestamp, sizeof(timestamp), "%Y%m%d_%H%M%S", tm_info);
errno_t rc = snprintf_s(command, MAX_PATH_LEN, MAX_PATH_LEN - 1, "gstack %d >> %s/cm_agent_stack-%s.log",
getpid(), sys_log_path, timestamp);
securec_check_intval(rc, (void)rc);
system(command);
write_runlog(LOG, "Captured stack trace using gstack for process %d.\n", getpid());
} else {
write_runlog(LOG, "gdb not found, skipping stack capture.\n");
}
}
void CheckActivityTimeout(int index)
{
time_t currentTime = time(NULL);
if (difftime(currentTime, threadActivities[index].lastActiveTime) > ACTIVITY_TIMEOUT) {
write_runlog(WARNING, "Thread ID: %lu has timed out.\n",
threadActivities[index].threadId);
if (difftime(currentTime, lastStackCaptureTime) > STACK_CAPTURE_TIMEOUT) {
CheckGDBAndCaptureStack();
lastStackCaptureTime = currentTime;
} else {
write_runlog(WARNING, "Stack capture skipped for Thread ID: %lu, last capture was within 30 seconds.\n",
threadActivities[index].threadId);
}
}
}
void CheckAllThreadActivities()
{
for (int i = 0; i < activities_index; i++) {
pthread_rwlock_wrlock(&activitiesMutex);
CheckActivityTimeout(i);
pthread_rwlock_unlock(&activitiesMutex);
}
}
void server_loop(void)
{
int pid;
int pstat;
int rc;
uint32 recv_count = 0;
uint32 msgPoolCount = 0;
timespec startTime = {0, 0};
timespec endTime = {0, 0};
struct stat statbuf = {0};
const int msgPoolInfoPrintTime = 60 * 1000 * 1000;
thread_name = "main";
(void)clock_gettime(CLOCK_MONOTONIC, &startTime);
(void)clock_gettime(CLOCK_MONOTONIC, &g_disconnectTime);
const int pauseLogInterval = 5;
int pauseLogTimes = 0;
for (;;) {
if (g_shutdownRequest) {
cm_sleep(5);
continue;
}
if (access(g_cmManualPausePath, F_OK) == 0) {
g_isPauseArbitration = true;
if (pauseLogTimes == 0) {
write_runlog(LOG, "The cluster has been paused.\n");
}
++pauseLogTimes;
pauseLogTimes = pauseLogTimes % pauseLogInterval;
} else {
g_isPauseArbitration = false;
pauseLogTimes = 0;
}
if (access(g_cmManualStartingPath, F_OK) == 0) {
g_isStarting = true;
} else {
g_isStarting = false;
}
if (access(g_cmManualWalRecordPath, F_OK) == 0) {
g_enableWalRecord = true;
} else {
g_enableWalRecord = false;
}
(void)clock_gettime(CLOCK_MONOTONIC, &endTime);
if (g_isStart) {
g_suppressAlarm = true;
if (endTime.tv_sec - startTime.tv_sec >= 300) {
g_suppressAlarm = false;
g_isStart = false;
}
}
if (recv_count >= (agent_report_interval * 1000 * 1000) / AGENT_RECV_CYCLE) {
pid = waitpid(-1, &pstat, WNOHANG);
if (pid > 0) {
write_runlog(LOG, "child process have die! pid is %d exit status is %d\n ", pid, pstat);
}
switch_system_call_log(system_call_log);
clean_system_alarm_log(system_alarm_log, sys_log_path);
the directory $GAUSSHOME/bin, the g_cmagentLockfile will lost, agent should not exit */
if ((stat(g_cmagentLockfile, &statbuf) != 0) && (undocumentedVersion == 0)) {
write_runlog(FATAL, "lock file doesn't exist.\n");
exit(1);
}
rc = read_config_file_check();
if (rc < 0) {
write_runlog(ERROR, "read_config_file_check failed when start in server_loop!\n");
}
if (g_node == NULL) {
cm_sleep(5);
continue;
}
recv_count = 0;
}
if (msgPoolCount > (msgPoolInfoPrintTime / AGENT_RECV_CYCLE)) {
PrintMsgBufPoolUsage(LOG);
msgPoolCount = 0;
}
check_thread_state();
if (g_gotParameterReload == 1) {
ReloadParametersFromConfigfile();
g_gotParameterReload = 0;
}
if (!g_enableWalRecord) {
CheckAllThreadActivities();
}
CmUsleep(AGENT_RECV_CYCLE);
recv_count++;
msgPoolCount++;
}
}
static void DoHelp(void)
{
(void)printf(_("%s is a utility to monitor an instance.\n\n"), g_progname);
(void)printf(_("Usage:\n"));
(void)printf(_(" %s\n"), g_progname);
(void)printf(_(" %s 0\n"), g_progname);
(void)printf(_(" %s 1\n"), g_progname);
(void)printf(_(" %s 2\n"), g_progname);
(void)printf(_(" %s 3\n"), g_progname);
(void)printf(_(" %s normal\n"), g_progname);
(void)printf(_(" %s abnormal\n"), g_progname);
(void)printf(_("\nCommon options:\n"));
(void)printf(_(" -?, -h, --help show this help, then exit\n"));
(void)printf(_(" -V, --version output version information, then exit\n"));
(void)printf(_("\nlocation of the log information options:\n"));
(void)printf(_(" 0 LOG_DESTION_FILE\n"));
(void)printf(_(" 1 LOG_DESTION_SYSLOG\n"));
(void)printf(_(" 2 LOG_DESTION_FILE\n"));
(void)printf(_(" 3 LOG_DESTION_DEV_NULL\n"));
(void)printf(_("\nstarted mode options:\n"));
(void)printf(_(" normal cm_agent is started normally\n"));
(void)printf(_(" abnormal cm_agent is started by killed\n"));
}
* Replace character to another.
*/
int replaceStr(char *sSrc, const char *sMatchStr, const char *sReplaceStr)
{
char caNewString[MAX_PATH_LEN];
errno_t rc;
if (sMatchStr == NULL) {
return -1;
}
char *FindPos = strstr(sSrc, sMatchStr);
if (FindPos == NULL) {
return 0;
}
while (FindPos != NULL) {
rc = memset_s(caNewString, MAX_PATH_LEN, 0, MAX_PATH_LEN);
securec_check_errno(rc, (void)rc);
long StringLen = FindPos - sSrc;
rc = strncpy_s(caNewString, MAX_PATH_LEN, sSrc, (size_t)StringLen);
securec_check_errno(rc, (void)rc);
rc = strcat_s(caNewString, MAX_PATH_LEN, sReplaceStr);
securec_check_errno(rc, (void)rc);
rc = strcat_s(caNewString, MAX_PATH_LEN, FindPos + strlen(sMatchStr));
securec_check_errno(rc, (void)rc);
rc = strcpy_s(sSrc, MAX_PATH_LEN, caNewString);
securec_check_errno(rc, (void)rc);
FindPos = strstr(sSrc, sMatchStr);
}
return 0;
}
* Cut time from trace name.
* This time will be used to sort traces.
*/
void cutTimeFromFileLog(const char *fileName, char *pattern, uint32 patternLen, char *strTime)
{
errno_t rc;
char subStr2[MAX_PATH_LEN] = {'\0'};
char subStr5[MAX_PATH_LEN] = {'\0'};
char *saveStr = NULL;
char *saveStr2 = NULL;
char subStr3[MAX_PATH_LEN] = {'\0'};
char tempTimeStamp[MAX_TIME_LEN] = {'\0'};
rc = memcpy_s(subStr2, MAX_PATH_LEN, fileName, strlen(fileName) + 1);
securec_check_errno(rc, (void)rc);
rc = memcpy_s(subStr5, MAX_PATH_LEN, fileName, strlen(fileName) + 1);
securec_check_errno(rc, (void)rc);
char *subStr4 = strstr(subStr5, "-");
char *subStr = strtok_r(subStr2, "-", &saveStr);
if (subStr == NULL) {
write_runlog(ERROR, "file path name get failed.\n");
return;
}
rc = snprintf_s(subStr3, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s%s", subStr, "-");
securec_check_intval(rc, (void)rc);
rc = memcpy_s(pattern, patternLen, subStr3, MAX_PATH_LEN);
securec_check_errno(rc, (void)rc);
if (strstr(fileName, "-current.log") != NULL) {
rc = snprintf_s(tempTimeStamp, MAX_TIME_LEN, MAX_TIME_LEN - 1, "%s", MAX_LOGFILE_TIMESTAMP);
securec_check_intval(rc, (void)rc);
rc = memcpy_s(strTime, MAX_TIME_LEN, tempTimeStamp, MAX_TIME_LEN);
securec_check_errno(rc, (void)rc);
return;
}
if (subStr4 != NULL) {
subStr = strtok_r(subStr4, ".", &saveStr2);
if (subStr != NULL) {
(void)replaceStr(subStr, "-current", "");
if (subStr != NULL) {
(void)replaceStr(subStr, "-", "");
if (subStr != NULL) {
(void)replaceStr(subStr, "_", "");
if (is_digit_string(subStr)) {
rc = memcpy_s(strTime, MAX_TIME_LEN, subStr, strlen(subStr) + 1);
securec_check_errno(rc, (void)rc);
}
}
}
}
}
}
* Read all traces by log pattern,including zip file and non zip file.
* Trace information are file time,file size,file path.These traces are
* saved in the global variable.
*/
int readFileList(const char *basePath, LogFile *logFile, uint32 *count, int64 *totalSize, uint32 maxCount)
{
errno_t rc;
DIR *dir;
struct dirent *ptr = NULL;
char base[MAX_PATH_LEN] = {'\0'};
char path[MAX_PATH_LEN] = {'\0'};
char strTime[MAX_TIME_LEN] = {'\0'};
char pattern[MAX_PATH_LEN] = {'\0'};
if ((dir = opendir(basePath)) == NULL) {
write_runlog(ERROR, "could not open file %s", basePath);
return -1;
}
while (*count < maxCount && (ptr = readdir(dir)) != NULL) {
struct stat stat_buf;
if (strcmp(ptr->d_name, ".") == 0 || strcmp(ptr->d_name, "..") == 0) {
continue;
}
rc = snprintf_s(path, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/%s", basePath, ptr->d_name);
securec_check_intval(rc, (void)rc);
if (unlikely(stat(path, &stat_buf) < 0)) {
write_runlog(LOG, "could not stat file %s\n", path);
continue;
}
if (S_ISREG(stat_buf.st_mode) && isLogFile(ptr->d_name)) {
cutTimeFromFileLog(ptr->d_name, pattern, sizeof(pattern), strTime);
if (strTime[0] != 0) {
if (logFile != NULL) {
*totalSize += stat_buf.st_size;
rc = memcpy_s(logFile[*count].fileName, MAX_PATH_LEN, path, MAX_PATH_LEN);
securec_check_errno(rc, (void)rc);
rc = memcpy_s(logFile[*count].basePath, MAX_PATH_LEN, basePath, MAX_PATH_LEN);
securec_check_errno(rc, (void)rc);
rc = memcpy_s(logFile[*count].timestamp, MAX_TIME_LEN, strTime, MAX_TIME_LEN);
securec_check_errno(rc, (void)rc);
rc = memcpy_s(logFile[*count].pattern, MAX_PATH_LEN, pattern, MAX_PATH_LEN);
securec_check_errno(rc, (void)rc);
rc = memcpy_s(&logFile[*count].fileSize, sizeof(int64), &stat_buf.st_size, sizeof(int64));
securec_check_errno(rc, (void)rc);
}
*count += 1;
}
} else if (S_ISDIR(stat_buf.st_mode)) {
rc = memset_s(base, sizeof(base), '\0', sizeof(base));
securec_check_errno(rc, (void)rc);
rc = strcpy_s(base, MAX_PATH_LEN, basePath);
securec_check_errno(rc, (void)rc);
rc = strcat_s(base, MAX_PATH_LEN, "/");
securec_check_errno(rc, (void)rc);
rc = strcat_s(base, MAX_PATH_LEN, ptr->d_name);
securec_check_errno(rc, (void)rc);
if (readFileList(base, logFile, count, totalSize, maxCount) < 0) {
write_runlog(ERROR, "readFileList() fail.");
}
}
}
(void)closedir(dir);
return 0;
}
static int cmagent_unlock(void)
{
int ret = flock(fileno(g_lockfile), LOCK_UN);
if (g_lockfile != NULL) {
(void)fclose(g_lockfile);
g_lockfile = NULL;
}
return ret;
}
static int cmagent_lock(void)
{
int ret;
struct stat statbuf = {0};
if (stat(g_cmagentLockfile, &statbuf) != 0) {
char content[MAX_PATH_LEN] = {0};
g_lockfile = fopen(g_cmagentLockfile, PG_BINARY_W);
if (g_lockfile == NULL) {
(void)fprintf(stderr, "FATAL %s: can't open lock file \"%s\" : %s\n",
g_progname, g_cmagentLockfile, strerror(errno));
exit(1);
}
(void)chmod(g_cmagentLockfile, S_IRUSR | S_IWUSR);
if (fwrite(content, MAX_PATH_LEN, 1, g_lockfile) != 1) {
(void)fclose(g_lockfile);
g_lockfile = NULL;
(void)fprintf(stderr,
"FATAL %s: can't write lock file \"%s\" : %s\n",
g_progname, g_cmagentLockfile, strerror(errno));
exit(1);
}
(void)fclose(g_lockfile);
g_lockfile = NULL;
(void)chmod(g_cmagentLockfile, S_IRUSR | S_IWUSR);
}
if ((g_lockfile = fopen(g_cmagentLockfile, PG_BINARY_W)) == NULL) {
(void)fprintf(stderr, "FATAL %s: could not open lock file \"%s\" : %s\n",
g_progname, g_cmagentLockfile, strerror(errno));
exit(1);
}
if (SetFdCloseExecFlag(g_lockfile) < 0) {
(void)fprintf(stderr, "%s: can't set file flag\"%s\" : %s\n", g_progname, g_cmagentLockfile, strerror(errno));
}
ret = flock(fileno(g_lockfile), LOCK_EX | LOCK_NB);
return ret;
}
void GetAgentConfigEx()
{
if (get_config_param(configDir, "enable_cn_auto_repair", g_enableCnAutoRepair, sizeof(g_enableCnAutoRepair)) < 0) {
(void)fprintf(stderr, "get_config_param() get enable_cn_auto_repair fail.\n");
}
if (get_config_param(configDir, "enable_log_compress", g_enableLogCompress, sizeof(g_enableLogCompress)) < 0) {
(void)fprintf(stderr, "get_config_param() get enable_log_compress fail.\n");
}
if (get_config_param(configDir, "enable_vtable", g_enableVtable, sizeof(g_enableVtable)) < 0) {
(void)fprintf(stderr, "get_config_param() get enable_vtable fail.\n");
}
if (get_config_param(configDir, "enable_ssl", g_enableMesSsl, sizeof(g_enableMesSsl)) < 0) {
(void)fprintf(stderr, "get_config_param() get enable_ssl fail.\n");
}
if (get_config_param(configDir, "security_mode", g_enableOnlineOrOffline, sizeof(g_enableOnlineOrOffline)) < 0) {
(void)fprintf(stderr, "get_config_param() get security_mode fail.\n");
}
if (get_config_param(configDir, "incremental_build", g_enableIncrementalBuild, sizeof(g_enableIncrementalBuild)) < 0) {
(void)fprintf(stderr, "get_config_param() get incremental_build fail.\n");
}
if (get_config_param(configDir, "unix_socket_directory", g_unixSocketDirectory, sizeof(g_unixSocketDirectory)) <
0) {
(void)fprintf(stderr, "get_config_param() get unix_socket_directory fail.\n");
} else {
check_input_for_security(g_unixSocketDirectory);
}
if (get_config_param(configDir, "voting_disk_path", g_votingDiskPath, sizeof(g_votingDiskPath)) < 0) {
(void)fprintf(stderr, "get_config_param() get voting_disk_path fail.\n");
}
canonicalize_path(g_votingDiskPath);
g_diskTimeout = get_uint32_value_from_config(configDir, "disk_timeout", 200);
log_max_size = get_int_value_from_config(configDir, "log_max_size", 10240);
log_saved_days = get_uint32_value_from_config(configDir, "log_saved_days", 90);
log_max_count = get_uint32_value_from_config(configDir, "log_max_count", 10000);
g_cmaRhbItvl = get_uint32_value_from_config(configDir, "agent_rhb_interval", 1000);
g_sslOption.expire_time = get_uint32_value_from_config(configDir, "ssl_cert_expire_alert_threshold",
CM_DEFAULT_SSL_EXPIRE_THRESHOLD);
g_sslCertExpireCheckInterval = get_uint32_value_from_config(configDir, "ssl_cert_expire_check_interval",
SECONDS_PER_DAY);
if (g_sslOption.expire_time < CM_MIN_SSL_EXPIRE_THRESHOLD ||
g_sslOption.expire_time > CM_MAX_SSL_EXPIRE_THRESHOLD) {
write_runlog(ERROR, "invalid ssl expire alert threshold %u, must between %u and %u\n",
g_sslOption.expire_time, CM_MIN_SSL_EXPIRE_THRESHOLD, CM_MAX_SSL_EXPIRE_THRESHOLD);
}
}
static void GetAlarmConf()
{
char alarmPath[MAX_PATH_LEN] = {0};
int rcs = GetHomePath(alarmPath, sizeof(alarmPath));
if (rcs != EOK) {
write_runlog(ERROR, "Get GAUSSHOME failed, please check.\n");
return;
}
canonicalize_path(alarmPath);
int rc =
snprintf_s(g_alarmConfigDir, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/bin/alarmConfig.conf", alarmPath);
securec_check_intval(rc, (void)rc);
GetAlarmConfig(g_alarmConfigDir);
}
int get_agent_global_params_from_configfile()
{
int rc =
snprintf_s(configDir, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/cm_agent/cm_agent.conf", g_currentNode->cmDataPath);
securec_check_intval(rc, (void)rc);
check_input_for_security(configDir);
canonicalize_path(configDir);
if (cmagent_lock() == -1) {
return -1;
}
GetAlarmConf();
get_log_paramter(configDir);
GetStringFromConf(configDir, sys_log_path, sizeof(sys_log_path), "log_dir");
check_input_for_security(sys_log_path);
get_build_mode(configDir);
get_start_mode(configDir);
get_connection_mode(configDir);
GetStringFromConf(configDir, g_environmentThreshold, sizeof(g_environmentThreshold), "environment_threshold");
LoadDiskCheckConfig(configDir);
GetStringFromConf(configDir, g_dbServiceVip, sizeof(g_dbServiceVip), "db_service_vip");
if (g_dbServiceVip[0] == '\0') {
write_runlog(LOG, "parameter \"db_service_vip\" is not provided, please check!\n");
} else if (!IsIPAddrValid(g_dbServiceVip)) {
write_runlog(ERROR, "value of parameter \"db_service_vip\" is invalid, please check!\n");
return -1;
}
agent_report_interval = get_uint32_value_from_config(configDir, "agent_report_interval", 1);
agent_heartbeat_timeout = get_uint32_value_from_config(configDir, "agent_heartbeat_timeout", 8);
agent_connect_timeout = get_uint32_value_from_config(configDir, "agent_connect_timeout", 1);
agent_backup_open = (ClusterRole)get_uint32_value_from_config(configDir, "agent_backup_open", CLUSTER_PRIMARY);
agent_connect_retries = get_uint32_value_from_config(configDir, "agent_connect_retries", 15);
agent_check_interval = get_uint32_value_from_config(configDir, "agent_check_interval", 2);
g_diskUsageThreshold =
get_uint32_value_from_config(configDir, "diskusage_threshold_value_check", diskUsageDefaultThreshold);
agent_kill_instance_timeout = get_uint32_value_from_config(configDir, "agent_kill_instance_timeout", 0);
agent_phony_dead_check_interval = get_uint32_value_from_config(configDir, "agent_phony_dead_check_interval", 10);
enable_gtm_phony_dead_check = get_uint32_value_from_config(configDir, "enable_gtm_phony_dead_check", 1);
g_enableE2ERto = (uint32)get_int_value_from_config(configDir, "enable_e2e_rto", 0);
g_disasterRecoveryType =
(DisasterRecoveryType)get_uint32_value_from_config(configDir, "disaster_recovery_type", DISASTER_RECOVERY_NULL);
agent_phony_dead_check_interval = g_enableE2ERto == 1 ? 1 : agent_phony_dead_check_interval;
g_ssDoubleClusterMode =
(SSDoubleClusterMode)get_uint32_value_from_config(configDir, "ss_double_cluster_mode", SS_DOUBLE_NULL);
log_threshold_check_interval =
get_uint32_value_from_config(configDir, "log_threshold_check_interval", log_threshold_check_interval);
undocumentedVersion = get_uint32_value_from_config(configDir, "upgrade_from", 0);
dilatation_shard_count_for_disk_capacity_alarm = get_uint32_value_from_config(
configDir, "dilatation_shard_count_for_disk_capacity_alarm", dilatation_shard_count_for_disk_capacity_alarm);
if (get_config_param(configDir, "enable_dcf", g_agentEnableDcf, sizeof(g_agentEnableDcf)) < 0) {
write_runlog(ERROR, "get_config_param() get enable_dcf fail.\n");
}
#ifndef ENABLE_MULTIPLE_NODES
if (get_config_param(configDir, "enable_fence_dn", g_enableFenceDn, sizeof(g_enableFenceDn)) < 0) {
write_runlog(ERROR, "get_config_param() get enable_fence_dn fail.\n");
}
#endif
GetEventTrigger();
#ifdef __aarch64__
agent_process_cpu_affinity = get_uint32_value_from_config(configDir, "process_cpu_affinity", 0);
if (agent_process_cpu_affinity > CPU_AFFINITY_MAX) {
(void)fprintf(stderr, "CM parameter 'process_cpu_affinity':%d is bigger than limit:%d\n",
agent_process_cpu_affinity, CPU_AFFINITY_MAX);
agent_process_cpu_affinity = 0;
}
total_cpu_core_num = get_nprocs();
(void)fprintf(stdout, "total_cpu_core_num is %d, agent_process_cpu_affinity is %d\n",
total_cpu_core_num, agent_process_cpu_affinity);
#endif
GetAgentConfigEx();
return 0;
}
static status_t InitSendDdbOperRes()
{
if (g_currentNode->coordinate == 0) {
return CM_SUCCESS;
}
(void)pthread_rwlock_init(&(g_gtmSendDdbOper.lock), NULL);
(void)pthread_rwlock_init(&(g_gtmCmDdbOperRes.lock), NULL);
size_t sendLen = sizeof(CltSendDdbOper);
CltSendDdbOper *sendOper = (CltSendDdbOper *)malloc(sendLen);
if (sendOper == NULL) {
write_runlog(ERROR, "sendOper is NULL, cma will exit.\n");
return CM_ERROR;
}
errno_t rc = memset_s(sendOper, sendLen, 0, sendLen);
securec_check_errno(rc, FREE_AND_RESET(sendOper));
g_gtmSendDdbOper.sendOper = sendOper;
size_t operResLen = sizeof(CmSendDdbOperRes);
CmSendDdbOperRes *ddbOperRes = (CmSendDdbOperRes *)malloc(operResLen);
if (ddbOperRes == NULL) {
free(sendOper);
sendOper = NULL;
write_runlog(ERROR, "ddbOperRes is NULL, cma will exit.\n");
return CM_ERROR;
}
rc = memset_s(ddbOperRes, operResLen, 0, operResLen);
securec_check_errno(rc, FREE_AND_RESET(ddbOperRes));
g_gtmCmDdbOperRes.ddbOperRes = ddbOperRes;
return CM_SUCCESS;
}
static void InitNeedInfoRes()
{
size_t syncListLen = sizeof(DnSyncListInfo) * CM_MAX_DATANODE_PER_NODE;
errno_t rc = memset_s(g_dnSyncListInfo, syncListLen, 0, syncListLen);
securec_check_errno(rc, (void)rc);
size_t doWriteLen = sizeof(CmDoWriteOper) * CM_MAX_DATANODE_PER_NODE;
rc = memset_s(g_cmDoWriteOper, doWriteLen, 0, doWriteLen);
securec_check_errno(rc, (void)rc);
}
static inline void InitResReportMsg()
{
errno_t rc = memset_s(&g_resReportMsg, sizeof(OneNodeResStatusInfo), 0, sizeof(OneNodeResStatusInfo));
securec_check_errno(rc, (void)rc);
InitResStatCommInfo(&g_resReportMsg.resStat);
(void)pthread_rwlock_init(&(g_resReportMsg.rwlock), NULL);
}
static void CreateCusResThread()
{
if (IsCusResExistLocal()) {
CreateRecvClientMessageThread();
CreateSendMessageToClientThread();
CreateProcessMessageThread();
InitResReportMsg();
CreateDefResStatusCheckThread();
CreateCusResIsregCheckThread();
} else {
write_runlog(LOG, "[CLIENT] no resource, start client thread is unnecessary.\n");
}
}
static status_t CmaReadCusResConf()
{
int ret = ReadCmConfJson((void*)write_runlog);
if (!IsReadConfJsonSuccess(ret)) {
write_runlog(FATAL, "read cm conf json failed, ret=%d, reason=\"%s\".\n", ret, ReadConfJsonFailStr(ret));
return CM_ERROR;
}
if (InitAllResStat() != CM_SUCCESS) {
write_runlog(FATAL, "init res status failed.\n");
return CM_ERROR;
}
if (InitLocalResConf() != CM_SUCCESS) {
write_runlog(FATAL, "init local res conf failed.\n");
return CM_ERROR;
}
return CM_SUCCESS;
}
static void InitAgentGlobalVariable()
{
for (uint32 i = 0; i < GetLocalResConfCount(); ++i) {
if (strcmp(g_resConf[i].resName, "dss") == 0 || strcmp(g_resConf[i].resName, "dms_res") == 0) {
g_isStorageWithDMSorDSS = true;
write_runlog(LOG, "This node has dms or dss enabled.\n");
return;
}
}
}
void InitActivity()
{
threadActivities = (ThreadActivity*)malloc(MAX_THREADS * sizeof(ThreadActivity));
(void)pthread_rwlock_init(&activitiesMutex, NULL);
activities_index = 0;
}
void AddThreadActivity(int *index, pthread_t threadId)
{
pthread_rwlock_wrlock(&activitiesMutex);
*index = activities_index++;
threadActivities[*index].threadId = threadId;
threadActivities[*index].lastActiveTime = time(NULL);
pthread_rwlock_unlock(&activitiesMutex);
}
void UpdateThreadActivity(int index)
{
pthread_rwlock_wrlock(&activitiesMutex);
threadActivities[index].lastActiveTime = time(NULL);
pthread_rwlock_unlock(&activitiesMutex);
}
int main(int argc, char** argv)
{
uid_t uid = getuid();
if (uid == 0) {
(void)printf("current user is the root user (uid = 0), exit.\n");
return 1;
}
int status;
uint32 i;
size_t lenth = 0;
int *thread_index = NULL;
errno_t rc;
const int maxArgcNum = 2;
bool &isSharedStorageMode = GetIsSharedStorageMode();
if (argc > maxArgcNum) {
(void)printf(_("the argv is error, try cm_agent -h for more information!\n"));
return -1;
}
GetCmdlineOpt(argc, argv);
g_progname = "cm_agent";
prefix_name = g_progname;
if (argc > 1) {
if (strcmp(argv[1], "-h") == 0 || strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0) {
DoHelp();
exit(0);
} else if (strcmp(argv[1], "-V") == 0 || strcmp(argv[1], "--version") == 0) {
(void)puts("cm_agent " DEF_CM_VERSION);
exit(0);
} else if (strcmp("normal", argv[1]) == 0) {
g_isStart = true;
lenth = strlen(argv[1]);
rc = memset_s(argv[1], lenth, 0, lenth);
securec_check_errno(rc, (void)rc);
} else if (strcmp("abnormal", argv[1]) == 0) {
g_isStart = false;
lenth = strlen(argv[1]);
rc = memset_s(argv[1], lenth, 0, lenth);
securec_check_errno(rc, (void)rc);
}
}
(void)syscalllockInit(&g_cmEnvLock);
init_signal_mask();
(void)sigprocmask(SIG_SETMASK, &block_sig, NULL);
setup_signal_handle(SIGHUP, reload_cmagent_parameters);
setup_signal_handle(SIGUSR1, RecvSigusrSingle);
#ifdef ENABLE_MULTIPLE_NODES
setup_signal_handle(SIGUSR2, SetFlagToUpdatePortForCnDn);
#endif
status = get_prog_path();
if (status < 0) {
(void)fprintf(stderr, "get_prog_path failed!\n");
return -1;
}
pw = getpwuid(getuid());
if (pw == NULL || pw->pw_name == NULL) {
(void)fprintf(stderr, "can not get current user name.\n");
return -1;
}
SetEnvSupportIpV6(CheckSupportIpV6());
if (RegistOpensslExitSignal(g_progname)) {
return -1;
}
status = CmSSlConfigInit(true);
if (status < 0) {
(void)fprintf(stderr, "read ssl cerfication files when start!\n");
return -1;
}
status = read_config_file_check();
if (status < 0) {
(void)fprintf(stderr, "read_config_file_check failed when start!\n");
return -1;
}
max_logic_cluster_name_len = (max_logic_cluster_name_len < strlen("logiccluster_name"))
? (uint32)strlen("logiccluster_name")
: max_logic_cluster_name_len;
(void)logfile_init();
if (get_agent_global_params_from_configfile() == -1) {
(void)fprintf(stderr, "Another cm_agent command is still running, start failed !\n");
return -1;
}
if (sys_log_path[0] == '\0') {
rc = strncpy_s(sys_log_path, sizeof(sys_log_path), g_currentNode->cmDataPath, MAXPGPATH - 1);
securec_check_errno(rc, (void)rc);
rc = strncat_s(sys_log_path, sizeof(sys_log_path), "/cm_agent/log", strlen("/cm_agent/log"));
securec_check_errno(rc, (void)rc);
(void)mkdir(sys_log_path, S_IRWXU);
} else {
if (sys_log_path[0] == '/') {
(void)CmMkdirP(sys_log_path, S_IRWXU);
} else {
char buf[MAXPGPATH] = {0};
rc = memset_s(buf, sizeof(buf), 0, MAXPGPATH);
securec_check_errno(rc, (void)rc);
rc = strncpy_s(buf, sizeof(buf), g_currentNode->cmDataPath, MAXPGPATH - 1);
securec_check_errno(rc, (void)rc);
rc = strncat_s(buf, sizeof(buf), "/cm_agent/", strlen("/cm_agent/"));
securec_check_errno(rc, (void)rc);
rc = strncat_s(buf, sizeof(buf), sys_log_path, strlen(sys_log_path));
securec_check_errno(rc, (void)rc);
rc = memcpy_s(sys_log_path, sizeof(sys_log_path), buf, MAXPGPATH);
securec_check_errno(rc, (void)rc);
(void)mkdir(sys_log_path, S_IRWXU);
}
}
status_t st = CreateSysLogFile();
if (st != CM_SUCCESS) {
exit(-1);
}
rc = memset_s(system_call_log, MAXPGPATH, 0, MAXPGPATH);
securec_check_errno(rc, (void)rc);
create_system_call_log();
create_system_alarm_log(sys_log_path);
print_environ();
if (g_currentNode->datanodeCount > CM_MAX_DATANODE_PER_NODE) {
write_runlog(FATAL,
"%u datanodes deployed on this node more than limit(%d)\n",
g_currentNode->datanodeCount,
CM_MAX_DATANODE_PER_NODE);
exit(1);
}
AlarmEnvInitialize();
InitializeAlarmItem(g_currentNode);
rc = snprintf_s(g_agentDataDir, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/cm_agent/", g_currentNode->cmDataPath);
securec_check_intval(rc, (void)rc);
(void)atexit(stop_flag);
if (CmaReadCusResConf() != CM_SUCCESS) {
exit(-1);
}
InitAgentGlobalVariable();
CmServerCmdProcessorInit();
InitActivity();
status = CreateCheckNetworkThread();
if (status != 0) {
exit(status);
}
#ifdef ENABLE_MULTIPLE_NODES
if (g_currentNode->gtm == 1) {
(void)pthread_rwlock_init(&(g_gtmReportMsg.lk_lock), NULL);
CreateGTMStatusCheckThread();
}
if (g_currentNode->coordinate == 1) {
(void)pthread_rwlock_init(&(g_cnReportMsg.lk_lock), NULL);
CreateCNStatusCheckThread();
CreateCCNStatusCheckThread();
CreateCNBackupStatusCheckThread();
}
#endif
InitNeedInfoRes();
if (g_currentNode->datanodeCount > 0) {
thread_index = (int *)malloc(sizeof(int) * g_currentNode->datanodeCount);
if (thread_index == NULL) {
write_runlog(FATAL, "out of memory\n");
exit(1);
}
for (i = 0; i < g_currentNode->datanodeCount; i++) {
thread_index[i] = (int)i;
#ifdef __aarch64__
g_datanode_primary_num += (PRIMARY_DN == g_currentNode->datanode[i].datanodeRole) ? 1 : 0;
g_datanode_primary_and_standby_num += (DUMMY_STANDBY_DN != g_currentNode->datanode[i].datanodeRole) ? 1 : 0;
#endif
}
for (i = 0; i < g_currentNode->datanodeCount; i++) {
(void)pthread_rwlock_init(&(g_dnReportMsg[i].lk_lock), NULL);
(void)pthread_rwlock_init(&(g_dnSyncListInfo[i].lk_lock), NULL);
(void)pthread_rwlock_init(&(g_cmDoWriteOper[i].lock), NULL);
int *ind = thread_index + i;
CreateDNStatusCheckThread(ind);
CreateDNDataDirectoryCheckThread(ind);
CreateDNConnectionStatusCheckThread(ind);
CreateDNCheckSyncListThread(ind);
CreateDNCheckAvailableSyncThread(ind);
#ifdef ENABLE_MULTIPLE_NODES
CreateDNBackupStatusCheckThread(ind);
CreateDNStorageScalingAlarmThread(ind);
#endif
if (g_enableWalRecord) {
CreateWRFloatIpCheckThread(ind);
}
}
}
status = cmagent_getenv("GAUSSLOG", g_logBasePath, sizeof(g_logBasePath));
if (status != EOK) {
write_runlog(FATAL, "get env GAUSSLOG fail.\n");
exit(status);
}
isSharedStorageMode = IsSharedStorageMode();
AllocCmaMsgQueueMemory();
AllQueueInit();
MsgPoolInit(MAX_MSG_BUF_POOL_SIZE, MAX_MSG_BUF_POOL_COUNT);
st = InitSendDdbOperRes();
if (st != CM_SUCCESS) {
write_runlog(FATAL, "failed to InitSendDdbOperRes.\n");
exit(-1);
}
check_input_for_security(g_logBasePath);
CreatePhonyDeadCheckThread();
CreateStartAndStopThread();
CreateFaultDetectThread();
CreateConnCmsPThread();
CreateCheckUpgradeModeThread();
CreateRhbCheckThreads();
CreateVotingDiskThread();
CreateCusResThread();
int err = CreateSendAndRecvCmsMsgThread();
if (err != 0) {
write_runlog(FATAL, "Failed to create send and recv thread: error %d\n", err);
exit(err);
}
err = CreateProcessSendCmsMsgThread();
if (err != 0) {
write_runlog(FATAL, "Failed to create send msg thread: error %d\n", err);
exit(err);
}
err = CreateProcessRecvCmsMsgThread();
if (err != 0) {
write_runlog(FATAL, "Failed to create process recv msg thread: error %d\n", err);
exit(err);
}
CreateKerberosStatusCheckThread();
CreateDiskUsageCheckThread();
CreateOnDemandRedoCheckThread();
CreateDiskHealthCheckThread();
#ifdef ENABLE_XALARMD
CreateXalarmEventCheckThread();
#endif
err = CreateCheckSysStatusThread();
if (err != 0) {
write_runlog(FATAL, "Failed to create check system status thread: error %d\n", err);
exit(err);
}
#ifdef ENABLE_MULTIPLE_NODES
err = CreateCheckNodeStatusThread();
if (err != 0) {
write_runlog(FATAL, "Failed to create check node status thread: error %d\n", err);
exit(err);
}
if (g_currentNode->coordinate > 0) {
err = CreateCnDnConnectCheckThread();
if (err != 0) {
write_runlog(FATAL, "Failed to create check conn status thread: error %d\n", err);
exit(err);
}
CreatePgxcNodeCheckThread();
}
if (g_currentNode->coordinate > 0) {
CreateGtmModeThread();
}
#endif
if (g_currentNode->etcd) {
(void)pthread_rwlock_init(&(g_etcdReportMsg.lk_lock), NULL);
CreateETCDStatusCheckThread();
CreateETCDConnectionStatusCheckThread();
}
if (CreateLogFileCompressAndRemoveThread() != 0) {
write_runlog(FATAL, "CreateLogFileCompressAndRemoveThread failed!\n");
exit(-1);
}
server_loop();
(void)cmagent_unlock();
write_runlog(LOG, "cm_agent exit\n");
if (thread_index != NULL) {
FREE_AND_RESET(thread_index);
}
exit(status);
}
uint32 GetLibcommDefaultPort(uint32 base_port, int port_type)
{
if (port_type == COMM_PORT_TYPE_DATA) {
if (security_mode) {
return COMM_DATA_DFLT_PORT;
} else {
return (base_port + 2);
}
} else {
if (security_mode) {
return COMM_CTRL_DFLT_PORT;
} else {
return (base_port + 3);
}
}
}
bool ExecuteCmdWithResult(char *cmd, char *result, int resultLen)
{
FILE *cmd_fd = popen(cmd, "r");
if (cmd_fd == NULL) {
write_runlog(ERROR, "popen %s failed, errno[%d].\n", cmd, errno);
return false;
}
if (fgets(result, resultLen - 1, cmd_fd) == NULL) {
(void)pclose(cmd_fd);
write_runlog(LOG, "fgets result for %s failed, errno[%d].\n", cmd, errno);
return false;
}
(void)pclose(cmd_fd);
return true;
}
uint32 GetLibcommPort(const char *file_path, uint32 base_port, int port_type)
{
char get_cmd[MAXPGPATH * 2];
char result[NAMEDATALEN] = {0};
int retry_cnt = 0;
uint32 port = 0;
const char *Keywords = NULL;
if (port_type == COMM_PORT_TYPE_DATA) {
Keywords = "comm_sctp_port";
} else {
Keywords = "comm_control_port";
}
int ret = snprintf_s(get_cmd,
sizeof(get_cmd),
MAXPGPATH * 2 - 1,
"grep \"^%s\" %s/postgresql.conf|awk \'{print $3}\'|tail -1",
Keywords,
file_path);
securec_check_intval(ret, (void)ret);
while (retry_cnt < MAX_RETRY_TIME) {
if (!ExecuteCmdWithResult(get_cmd, result, NAMEDATALEN)) {
retry_cnt++;
continue;
}
port = (uint32)strtol(result, NULL, 10);
if (port == 0 || port > 65535) {
port = GetLibcommDefaultPort(base_port, port_type);
write_runlog(
WARNING, "Custom %s: %ld is invalid, use the default:%u.\n", Keywords, strtol(result, NULL, 10), port);
return port;
} else {
write_runlog(LOG, "Custom %s: %u has found.\n", Keywords, port);
return port;
}
}
port = GetLibcommDefaultPort(base_port, port_type);
write_runlog(LOG, "No custom %s found, use the default:%u.\n", Keywords, port);
return port;
}
static EventTriggerType GetTriggerTypeFromStr(const char *typeStr)
{
for (int i = EVENT_START; i < EVENT_COUNT; ++i) {
if (strcmp(typeStr, triggerTypeStringMap[i].typeStr) == 0) {
return triggerTypeStringMap[i].type;
}
}
write_runlog(ERROR, "Event trigger type %s is not supported.\n", typeStr);
return EVENT_UNKNOWN;
}
* check trigger item, key and value can't be empty and must be string,
* value must be shell script file, current user has right permission.
*/
static status_t CheckEventTriggersItem(const cJSON *item)
{
if (!cJSON_IsString(item)) {
write_runlog(ERROR, "The trigger value must be string.\n");
return CM_ERROR;
}
char *valuePtr = item->valuestring;
if (valuePtr == NULL || strlen(valuePtr) == 0) {
write_runlog(ERROR, "The trigger value can't be empty.\n");
return CM_ERROR;
}
if (valuePtr[0] != '/') {
write_runlog(ERROR, "The trigger script path must be absolute path.\n");
return CM_ERROR;
}
const char *extention = ".sh";
const size_t shExtLen = strlen(extention);
size_t pathLen = strlen(valuePtr);
if (pathLen < shExtLen ||
strncmp((valuePtr + (pathLen - shExtLen)), extention, shExtLen) != 0) {
write_runlog(ERROR, "The trigger value %s is not shell script.\n", valuePtr);
return CM_ERROR;
}
if (access(valuePtr, F_OK) != 0) {
write_runlog(ERROR, "The trigger script %s is not a file or does not exist.\n", valuePtr);
return CM_ERROR;
}
if (access(valuePtr, R_OK | X_OK) != 0) {
write_runlog(ERROR, "Current user has no permission to access the "
"trigger script %s.\n", valuePtr);
return CM_ERROR;
}
return CM_SUCCESS;
}
* event_triggers sample:
* {
* "on_start": "/dir/on_start.sh",
* "on_stop": "/dir/on_stop.sh",
* "on_failover": "/dir/on_failover.sh",
* "on_switchover": "/dir/on_switchover.sh"
* }
*/
static void ParseEventTriggers(const char *value)
{
if (value == NULL || value[0] == 0) {
write_runlog(WARNING, "The value of event_triggers is empty.\n");
return;
}
if (strlen(value) > MAX_PATH_LEN) {
write_runlog(ERROR, "The string value \"%s\" is longer than 1024.\n", value);
return;
}
cJSON *root = NULL;
root = cJSON_Parse(value);
if (!root) {
write_runlog(ERROR, "The value of event_triggers is not a json.\n");
return;
}
if (cJSON_IsArray(root)) {
write_runlog(ERROR, "The value of event_triggers can't be a json item array.\n");
cJSON_Delete(root);
return;
}
int triggerNums[EVENT_COUNT] = {0};
cJSON *item = root->child;
* so a temporary backup is needed, to avoid partial modifications
*/
char *eventTriggers[EVENT_COUNT] = {NULL};
bool isValueInvalid = false;
while (item != NULL) {
if (CheckEventTriggersItem(item) == CM_ERROR) {
isValueInvalid = true;
break;
}
char *typeStr = item->string;
EventTriggerType type = GetTriggerTypeFromStr(typeStr);
if (type == EVENT_UNKNOWN) {
write_runlog(ERROR, "The trigger type %s does support.\n", typeStr);
isValueInvalid = true;
break;
}
char *valuePtr = item->valuestring;
++triggerNums[type];
if (triggerNums[type] > 1) {
write_runlog(ERROR, "Duplicated trigger %s are supported.\n", typeStr);
isValueInvalid = true;
break;
}
eventTriggers[type] = (char*)CmMalloc(strlen(valuePtr) + 1);
int ret = snprintf_s(eventTriggers[type], MAX_PATH_LEN,
MAX_PATH_LEN - 1, "%s", valuePtr);
securec_check_intval(ret, (void)ret);
item = item->next;
}
if (isValueInvalid) {
for (int i = 0; i < EVENT_COUNT; ++i) {
if (eventTriggers[i] != NULL) {
FREE_AND_RESET(eventTriggers[i]);
}
}
cJSON_Delete(root);
return;
}
for (int i = 0; i < EVENT_COUNT; ++i) {
if (eventTriggers[i] == NULL) {
if (g_eventTriggers[i] != NULL) {
FREE_AND_RESET(g_eventTriggers[i]);
}
} else {
FREE_AND_RESET(g_eventTriggers[i]);
g_eventTriggers[i] = (char*)CmMalloc(strlen(eventTriggers[i]) + 1);
int ret = snprintf_s(g_eventTriggers[i], MAX_PATH_LEN,
MAX_PATH_LEN - 1, "%s", eventTriggers[i]);
securec_check_intval(ret, (void)ret);
FREE_AND_RESET(eventTriggers[i]);
write_runlog(LOG, "Event trigger %s was added, script path = %s.\n",
triggerTypeStringMap[i].typeStr, g_eventTriggers[i]);
}
}
cJSON_Delete(root);
}
void GetEventTrigger()
{
char eventTriggerString[MAX_PATH_LEN] = {0};
if (get_config_param(configDir, "event_triggers", eventTriggerString, MAX_PATH_LEN) < 0) {
write_runlog(ERROR, "get_config_param() get event_triggers fail.\n");
return;
}
ParseEventTriggers(eventTriggerString);
}
void ExecuteEventTrigger(const EventTriggerType triggerType, int32 staPrimId)
{
if (g_eventTriggers[triggerType] == NULL) {
return;
}
write_runlog(LOG, "Event trigger %s was triggered.\n", triggerTypeStringMap[triggerType].typeStr);
char execTriggerCmd[MAX_COMMAND_LEN] = {0};
int rc;
if (staPrimId != INVALID_ID && triggerType == EVENT_FAILOVER) {
rc = snprintf_s(execTriggerCmd, MAX_COMMAND_LEN, MAX_COMMAND_LEN - 1,
SYSTEMQUOTE "%s %d >> %s 2>&1 &" SYSTEMQUOTE, g_eventTriggers[triggerType], staPrimId, system_call_log);
} else {
rc = snprintf_s(execTriggerCmd, MAX_COMMAND_LEN, MAX_COMMAND_LEN - 1,
SYSTEMQUOTE "%s >> %s 2>&1 &" SYSTEMQUOTE, g_eventTriggers[triggerType], system_call_log);
}
securec_check_intval(rc, (void)rc);
write_runlog(LOG, "event trigger command: \"%s\".\n", execTriggerCmd);
RunCmd(execTriggerCmd);
}