/*
 * Copyright (c) 2021 Huawei Technologies Co.,Ltd.
*
* CM is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* cma_common.cpp
* cma common functions
*
* IDENTIFICATION
* src/cm_agent/cma_common.cpp
*
* -------------------------------------------------------------------------
*/
#include <sys/stat.h>
#include <sys/vfs.h>
#include <sys/wait.h>
#include <string>
#include <algorithm>
#include "cm/cm_c.h"
#include "cm/cm_elog.h"
#include "cm/pqsignal.h"
#include "cm/libpq-fe.h"
#include "cm/libpq-int.h"
#include "cma_global_params.h"
#include "cma_connect.h"
#include "cma_network_check.h"
#include "cma_instance_management.h"
#include "cma_instance_management_res.h"
#include "cma_common.h"
#include "cma_disk_check.h"
/*
 * Fallback value passed for "diskusage_threshold_value_check" in ReloadParametersFromConfig().
 * Presumably a percentage, comparable with GetDiskUsageForPath() (TODO confirm).
 */
static const int DISK_USAGE_DEFAULT_THRESHOLD = 90;
void save_thread_id(pthread_t thrId)
{
int length = (int)(sizeof(g_threadId) / sizeof(g_threadId[0]));
int32 i;
for (i = 0; i < length; i++) {
if (g_threadId[i] == 0) {
g_threadId[i] = thrId;
break;
}
}
if (i == length) {
write_runlog(WARNING, "the num(%d) of array for thread state is not enough.\n", length);
}
}
void set_thread_state(pthread_t thrId)
{
if (thrId == 0) {
return;
}
int length = (int)(sizeof(g_threadId) / sizeof(g_threadId[0]));
for (int32 i = 0; i < length; i++) {
if (g_threadId[i] == thrId) {
struct timespec now = {0};
(void)clock_gettime(CLOCK_MONOTONIC, &now);
g_thread_state[i] = now.tv_sec;
g_threadName[i] = thread_name;
break;
}
}
}
/*
 * Thin wrapper around cm_getenv(): copies the value of env_var into
 * output_env_value (at most env_value_len bytes) and returns cm_getenv()'s
 * result unchanged (callers compare it with EOK).
 */
int cmagent_getenv(const char *env_var, char *output_env_value, uint32 env_value_len)
{
    const int result = cm_getenv(env_var, output_env_value, env_value_len);
    return result;
}
/*
 * Human-readable name of an instance type ("coordinator", "datanode", ...).
 * Types not listed map to "unknown".
 */
const char *type_int_to_str_name(InstanceTypes ins_type)
{
    const char *typeName = "unknown";
    switch (ins_type) {
        case INSTANCE_CN:
            typeName = "coordinator";
            break;
        case INSTANCE_DN:
            typeName = "datanode";
            break;
        case INSTANCE_GTM:
            typeName = "gtm";
            break;
        case INSTANCE_CM:
            typeName = "cm_server";
            break;
        case INSTANCE_FENCED:
            typeName = "fenced";
            break;
        case INSTANCE_KERBEROS:
            typeName = "krb5kdc";
            break;
        default:
            break;
    }
    return typeName;
}
/*
 * Binary name of the datanode process: ZENGINE_BIN_NAME when the cluster type
 * is V3SingleInstCluster, DATANODE_BIN_NAME otherwise.
 */
const char *GetDnProcessName(void)
{
    if (g_clusterType == V3SingleInstCluster) {
        return ZENGINE_BIN_NAME;
    }
    return DATANODE_BIN_NAME;
}
/*
 * Executable name used by processes of the given instance type.
 * CN and fenced instances share COORDINATE_BIN_NAME; DN is resolved through
 * GetDnProcessName(); unlisted types map to "unknown".
 */
const char *type_int_to_str_binname(InstanceTypes ins_type)
{
    const char *binName = "unknown";
    switch (ins_type) {
        case INSTANCE_CN:
        case INSTANCE_FENCED:
            binName = COORDINATE_BIN_NAME;
            break;
        case INSTANCE_DN:
            binName = GetDnProcessName();
            break;
        case INSTANCE_GTM:
            binName = GTM_BIN_NAME;
            break;
        case INSTANCE_CM:
            binName = CM_SERVER_BIN_NAME;
            break;
        case INSTANCE_KERBEROS:
            binName = KERBEROS_BIN_NAME;
            break;
        default:
            break;
    }
    return binName;
}
/*
 * Intentionally empty SIGALRM handler; ExecuteCmd() installs it in the forked
 * child right after arming the ITIMER_REAL timeout.
 */
static void SigAlarmHandler(int arg)
{
    (void)arg;
}
/*
 * Convert a waitpid() status into ExecuteCmd()'s result:
 *   normal exit        -> the child's exit status
 *   killed by a signal -> CM_EXECUTE_CMD_TIME_OUT (any terminating signal)
 *   anything else      -> -1
 */
static int GetChildCmdResult(int status)
{
    if (WIFEXITED(status)) {
        return WEXITSTATUS(status);
    }
    const bool killedBySignal = WIFSIGNALED(status) && (WTERMSIG(status) != 0);
    return killedBySignal ? CM_EXECUTE_CMD_TIME_OUT : -1;
}
int ExecuteCmd(const char *command, struct timeval timeout)
{
#ifndef WIN32
pid_t pid = -1;
pid_t child = 0;
struct sigaction ign = {};
struct sigaction intact = {};
struct sigaction quitact = {};
sigset_t newsigblock, oldsigblock;
struct itimerval write_timeout;
errno_t rc;
int childStatus = 0;
rc = memset_s(&ign, sizeof(struct sigaction), 0, sizeof(struct sigaction));
securec_check_errno(rc, (void)rc);
if (command == NULL) {
write_runlog(ERROR, "ExecuteCmd invalid command.\n");
return 1;
}
* Ignore SIGINT and SIGQUIT, block SIGCHLD. Remember to save existing
* signal dispositions.
*/
ign.sa_handler = SIG_IGN;
(void)sigemptyset(&ign.sa_mask);
ign.sa_flags = 0;
(void)sigaction(SIGINT, &ign, &intact);
(void)sigaction(SIGQUIT, &ign, &quitact);
(void)sigemptyset(&newsigblock);
(void)sigaddset(&newsigblock, SIGCHLD);
(void)sigprocmask(SIG_BLOCK, &newsigblock, &oldsigblock);
switch (pid = fork()) {
case -1:
break;
case 0:
write_timeout.it_value.tv_sec = timeout.tv_sec;
write_timeout.it_value.tv_usec = timeout.tv_usec;
write_timeout.it_interval.tv_sec = 0;
write_timeout.it_interval.tv_usec = 0;
(void)setitimer(ITIMER_REAL, &write_timeout, NULL);
(void)signal(SIGALRM, SigAlarmHandler);
* Restore original signal dispositions and exec the command.
*/
(void)sigaction(SIGINT, &intact, NULL);
(void)sigaction(SIGQUIT, &quitact, NULL);
(void)sigprocmask(SIG_SETMASK, &oldsigblock, NULL);
(void)execl("/bin/sh", "sh", "-c", command, (char *)0);
(void)signal(SIGALRM, SIG_IGN);
_exit(127);
default:
child = pid;
write_runlog(DEBUG1, "ExecuteCmd: %s, pid:%d. start!\n", command, pid);
if (pid != waitpid(pid, &childStatus, 0)) {
(void)kill(child, SIGKILL);
pid = -1;
(void)wait(NULL);
write_runlog(ERROR, "ExecuteCmd: %s, child:%d will reset. end!\n", command, child);
}
write_runlog(DEBUG1, "ExecuteCmd: %s, pid:%d, childStatus %d end!\n", command, pid, childStatus);
(void)signal(SIGALRM, SIG_IGN);
break;
}
(void)sigaction(SIGINT, &intact, NULL);
(void)sigaction(SIGQUIT, &quitact, NULL);
(void)sigprocmask(SIG_SETMASK, &oldsigblock, NULL);
if (pid == -1) {
write_runlog(ERROR, "ExecuteCmd: %s, failed errno:%d.\n", command, errno);
}
return ((pid == -1) ? -1 : GetChildCmdResult(childStatus));
#else
return -1;
#endif
}
* Read parameter from cm_agent.conf by accurate parameter name.
*/
int get_config_param(const char *config_file, const char *srcParam, char *destParam, int destLen)
{
char buf[MAXPGPATH] = {'\0'};
char *subStr = NULL;
char *saveptr1 = NULL;
errno_t rc;
if (config_file == NULL || srcParam == NULL || destParam == NULL) {
write_runlog(
ERROR, "Get parameter failed,confDir=%s,srcParam = %s, destParam=%s.\n", config_file, srcParam, destParam);
return -1;
}
FILE *fd = fopen(config_file, "re");
if (fd == NULL) {
write_runlog(FATAL, "Open configure file failed \n");
exit(1);
}
while (!feof(fd)) {
(void)fgets(buf, MAXPGPATH, fd);
buf[MAXPGPATH - 1] = 0;
if (is_comment_line(buf) == 1) {
continue;
}
subStr = strstr(buf, srcParam);
if (subStr == NULL) {
continue;
}
subStr = strtok_r(buf, "=", &saveptr1);
if (subStr == NULL) {
continue;
}
if (strcmp(trim(subStr), srcParam) == 0) {
break;
}
}
if (saveptr1 != NULL) {
subStr = trim(saveptr1);
subStr = strtok_r(subStr, "#", &saveptr1);
subStr = strtok_r(subStr, "\n", &saveptr1);
subStr = strtok_r(subStr, "\r", &saveptr1);
char *trimStr = trim(subStr);
if (trimStr != NULL && strcmp(trimStr, "''") == 0) {
trimStr = "";
}
if (trimStr == NULL || strlen(trimStr) + 1 > (size_t)destLen) {
#ifdef ENABLE_UT
(void)fclose(fd);
return -1;
#endif
write_runlog(FATAL, "The value of parameter %s is invalid.\n", srcParam);
exit(1);
}
rc = memcpy_s(destParam, strlen(trimStr) + 1, trimStr, strlen(trimStr) + 1);
securec_check_errno(rc, (void)rc);
}
(void)fclose(fd);
return 0;
}
void get_connection_mode(char *config_file)
{
const char *srcStr = "enable_xc_maintenance_mode";
char dstStr[10] = {'\0'};
if (get_config_param(config_file, srcStr, dstStr, sizeof(dstStr)) < 0) {
write_runlog(ERROR, "get_config_param() get enable_xc_maintenance_mode fail.\n");
return;
}
if (strlen(dstStr) != 0) {
if (IsBoolCmParamTrue(dstStr)) {
enable_xc_maintenance_mode = true;
} else if (IsBoolCmParamFalse(dstStr)) {
enable_xc_maintenance_mode = false;
} else {
enable_xc_maintenance_mode = true;
write_runlog(ERROR, "invalid value for parameter \"enable_xc_maintenance_mode\" in %s.\n", config_file);
}
} else {
enable_xc_maintenance_mode = true;
write_runlog(ERROR, "Get config parameter \"enable_xc_maintenance_mode\" value failed!\n");
}
return;
}
void get_start_mode(char *config_file)
{
const char *srcStr = "security_mode";
char dstStr[10] = {'\0'};
if (get_config_param(config_file, srcStr, dstStr, sizeof(dstStr)) < 0) {
write_runlog(ERROR, "get_config_param() get security_mode fail.\n");
return;
}
if (strlen(dstStr) != 0) {
if (IsBoolCmParamTrue(dstStr)) {
security_mode = true;
} else if (IsBoolCmParamFalse(dstStr)) {
security_mode = false;
} else {
security_mode = true;
write_runlog(FATAL, "invalid value for parameter \"security_mode\" in %s.\n", config_file);
}
} else {
security_mode = true;
write_runlog(FATAL, "Get config parameter \"security_mode\" value failed!\n");
}
return;
}
void get_build_mode(char* config_file)
{
const char *srcStr = "incremental_build";
char dstStr[10] = {'\0'};
if (get_config_param(config_file, srcStr, dstStr, sizeof(dstStr)) < 0) {
write_runlog(ERROR, "get_config_param() get incremental_build fail.\n");
return;
}
if (strlen(dstStr) != 0) {
if (IsBoolCmParamTrue(dstStr)) {
incremental_build = true;
} else if (IsBoolCmParamFalse(dstStr)) {
incremental_build = false;
} else {
incremental_build = true;
write_runlog(FATAL, "invalid value for parameter \"incremental_build\" in %s.\n", config_file);
}
} else {
incremental_build = true;
write_runlog(FATAL, "Get config parameter \"incremental_build\" value failed!\n");
}
return;
}
/*
 * Re-read cm_agent runtime parameters into their global variables.
 *
 * Order of work:
 *   1. Alarm configuration from g_alarmConfigDir.
 *   2. Log parameters and build/start mode from configDir.
 *   3. The numeric/enum parameters below, from configDir.
 *   4. GetEventTrigger() and LoadDiskCheckConfig().
 *
 * The last argument of get_int_value_from_config()/get_uint32_value_from_config()
 * is the fallback value. It is presumably used when the parameter is missing
 * or invalid (TODO confirm in those helpers).
 */
void ReloadParametersFromConfig()
{
write_runlog(LOG, "reload cm_agent parameters from config file.\n");
GetAlarmConfig(g_alarmConfigDir);
get_log_paramter(configDir);
get_build_mode(configDir);
get_start_mode(configDir);
GetStringFromConf(configDir, g_environmentThreshold, sizeof(g_environmentThreshold), "environment_threshold");
agent_report_interval = (uint32)(get_int_value_from_config(configDir, "agent_report_interval", 1));
agent_heartbeat_timeout = (uint32)(get_int_value_from_config(configDir, "agent_heartbeat_timeout", 8));
agent_connect_timeout = (uint32)(get_int_value_from_config(configDir, "agent_connect_timeout", 1));
agent_connect_retries = (uint32)(get_int_value_from_config(configDir, "agent_connect_retries", 15));
agent_check_interval = (uint32)(get_int_value_from_config(configDir, "agent_check_interval", 2));
g_diskUsageThreshold =
(uint32)(get_int_value_from_config(configDir, "diskusage_threshold_value_check", DISK_USAGE_DEFAULT_THRESHOLD));
agent_kill_instance_timeout = (uint32)get_int_value_from_config(configDir, "agent_kill_instance_timeout", 0);
agent_phony_dead_check_interval =
(uint32)get_int_value_from_config(configDir, "agent_phony_dead_check_interval", 10);
enable_gtm_phony_dead_check = (uint32)get_int_value_from_config(configDir, "enable_gtm_phony_dead_check", 1);
g_disasterRecoveryType =
(DisasterRecoveryType)get_uint32_value_from_config(configDir, "disaster_recovery_type", DISASTER_RECOVERY_NULL);
g_enableE2ERto = (uint32)get_int_value_from_config(configDir, "enable_e2e_rto", 0);
/*
 * enable_e2e_rto == 1 overrides the configured phony-dead check interval with 1.
 * This must stay after both parameters are read.
 */
if (g_enableE2ERto == 1) {
agent_phony_dead_check_interval = 1;
}
g_ssDoubleClusterMode =
(SSDoubleClusterMode)get_uint32_value_from_config(configDir, "ss_double_cluster_mode", SS_DOUBLE_NULL);
/* agent_backup_open is only re-read while its current value is CLUSTER_OBS_STANDBY. */
if (agent_backup_open == CLUSTER_OBS_STANDBY) {
agent_backup_open = (ClusterRole)get_uint32_value_from_config(configDir, "agent_backup_open", CLUSTER_PRIMARY);
}
/*
 * These two pass their current value as the fallback, so presumably they keep
 * that value when the parameter is absent.
 */
log_threshold_check_interval =
get_uint32_value_from_config(configDir, "log_threshold_check_interval", log_threshold_check_interval);
undocumentedVersion = get_uint32_value_from_config(configDir, "upgrade_from", 0);
dilatation_shard_count_for_disk_capacity_alarm = get_uint32_value_from_config(
configDir, "dilatation_shard_count_for_disk_capacity_alarm", dilatation_shard_count_for_disk_capacity_alarm);
g_diskTimeout = get_uint32_value_from_config(configDir, "disk_timeout", 200);
GetEventTrigger();
LoadDiskCheckConfig(configDir);
}
/*
 * Reload every cm_agent parameter, then log the resulting values in one
 * LOG record. Covers:
 *   - everything handled by ReloadParametersFromConfig();
 *   - the string parameters read with get_config_param();
 *   - the log rotation limits.
 *
 * The arguments after g_enableXalarmdFeature must match the build-dependent
 * tail of the format string. Both variants start with enable_e2e_rto (%u),
 * disaster_recovery_type (%d) and environment_threshold (%s).
 */
void ReloadParametersFromConfigfile()
{
    ReloadParametersFromConfig();
    if (get_config_param(configDir, "enable_cn_auto_repair", g_enableCnAutoRepair, sizeof(g_enableCnAutoRepair)) < 0) {
        write_runlog(ERROR, "get_config_param() get enable_cn_auto_repair fail.\n");
    }
    if (get_config_param(configDir, "enable_log_compress", g_enableLogCompress, sizeof(g_enableLogCompress)) < 0) {
        write_runlog(ERROR, "get_config_param() get enable_log_compress fail.\n");
    }
    if (get_config_param(configDir, "enable_vtable", g_enableVtable, sizeof(g_enableVtable)) < 0) {
        write_runlog(ERROR, "get_config_param() get enable_vtable fail.\n");
    }
    if (get_config_param(configDir, "security_mode", g_enableOnlineOrOffline, sizeof(g_enableOnlineOrOffline)) < 0) {
        write_runlog(ERROR, "get_config_param() get security_mode fail.\n");
    }
    if (get_config_param(
        configDir, "incremental_build", g_enableIncrementalBuild, sizeof(g_enableIncrementalBuild)) < 0) {
        write_runlog(ERROR, "get_config_param() get incremental_build fail.\n");
    }
    if (get_config_param(configDir, "unix_socket_directory", g_unixSocketDirectory, sizeof(g_unixSocketDirectory)) <
        0) {
        write_runlog(ERROR, "get_config_param() get unix_socket_directory fail.\n");
    } else {
        check_input_for_security(g_unixSocketDirectory);
    }
    if (get_config_param(configDir, "db_service_vip", g_dbServiceVip, sizeof(g_dbServiceVip)) < 0) {
        write_runlog(ERROR, "get_config_param() get db_service_vip fail.\n");
    }
    log_max_size = get_int_value_from_config(configDir, "log_max_size", 10240);
    log_saved_days = (uint32)get_int_value_from_config(configDir, "log_saved_days", 90);
    log_max_count = (uint32)get_int_value_from_config(configDir, "log_max_count", 10000);
#ifndef ENABLE_MULTIPLE_NODES
    if (get_config_param(configDir, "enable_fence_dn", g_enableFenceDn, sizeof(g_enableFenceDn)) < 0) {
        write_runlog(ERROR, "get_config_param() get enable_fence_dn fail.\n");
    }
#endif
    write_runlog(LOG,
        "reload cm_agent parameters:\n"
        " log_min_messages=%d, maxLogFileSize=%d, sys_log_path=%s, \n alarm_component=%s, "
        "alarm_report_interval=%d, dilatation_shard_count_for_disk_capacity_alarm:%u, \n"
        " agent_heartbeat_timeout=%u, agent_report_interval=%u, agent_connect_timeout=%u, \n"
        " agent_connect_retries=%u, agent_check_interval=%u, diskusage_threshold_value_check=%u, \n"
        "agent_kill_instance_timeout=%u,"
        " agent_phony_dead_check_interval=%u, enable_gtm_phony_dead_check=%u, disk_timeout=%u, \n"
        " log_threshold_check_interval=%u, log_max_size=%ld, log_max_count=%u, log_saved_days=%u, "
        "upgrade_from=%u,\n enable_cn_auto_repair=%s, enable_log_compress=%s, security_mode=%d,\n"
        " incremental_build=%d, unix_socket_directory=%s, disk_check_timeout=%u, disk_check_interval=%u, \n"
        "disk_check_buffer_size=%u, enable_xalarmd_slow_disk_check=%d.\n"
#ifndef ENABLE_MULTIPLE_NODES
        "enable_e2e_rto=%u, disaster_recovery_type=%d, environment_threshold=%s,\n"
        " db_service_vip=%s, enable_fence_dn=%s, ss_double_cluster_mode=%d, agent_backup_open=%d\n",
#else
        "enable_e2e_rto=%u, disaster_recovery_type=%d, environment_threshold=%s\n",
#endif
        log_min_messages,
        maxLogFileSize,
        sys_log_path,
        g_alarmComponentPath,
        g_alarmReportInterval,
        dilatation_shard_count_for_disk_capacity_alarm,
        agent_heartbeat_timeout,
        agent_report_interval,
        agent_connect_timeout,
        agent_connect_retries,
        agent_check_interval,
        g_diskUsageThreshold,
        agent_kill_instance_timeout,
        agent_phony_dead_check_interval,
        enable_gtm_phony_dead_check,
        g_diskTimeout,
        log_threshold_check_interval,
        log_max_size,
        log_max_count,
        log_saved_days,
        undocumentedVersion,
        g_enableCnAutoRepair,
        g_enableLogCompress,
        security_mode,
        incremental_build,
        g_unixSocketDirectory,
        GetDiskCheckTimeout(),
        GetDiskCheckInterval(),
        GetDiskCheckBufferSize(),
        g_enableXalarmdFeature,
#ifndef ENABLE_MULTIPLE_NODES
        g_enableE2ERto,
        g_disasterRecoveryType,
        g_environmentThreshold,
        g_dbServiceVip,
        g_enableFenceDn,
        g_ssDoubleClusterMode,
        agent_backup_open);
#else
        /* The format tail above has three conversions (%u, %d, %s); pass all three. */
        g_enableE2ERto,
        g_disasterRecoveryType,
        g_environmentThreshold);
#endif
}
/*
 * Load a GaussState record from statePath into *state.
 * Returns 0 on success. Returns -1 if state is NULL, the file cannot be
 * opened, or fread() returns 0 bytes. A short but non-empty read is accepted.
 */
int ReadDBStateFile(GaussState *state, const char *statePath)
{
    if (state == NULL) {
        write_runlog(LOG, "Could not get information from gaussdb.state\n");
        return -1;
    }
    FILE *stateFile = fopen(statePath, "re");
    if (stateFile == NULL) {
        char errBuffer[ERROR_LIMIT_LEN];
        if (errno != ENOENT) {
            write_runlog(LOG,
                "open gaussdb state file \"%s\" failed, could not get the build infomation: %s\n",
                statePath,
                strerror_r(errno, errBuffer, ERROR_LIMIT_LEN));
        } else {
            write_runlog(LOG,
                "gaussdb state file \"%s\" is not exist, could not get the build infomation: %s\n",
                statePath,
                strerror_r(errno, errBuffer, ERROR_LIMIT_LEN));
        }
        return -1;
    }
    const size_t bytesRead = fread(state, 1, sizeof(GaussState), stateFile);
    if (bytesRead == 0) {
        write_runlog(LOG, "get gaussdb state infomation from the file \"%s\" failed\n", statePath);
        (void)fclose(stateFile);
        return -1;
    }
    write_runlog(DEBUG1, "gaussdb state file: state:%d, mode:%d.\n", state->state, state->mode);
    (void)fclose(stateFile);
    return 0;
}
void UpdateDBStateFile(const char *path, const GaussState *state)
{
int ret;
char tempPath[CM_PATH_LENGTH] = {0};
if (state == NULL || path == NULL) {
return;
}
ret = snprintf_s(tempPath, CM_PATH_LENGTH, CM_PATH_LENGTH - 1, "%s.temp", path);
securec_check_intval(ret, (void)ret);
FILE *fp = fopen(tempPath, "w");
if (fp == NULL) {
write_runlog(ERROR, "open file \"%s\" failed.\n", tempPath);
return;
}
if (chmod(tempPath, S_IRUSR | S_IWUSR) == -1) {
write_runlog(ERROR, "chmod file \"%s\" failed.\n", tempPath);
(void)fclose(fp);
return;
}
if (fwrite(state, 1, sizeof(GaussState), fp) == 0) {
write_runlog(ERROR, "write file \"%s\" failed.\n", tempPath);
(void)fclose(fp);
return;
}
(void)fclose(fp);
(void)rename(tempPath, path);
return;
}
/*
 * Read the process id stored at the start of the PID file pid_path.
 * pid_path is canonicalized in place first; len is only checked for zero.
 * Returns 0 if len is 0 or the file cannot be opened, -1 if no number can be
 * parsed, otherwise the parsed pid.
 */
pgpid_t get_pgpid(char *pid_path, uint32 len)
{
    if (len == 0) {
        write_runlog(ERROR, "pidPath(%s) len is 0.\n", pid_path);
        return 0;
    }
    canonicalize_path(pid_path);
    FILE *pidFile = fopen(pid_path, "re");
    if (pidFile == NULL) {
        write_runlog(DEBUG5, "could not open PID file \"%s\"\n", pid_path);
        return 0;
    }
    long pidValue = 0;
    const int matched = fscanf_s(pidFile, "%ld", &pidValue);
    (void)fclose(pidFile);
    if (matched != 1) {
        write_runlog(ERROR, "invalid data in PID file \"%s\"\n", pid_path);
        return -1;
    }
    return (pgpid_t)pidValue;
}
/*
 * Probe whether `pid` exists using kill(pid, 0).
 * The current process and its parent are always reported as not alive.
 */
bool is_process_alive(pgpid_t pid)
{
    const bool isSelfOrParent = (pid == getpid()) || (pid == getppid());
    return !isSelfOrParent && (kill((int)pid, 0) == 0);
}
/*
 * Judge the disc check started at g_check_disc_state (0 = none in progress).
 * The start time is compared with CLOCK_MONOTONIC.
 *
 * Returns -1 when a check for instanceId has lasted longer than
 * g_threadDeadEffectiveTime seconds. Otherwise returns 0, logging when any
 * running check has exceeded 120 seconds.
 */
int check_disc_state(uint32 instanceId)
{
    const long slowCheckLogSeconds = 120;
    struct timespec now = {0};
    (void)clock_gettime(CLOCK_MONOTONIC, &now);
    const long startedAt = g_check_disc_state;
    if (startedAt == 0) {
        return 0;
    }
    const long elapsed = now.tv_sec - startedAt;
    if (elapsed > (long)g_threadDeadEffectiveTime && instanceId == g_checkDiscInstanceNow) {
        write_runlog(LOG, "check disc phony dead for instance(%u) is invalid.\n", g_checkDiscInstanceNow);
        return -1;
    }
    if (elapsed > slowCheckLogSeconds) {
        write_runlog(LOG,
            "the instance check disc take %ld second, instance is %u.\n",
            elapsed,
            g_checkDiscInstanceNow);
    }
    return 0;
}
/*
 * Mark the start (instanceId != 0) or the end (instanceId == 0) of a disc check.
 *
 * Start: store the current time in *check_disc_state and, when `update` is
 * true, also in g_check_disc_state.
 * End: log if the check took longer than CM_DISK_TIMEOUT seconds since
 * *check_disc_state and, when `update` is true, reset g_check_disc_state to 0.
 * g_checkDiscInstanceNow is set to instanceId in both cases.
 *
 * Times come from CLOCK_MONOTONIC because check_disc_state() compares
 * g_check_disc_state against CLOCK_MONOTONIC. A wall-clock (gettimeofday)
 * value here would make that elapsed-time comparison meaningless.
 */
void set_disc_check_state(uint32 instanceId, long *check_disc_state, bool update)
{
    g_checkDiscInstanceNow = instanceId;
    struct timespec now = {0};
    (void)clock_gettime(CLOCK_MONOTONIC, &now);
    if (instanceId == 0) {
        const long elapsed = (long)now.tv_sec - (*check_disc_state);
        if (elapsed > CM_DISK_TIMEOUT) {
            write_runlog(LOG, "the instance check disc take %ld second.\n", elapsed);
        }
        if (update) {
            g_check_disc_state = 0;
        }
        return;
    }
    *check_disc_state = (long)now.tv_sec;
    if (update) {
        g_check_disc_state = (long)now.tv_sec;
    }
}
bool agentCheckDisc(const char *path)
{
char write_test_file[MAXPGPATH] = {0};
int rc;
char buf[2] = {0};
Assert(path != NULL);
rc = snprintf_s(
write_test_file, sizeof(write_test_file), sizeof(write_test_file) - 1, "%s/disc_readonly_test", path);
securec_check_intval(rc, (void)rc);
check_input_for_security(write_test_file);
canonicalize_path(write_test_file);
errno = 0;
FILE *fd = fopen(write_test_file, "we");
if (fd != NULL) {
errno = 0;
if (fwrite("1", sizeof("1"), 1, fd) != 1) {
if (errno == EROFS || errno == EIO) {
(void)fclose(fd);
write_runlog(LOG, "could not write disc test file, ERRNO : %d\n", errno);
(void)remove(write_test_file);
return false;
}
}
(void)fclose(fd);
fd = fopen(write_test_file, "re");
if (fd != NULL) {
if (fread(buf, sizeof("1"), 1, fd) != 1) {
if (errno == ENOSPC) {
(void)fclose(fd);
write_runlog(LOG, "could not read disc test file, ERRNO : %d\n", errno);
(void)remove(write_test_file);
return false;
}
}
(void)fclose(fd);
}
(void)remove(write_test_file);
return true;
} else {
int save_errno = errno;
write_runlog(LOG, "could not open disc test file, ERRNO : %d\n", save_errno);
if (save_errno == EROFS || save_errno == EACCES || save_errno == ENOENT || save_errno == EIO ||
save_errno == ENOSPC) {
return false;
}
return true;
}
}
/*
 * Store `state` in *val only while *val still holds UNKNOWN_BAD_REASON,
 * so an already-recorded reason is never overwritten.
 */
void set_instance_not_exist_alarm_value(int *val, int state)
{
    if (*val == UNKNOWN_BAD_REASON) {
        *val = state;
    }
}
* @Description: when cn start, record datanode pid
*
*/
void record_pid(const char *DataPath)
{
int rcs;
int ret;
char command[MAXPGPATH];
const char *dnName = GetDnProcessName();
rcs = snprintf_s(command,
MAXPGPATH,
MAXPGPATH - 1,
"ps -ux | grep %s | grep %s| awk '{print \"start instance pid: \"$2"
", \"data path: \"$14}' >> \"%s\" 2>&1 &",
DataPath,
dnName,
system_call_log);
securec_check_intval(rcs, (void)rcs);
ret = system(command);
if (ret != 0) {
write_runlog(ERROR, "run system command failed %d! %s, errno=%d.\n", ret, command, errno);
} else {
write_runlog(DEBUG1, "run system command success %s \n", command);
}
}
/* Count the datanodes in p_node_config whose datanodeRole equals `sort`. */
uint32 GetDatanodeNumSort(const staticNodeConfig *p_node_config, uint32 sort)
{
    uint32 matched = 0;
    for (uint32 idx = 0; idx < p_node_config->datanodeCount; ++idx) {
        matched += (p_node_config->datanode[idx].datanodeRole == sort) ? 1U : 0U;
    }
    return matched;
}
/*
 * Find the local-node datanode entry in g_node whose HA settings mirror the
 * caller's. A mirror means:
 *   - its local HA port equals peerPort;
 *   - its peer's HA port equals localPort;
 *   - its merged local HA listen IPs equal PeerHAIP;
 *   - its merged peer HA listen IPs equal LocalHAIP.
 *
 * On a match, stores the node index and datanode index in *node_index and
 * *instance_index and returns 0. Otherwise returns -1 with both outputs left
 * at 0.
 *
 * Multi-AZ clusters: only the first peer with role PRIMARY_DN is considered.
 * Other clusters: loal_role selects which peer slot must mirror the caller,
 * either datanodePeer* or datanodePeer2*.
 */
int search_HA_node(uint32 localPort, uint32 LocalHAListenCount, char LocalHAIP[][CM_IP_LENGTH], uint32 peerPort,
uint32 PeerHAListenCount, char PeerHAIP[][CM_IP_LENGTH], uint32 *node_index, uint32 *instance_index,
uint32 loal_role)
{
uint32 i;
uint32 max_node_count;
char input_local_listen_ip[CM_IP_ALL_NUM_LENGTH];
char input_peer_listen_ip[CM_IP_ALL_NUM_LENGTH];
char local_listen_ip[CM_IP_ALL_NUM_LENGTH];
char peer_listen_ip[CM_IP_ALL_NUM_LENGTH];
uint32 j;
errno_t rc;
*node_index = 0;
*instance_index = 0;
max_node_count = g_node_num;
rc = memset_s(input_local_listen_ip, CM_IP_ALL_NUM_LENGTH, 0, CM_IP_ALL_NUM_LENGTH);
securec_check_errno(rc, (void)rc);
rc = memset_s(input_peer_listen_ip, CM_IP_ALL_NUM_LENGTH, 0, CM_IP_ALL_NUM_LENGTH);
securec_check_errno(rc, (void)rc);
/* Merge each listen-IP list into one buffer so the lists can be compared with strncmp(). */
listen_ip_merge(LocalHAListenCount, LocalHAIP, input_local_listen_ip, CM_IP_ALL_NUM_LENGTH);
listen_ip_merge(PeerHAListenCount, PeerHAIP, input_peer_listen_ip, CM_IP_ALL_NUM_LENGTH);
for (i = 0; i < max_node_count; i++) {
for (j = 0; j < g_node[i].datanodeCount; j++) {
if (g_multi_az_cluster) {
/*
 * Use the first peer whose role is PRIMARY_DN. Skip this datanode if
 * there is none, or if the HA ports of that peer do not mirror the caller.
 * NOTE(review): `g_dn_replication_num - 1` wraps around if
 * g_dn_replication_num is unsigned and 0. Confirm it is always >= 1.
 */
bool be_continue = true;
uint32 primary_dn_idx = 0;
for (uint32 dnId = 0; dnId < g_dn_replication_num - 1; dnId++) {
be_continue = true;
if (g_node[i].datanode[j].peerDatanodes[dnId].datanodePeerRole == PRIMARY_DN) {
be_continue = false;
primary_dn_idx = dnId;
if ((g_node[i].datanode[j].datanodeLocalHAPort != peerPort) ||
(g_node[i].datanode[j].peerDatanodes[dnId].datanodePeerHAPort != localPort)) {
be_continue = true;
}
break;
}
}
if (be_continue) {
continue;
}
rc = memset_s(local_listen_ip, CM_IP_ALL_NUM_LENGTH, 0, CM_IP_ALL_NUM_LENGTH);
securec_check_errno(rc, (void)rc);
rc = memset_s(peer_listen_ip, CM_IP_ALL_NUM_LENGTH, 0, CM_IP_ALL_NUM_LENGTH);
securec_check_errno(rc, (void)rc);
listen_ip_merge(g_node[i].datanode[j].datanodeLocalHAListenCount,
g_node[i].datanode[j].datanodeLocalHAIP,
local_listen_ip,
CM_IP_ALL_NUM_LENGTH);
/* Always true here: primary_dn_idx is only kept when that peer's role is PRIMARY_DN. */
if (g_node[i].datanode[j].peerDatanodes[primary_dn_idx].datanodePeerRole == PRIMARY_DN) {
listen_ip_merge(g_node[i].datanode[j].peerDatanodes[primary_dn_idx].datanodePeerHAListenCount,
g_node[i].datanode[j].peerDatanodes[primary_dn_idx].datanodePeerHAIP,
peer_listen_ip,
CM_IP_ALL_NUM_LENGTH);
}
/* Match when the IP lists are crossed: our local == caller's peer, our peer == caller's local. */
if ((strncmp(local_listen_ip, input_peer_listen_ip, CM_IP_ALL_NUM_LENGTH) == 0) &&
(strncmp(peer_listen_ip, input_local_listen_ip, CM_IP_ALL_NUM_LENGTH) == 0)) {
*node_index = i;
*instance_index = j;
return 0;
}
} else {
/*
 * loal_role picks the peer slot (Peer or Peer2) whose HA port must
 * mirror the caller's ports. Datanodes with neither role are skipped.
 */
if (loal_role == g_node[i].datanode[j].datanodePeerRole) {
if ((g_node[i].datanode[j].datanodeLocalHAPort != peerPort) ||
(g_node[i].datanode[j].datanodePeerHAPort != localPort)) {
continue;
}
} else if (loal_role == g_node[i].datanode[j].datanodePeer2Role) {
if ((g_node[i].datanode[j].datanodeLocalHAPort != peerPort) ||
(g_node[i].datanode[j].datanodePeer2HAPort != localPort)) {
continue;
}
} else {
continue;
}
rc = memset_s(local_listen_ip, CM_IP_ALL_NUM_LENGTH, 0, CM_IP_ALL_NUM_LENGTH);
securec_check_errno(rc, (void)rc);
rc = memset_s(peer_listen_ip, CM_IP_ALL_NUM_LENGTH, 0, CM_IP_ALL_NUM_LENGTH);
securec_check_errno(rc, (void)rc);
listen_ip_merge(g_node[i].datanode[j].datanodeLocalHAListenCount,
g_node[i].datanode[j].datanodeLocalHAIP,
local_listen_ip,
CM_IP_ALL_NUM_LENGTH);
if (loal_role == g_node[i].datanode[j].datanodePeerRole) {
listen_ip_merge(g_node[i].datanode[j].datanodePeerHAListenCount,
g_node[i].datanode[j].datanodePeerHAIP,
peer_listen_ip,
CM_IP_ALL_NUM_LENGTH);
} else if (loal_role == g_node[i].datanode[j].datanodePeer2Role) {
listen_ip_merge(g_node[i].datanode[j].datanodePeer2HAListenCount,
g_node[i].datanode[j].datanodePeer2HAIP,
peer_listen_ip,
CM_IP_ALL_NUM_LENGTH);
}
/* Same crossed IP-list comparison as in the multi-AZ branch. */
if ((strncmp(local_listen_ip, input_peer_listen_ip, CM_IP_ALL_NUM_LENGTH) == 0) &&
(strncmp(peer_listen_ip, input_local_listen_ip, CM_IP_ALL_NUM_LENGTH) == 0)) {
*node_index = i;
*instance_index = j;
return 0;
}
}
}
}
return -1;
}
int agentCheckPort(uint32 port)
{
char buf[MAXPGPATH] = {0};
char localip[MAXPGPATH] = {0};
char peerip[MAXPGPATH] = {0};
char other[MAXPGPATH] = {0};
uint32 ls = 0;
uint32 localport = 0;
uint32 peerport = 0;
uint32 status = 0;
int rc = 0;
FILE *fp = fopen(PROC_NET_TCP, "re");
if (fp == NULL) {
write_runlog(ERROR, "can not open file \"%s\"\n", PROC_NET_TCP);
return -1;
}
if (fgets(buf, MAXPGPATH - 1, fp) == NULL) {
write_runlog(ERROR, "can not read file \"%s\"\n", PROC_NET_TCP);
(void)fclose(fp);
return -1;
}
while (fgets(buf, MAXPGPATH - 1, fp) != NULL) {
localport = 0;
status = 0;
rc = sscanf_s(buf,
"%u: %[0-9A-Fa-f]:%X %[0-9A-Fa-f]:%X %X %s",
&ls,
localip,
MAXPGPATH,
&localport,
peerip,
MAXPGPATH,
&peerport,
&status,
other,
MAXPGPATH);
if (rc != 7) {
write_runlog(ERROR, "get value by sscanf_s return error:%d, %s:%d \n", rc, __FUNCTION__, __LINE__);
continue;
}
securec_check_intval(rc, (void)rc);
if (localport == port && status == LISTEN) {
(void)fclose(fp);
write_runlog(WARNING,
"port:%u already in use. /proc/net/tcp:\n%s \n%s",
localport,
" sl local_address rem_address st tx_queue rx_queue tr tm->when retrnsmt uid timeout inode ",
buf);
char command[MAXPGPATH] = {0};
rc = snprintf_s(command,
MAXPGPATH,
MAXPGPATH - 1,
SYSTEMQUOTE "lsof -i:%u | grep -E \'COMMAND|LISTEN\' >> \"%s\" 2>&1 " SYSTEMQUOTE,
localport,
system_call_log);
securec_check_intval(rc, (void)rc);
rc = system(command);
if (rc != 0) {
write_runlog(ERROR, "run system command failed %d! %s, errno=%d.\n", rc, command, errno);
}
write_runlog(LOG, "CheckPort: system(command:%s).\n", command);
return 1;
}
}
(void)fclose(fp);
return 0;
}
* @brief Get the disk usage of a specified directory like the df command.
*
* @param pathName The directory path of the disk usage.
* @return uint32 Get the Data path disk usage.
*/
uint32 GetDiskUsageForPath(const char *pathName)
{
int ret;
uint32 percent = 0;
struct statfs diskInfo = {0};
if (pathName == NULL) {
return percent;
}
ret = statfs(pathName, &diskInfo);
if (ret < 0) {
write_runlog(ERROR, "[%s][line:%d] GetDiskUsageForPath %s disk usage failed! errno:%d err:%s.\n",
__FUNCTION__, __LINE__, pathName, errno, strerror(errno));
return percent;
}
if (diskInfo.f_blocks == 0 || (diskInfo.f_blocks - diskInfo.f_bfree + diskInfo.f_bavail) == 0) {
return percent;
}
* The df command is used to obtain an integer percentage without decimals.
* The forward method is used here, not the rounding method.
* Therefore, the system rounds the value by 1 in the program.
*/
percent = (uint32)((diskInfo.f_blocks - diskInfo.f_bfree) * 100 /
(diskInfo.f_blocks - diskInfo.f_bfree + diskInfo.f_bavail));
return (percent <= 100) ? percent : 100;
}
/*
 * Disk usage (as computed by GetDiskUsageForPath()) of the directory named by
 * the GAUSSLOG environment variable. Returns 0 when GAUSSLOG cannot be read.
 */
uint32 CheckDiskForLogPath(void)
{
    char currentLogPath[MAX_PATH_LEN] = {0};
    if (cmagent_getenv("GAUSSLOG", currentLogPath, sizeof(currentLogPath)) != EOK) {
        write_runlog(DEBUG1, "[%s][line:%d] CheckDiskForLogPath: GAUSSLOG get fail!\n", __FUNCTION__, __LINE__);
        return 0;
    }
    check_input_for_security(currentLogPath);
    return GetDiskUsageForPath(currentLogPath);
}
int ExecuteSystemCmd(const char *cmd, int32 logLevel, int32 *errCode)
{
int ret = system(cmd);
if (ret == -1) {
const int32 noChildProcessesErrCode = 10;
int32 tmpLevel = (errno == noChildProcessesErrCode) ? logLevel : ERROR;
write_runlog(tmpLevel, "Fail to execute command %s, and errno=%d.\n", cmd, errno);
return ERROR_EXECUTE_CMD;
}
if (WIFEXITED(ret)) {
if (WEXITSTATUS(ret) == 0) {
return SUCCESS_EXECUTE_CMD;
}
if (strstr(cmd, "arping") != NULL && WEXITSTATUS(ret) == 1) {
return SUCCESS_EXECUTE_CMD;
}
}
write_runlog(logLevel, "Fail to execute command %s, script exit code %d.\n", cmd, WEXITSTATUS(ret));
if (errCode != NULL) {
*errCode = WEXITSTATUS(ret);
}
return FAILED_EXECUTE_CMD;
}
/*
 * Set g_nicDown[index] when GetNicStatus() reports any of these NICs as down,
 * and log a warning in that case:
 *   - the index-th local datanode's default network;
 *   - the same datanode's HA network;
 *   - this node's cm_agent network.
 * The checks short-circuit in that order.
 */
void CheckDnNicDown(uint32 index)
{
    dataNodeInfo *dnInfo = &(g_currentNode->datanode[index]);
    const bool allNicsUp = GetNicStatus(dnInfo->datanodeId, CM_INSTANCE_TYPE_DN) &&
        GetNicStatus(dnInfo->datanodeId, CM_INSTANCE_TYPE_DN, NETWORK_TYPE_HA) &&
        GetNicStatus(g_currentNode->cmAgentId, CM_INSTANCE_TYPE_CMA);
    g_nicDown[index] = !allNicsUp;
    if (!g_nicDown[index]) {
        return;
    }
    write_runlog(WARNING, "nic related with datanode(%s) not up.\n", dnInfo->datanodeLocalDataPath);
}
/*
 * A datanode counts as manually stopped when stat() succeeds on either:
 *   - its per-instance flag "<g_cmInstanceManualStartPath>_<datanodeId>";
 *   - the cluster-wide flag g_cmManualStartPath.
 */
bool DnManualStop(uint32 index)
{
    char instanceManualStartPath[MAX_PATH_LEN] = {0};
    int rcs = snprintf_s(instanceManualStartPath,
        MAX_PATH_LEN,
        MAX_PATH_LEN - 1,
        "%s_%u",
        g_cmInstanceManualStartPath,
        g_currentNode->datanode[index].datanodeId);
    securec_check_intval(rcs, (void)rcs);
    struct stat flagStat = {0};
    if (stat(instanceManualStartPath, &flagStat) == 0) {
        return true;
    }
    return stat(g_cmManualStartPath, &flagStat) == 0;
}
/*
 * Return true when `path` can be opened as a directory and contains no entries
 * other than "." and "..". Returns false when opendir() fails (for example,
 * the path does not exist) or when any other entry is present.
 */
bool DirectoryIsDestoryed(const char *path)
{
    DIR *dir = opendir(path);
    if (dir == NULL) {
        return false;
    }
    bool isEmpty = true;
    struct dirent *entry = NULL;
    while ((entry = readdir(dir)) != NULL) {
        if (strcmp(entry->d_name, ".") != 0 && strcmp(entry->d_name, "..") != 0) {
            isEmpty = false;
            break;
        }
    }
    (void)closedir(dir);
    return isEmpty;
}

/*
 * Same check as DirectoryIsDestoryed(). It is kept as a separate entry point
 * for existing callers and delegates instead of duplicating the body.
 */
bool IsDirectoryDestoryed(const char *path)
{
    return DirectoryIsDestoryed(path);
}
/*
 * Check the disc of the index-th local datanode and report the result through
 * ReportDiskDamageAlarm(): ALM_AT_Fault on damage, ALM_AT_Resume otherwise.
 *
 * Manually stopped datanode (DnManualStop()): g_dnDiskDamage[index] and
 * g_dnBuild[index] are cleared.
 *
 * Otherwise the datanode is marked damaged when its data directory is empty
 * (IsDirectoryDestoryed()), or when agentCheckDisc() fails for the data
 * directory or for g_logBasePath. The check is bracketed by
 * set_disc_check_state().
 */
void CheckDnDiskDamage(uint32 index)
{
    dataNodeInfo *dn = &(g_currentNode->datanode[index]);
    char instanceName[CM_NODE_NAME] = {0};
    int ret = snprintf_s(instanceName, sizeof(instanceName), sizeof(instanceName) - 1,
        "%s_%u", "dn", dn->datanodeId);
    securec_check_intval(ret, (void)ret);
    char diskName[MAX_DEVICE_DIR] = {0};
    AlarmType alarmType = ALM_AT_Resume;

    if (DnManualStop(index)) {
        g_dnDiskDamage[index] = false;
        g_dnBuild[index] = false;
        write_runlog(DEBUG1,
            "%d, dn(%u) the g_dnBuild[%u] is set to false.\n",
            __LINE__,
            dn->datanodeId,
            index);
    } else {
        long check_disc_state = 0;
        set_disc_check_state(dn->datanodeId, &check_disc_state, true);
        const bool damaged = IsDirectoryDestoryed(dn->datanodeLocalDataPath) ||
            !agentCheckDisc(dn->datanodeLocalDataPath) || !agentCheckDisc(g_logBasePath);
        if (damaged) {
            write_runlog(ERROR, "data path disc writable test failed, %s.\n", dn->datanodeLocalDataPath);
            g_dnDiskDamage[index] = true;
            alarmType = ALM_AT_Fault;
            GetDiskNameByDataPath(dn->datanodeLocalDataPath, diskName, MAX_DEVICE_DIR);
        } else {
            g_dnDiskDamage[index] = false;
        }
        set_disc_check_state(0, &check_disc_state, true);
    }
    ReportDiskDamageAlarm(alarmType, instanceName, index, diskName);
}
bool CheckMaintanceCluster()
{
char execPath[MAX_PATH_LEN] = {0};
char maintanceClusterFlagPath[MAX_PATH_LEN] = {0};
char maintanceInstanceFlagPath[MAX_PATH_LEN] = {0};
char upgradeFlagPath[MAX_PATH_LEN] = {0};
int rc = cmagent_getenv("GAUSSHOME", execPath, sizeof(execPath));
if (rc != EOK) {
write_runlog(ERROR, "Line:%d Get GAUSSHOME failed, please check.\n", __LINE__);
return false;
}
rc = snprintf_s(
maintanceClusterFlagPath, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/bin/%s", execPath, "mainstance_cluster_status");
securec_check_intval(rc, (void)rc);
if (access(maintanceClusterFlagPath, F_OK) == 0) {
write_runlog(LOG, "Line:%d Get maintance cluster flag success.\n", __LINE__);
return true;
}
rc = snprintf_s(
maintanceInstanceFlagPath, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/bin/%s", execPath, "mainstance_instance");
securec_check_intval(rc, (void)rc);
if (access(maintanceInstanceFlagPath, F_OK) == 0) {
write_runlog(LOG, "Line:%d Get maintance instance flag success.\n", __LINE__);
return true;
}
rc = cmagent_getenv("PGHOST", execPath, sizeof(execPath));
if (rc != EOK) {
write_runlog(ERROR, "Line:%d Get PGHOST failed, please check.\n", __LINE__);
return false;
}
rc = snprintf_s(upgradeFlagPath, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/binary_upgrade", execPath);
securec_check_intval(rc, (void)rc);
if (access(upgradeFlagPath, F_OK) == 0) {
write_runlog(LOG, "Line:%d Get upgrade flag success.\n", __LINE__);
return true;
}
return false;
}
/*
 * ReportCMAEventAlarm
 *     Raise `alarmItem` as an event alarm (ALM_AT_Event). When
 *     CheckMaintanceCluster() reports maintenance/upgrade, the alarm is
 *     suppressed and only a log line is written.
 */
void ReportCMAEventAlarm(Alarm* alarmItem, AlarmAdditionalParam* additionalParam)
{
    bool isMaintainCluster = CheckMaintanceCluster();
    if (isMaintainCluster) {
        write_runlog(ERROR, "Line %d:Maintaining cluster:no event alarm is generated, maintancefalg: %d\n",
            __LINE__, isMaintainCluster);
        return;
    }
    AlarmReporter(alarmItem, ALM_AT_Event, additionalParam);
}
/*
 * CheckStartDN
 *     Decide whether this node may start its datanode.
 *
 * - Outside shared-storage mode: always true.
 * - When undocumentedVersion is non-zero and below DORADO_UPGRADE_VERSION:
 *   true unless IsNodeOfflineFromEtcd() reports this node offline.
 * - Otherwise: false only when g_doradoIp equals this node's first
 *   sshChannel address. An empty or "unknown" g_doradoIp is logged and
 *   treated as "may start".
 */
bool CheckStartDN()
{
    if (!GetIsSharedStorageMode()) {
        return true;
    }

    bool isPreDoradoVersion = (undocumentedVersion != 0) && (undocumentedVersion < DORADO_UPGRADE_VERSION);
    if (isPreDoradoVersion) {
        return !IsNodeOfflineFromEtcd(g_nodeId, CM_AGENT);
    }

    if (g_doradoIp[0] == '\0') {
        write_runlog(LOG, "get dorado ip is NULL.\n");
        return true;
    }
    if (strcmp(g_doradoIp, "unknown") == 0) {
        write_runlog(ERROR, "get dorado ip failed.\n");
        return true;
    }

    bool isIgnoredNode = (strcmp(g_doradoIp, g_currentNode->sshChannel[0]) == 0);
    if (isIgnoredNode) {
        write_runlog(DEBUG1, "Line:%d Get ignore node(%s) successfully.\n", __LINE__, g_currentNode->sshChannel[0]);
    }
    return !isIgnoredNode;
}
/*
 * RefreshQueryBarrier
 *     Adopt the query barrier carried by a CMS barrier response. An empty
 *     barrier is ignored. A non-empty one replaces g_agentQueryBarrier when
 *     the agent holds none yet, or holds one that compares smaller
 *     (strncmp over BARRIERLEN - 1 bytes).
 */
static void RefreshQueryBarrier(const cm_to_agent_barrier_info *barrierRespMsg)
{
    const char *newBarrier = barrierRespMsg->queryBarrier;
    if (newBarrier[0] == '\0') {
        write_runlog(LOG, "[RefreshQueryBarrier] queryBarrier is null.\n");
        return;
    }
    write_runlog(LOG, "[RefreshQueryBarrier] queryBarrier is %s.\n", newBarrier);

    bool isNewer = (g_agentQueryBarrier[0] == '\0') ||
        (strncmp(g_agentQueryBarrier, newBarrier, BARRIERLEN - 1) < 0);
    if (!isNewer) {
        write_runlog(LOG, "[RefreshQueryBarrier] the new barrier(%s) is not bigger than the old(%s).\n",
            newBarrier, g_agentQueryBarrier);
        return;
    }

    errno_t rc = memcpy_s(g_agentQueryBarrier, BARRIERLEN - 1, newBarrier, BARRIERLEN - 1);
    securec_check_errno(rc, (void)rc);
    write_runlog(LOG, "[RefreshQueryBarrier]querybarrier info refresh, querybarrier: %s.\n", g_agentQueryBarrier);
}
/*
 * RefreshTargetBarrier
 *     Adopt the target barrier carried by a CMS barrier response.
 *
 * The message's target barrier is rejected when it is empty, or when it
 * compares greater (strncmp over BARRIERLEN - 1 bytes) than the same
 * message's query barrier. An accepted value replaces g_agentTargetBarrier
 * when the agent holds none yet, or holds one that compares smaller.
 */
static void RefreshTargetBarrier(const cm_to_agent_barrier_info *barrierRespMsg)
{
    const char *newTarget = barrierRespMsg->targetBarrier;
    bool isInvalid = (newTarget[0] == '\0') ||
        (strncmp(newTarget, barrierRespMsg->queryBarrier, BARRIERLEN - 1) > 0);
    if (isInvalid) {
        write_runlog(LOG, "[RefreshTargetBarrier] targetBarrier is invalid, targetBarrier is %s.\n", newTarget);
        return;
    }

    bool isNewer = (g_agentTargetBarrier[0] == '\0') ||
        (strncmp(g_agentTargetBarrier, newTarget, BARRIERLEN - 1) < 0);
    if (!isNewer) {
        write_runlog(LOG, "[RefreshTargetBarrier] the new barrier(%s) is not bigger than the old(%s).\n",
            newTarget, g_agentTargetBarrier);
        return;
    }

    errno_t rc = memcpy_s(g_agentTargetBarrier, BARRIERLEN - 1, newTarget, BARRIERLEN - 1);
    securec_check_errno(rc, (void)rc);
    write_runlog(LOG, "[RefreshTargetBarrier]targetbarrier info refresh, targetbarrier: %s.\n",
        g_agentTargetBarrier);
}
/*
 * ProcessDnBarrierInfoResp
 *     Handle a barrier-info response from CMS. The cached query barrier is
 *     refreshed first, then the target barrier. Always returns 0.
 */
int ProcessDnBarrierInfoResp(const cm_to_agent_barrier_info *barrierRespMsg)
{
    const int processed = 0;
    RefreshQueryBarrier(barrierRespMsg);
    RefreshTargetBarrier(barrierRespMsg);
    return processed;
}
#if ((defined(ENABLE_MULTIPLE_NODES)) || (defined(ENABLE_PRIVATEGAUSS)))
static int GetDnInstanceIdStr(const CmToAgentGsGucSyncList *msgTypeDoGsGuc, char *ids, size_t idsLen)
{
errno_t rc = 0;
uint32 instanceId = msgTypeDoGsGuc->instanceId;
const DatanodeSyncList *expectSyncList = &(msgTypeDoGsGuc->dnSyncList);
bool flag = false;
int index = 0;
size_t len = 0;
for (; index < expectSyncList->count; ++index) {
if (expectSyncList->dnSyncList[index] == instanceId) {
continue;
}
if (!flag) {
rc = snprintf_s(ids, idsLen, idsLen - 1, "dn_%u", expectSyncList->dnSyncList[index]);
securec_check_intval(rc, (void)rc);
flag = true;
} else {
len = strlen(ids);
if (len >= (idsLen - 1)) {
return index;
}
rc = snprintf_s(ids + len, (idsLen - len), ((idsLen - len) - 1),
", dn_%u", expectSyncList->dnSyncList[index]);
securec_check_intval(rc, (void)rc);
}
}
return index;
}
/*
 * GetSyncListString
 *     Append the ids of `syncList` to `syncListString` as "id1, id2, ..., idN"
 *     for logging. The caller's buffer must already hold a NUL-terminated
 *     string (normally empty); appending stops silently once it is full.
 *     An empty list is rendered as "sync list is empty".
 */
static void GetSyncListString(const DatanodeSyncList *syncList, char *syncListString, size_t maxLen)
{
    if (syncList->count <= 0) {
        errno_t rc = strcpy_s(syncListString, maxLen, "sync list is empty");
        securec_check_errno(rc, (void)rc);
        return;
    }
    if (maxLen <= 1) {
        write_runlog(ERROR, "maxLen is 1 or 0.\n");
        return;
    }

    for (int pos = 0; pos < syncList->count; ++pos) {
        size_t used = strlen(syncListString);
        if (used >= (maxLen - 1)) {
            return;
        }
        char *tail = syncListString + used;
        size_t room = maxLen - used;
        errno_t rc;
        if (pos != syncList->count - 1) {
            rc = snprintf_s(tail, room, room - 1, "%u, ", syncList->dnSyncList[pos]);
        } else {
            rc = snprintf_s(tail, room, room - 1, "%u", syncList->dnSyncList[pos]);
        }
        securec_check_intval(rc, (void)rc);
    }
}
/*
 * SetCmDoWrite
 *     Set g_cmDoWriteOper[idx].doWrite to true while holding that entry's
 *     write lock, then log that a write operation is needed because CMS
 *     modified the sync list.
 */
static void SetCmDoWrite(uint32 idx)
{
    pthread_rwlock_t *entryLock = &(g_cmDoWriteOper[idx].lock);
    (void)pthread_rwlock_wrlock(entryLock);
    g_cmDoWriteOper[idx].doWrite = true;
    (void)pthread_rwlock_unlock(entryLock);
    write_runlog(LOG, "receive modifing syncList from cms, and need to do write oper.\n");
}
int ProcessGsGucDnCommand(const CmToAgentGsGucSyncList *msgTypeDoGsGuc)
{
char gsGucCommand[MAXPGPATH] = {0};
char dnInstanceIds[MAXPGPATH] = {0};
char syncListStr[MAXPGPATH] = {0};
int syncStandbyNum = GetDnInstanceIdStr(msgTypeDoGsGuc, dnInstanceIds, sizeof(dnInstanceIds));
if (syncStandbyNum == 0) {
write_runlog(ERROR, "syncStandbyNum is 0.\n");
return -1;
}
GetSyncListString(&(msgTypeDoGsGuc->dnSyncList), syncListStr, sizeof(syncListStr));
write_runlog(LOG, "dnSyncList is [%s], syncStandbyNum is %d, dnInstanceIds is %s.\n",
syncListStr, syncStandbyNum, dnInstanceIds);
syncStandbyNum = syncStandbyNum / 2;
uint32 dnIndex;
bool result = true;
for (dnIndex = 0; dnIndex < g_currentNode->datanodeCount; dnIndex++) {
if (g_currentNode->datanode[dnIndex].datanodeId == msgTypeDoGsGuc->instanceId) {
break;
}
}
const char *dnSyncMode = "ANY";
errno_t rc = snprintf_s(gsGucCommand, MAXPGPATH, MAXPGPATH - 1,
"gs_guc reload -Z datanode -D %s -c \"synchronous_standby_names = '%s NODE %d(%s)'\" >> %s 2>&1 ",
g_currentNode->datanode[dnIndex].datanodeLocalDataPath, dnSyncMode, syncStandbyNum, dnInstanceIds,
system_call_log);
securec_check_intval(rc, (void)rc);
write_runlog(LOG, "gsGucCommand is %s.\n", gsGucCommand);
rc = system(gsGucCommand);
if (rc != 0) {
write_runlog(ERROR, "Execute %s failed: , errno=%d.\n", gsGucCommand, errno);
result = false;
} else {
write_runlog(LOG, "Execute %s success: \n", gsGucCommand);
result = true;
}
if (result) {
SetCmDoWrite(dnIndex);
}
return 0;
}
#endif
void PrintInstanceStack(const char* dataPath, bool isPrintedOnce)
{
if (isPrintedOnce) {
return;
}
char command[CM_MAX_COMMAND_LEN];
errno_t rc = snprintf_s(command, CM_MAX_COMMAND_LEN, CM_MAX_COMMAND_LEN - 1,
"gs_ctl stack -D %s >> \"%s\" 2>&1 &",
dataPath, system_call_log);
securec_check_intval(rc, (void)rc);
int ret = system(command);
if (ret != 0) {
write_runlog(ERROR, "[%s] command:%s failed %d! errno=%d.\n", __FUNCTION__, command, ret, errno);
} else {
write_runlog(LOG, "[%s] command:%s success\n", __FUNCTION__, command);
}
}
/*
 * GetMaxLenOfPartition
 *     Return the length of the longest mount-point path reported by `df`,
 *     taken from the last column of every output line that starts with '/'.
 *
 * Returns MAX_PATH_LEN when the command cannot be started, and 0 when it
 * produces no usable line.
 */
uint32 GetMaxLenOfPartition(void)
{
    /* last field of each df line, keeping only absolute mount points */
    const char *command = "df |awk '{print $NF}' |grep ^/";
    FILE *fp = popen(command, "r");
    if (fp == NULL) {
        write_runlog(ERROR, "[%s] command: %s failed! errno=%d.\n", __FUNCTION__, command, errno);
        return MAX_PATH_LEN;
    }

    char line[1024];
    char partitionName[1024] = {0};
    uint32 maxPartitionLen = 0;
    while (fgets(line, sizeof(line), fp) != NULL) {
        /*
         * Bounded conversion (width = sizeof(partitionName) - 1). Skip lines
         * with no token instead of measuring a buffer sscanf did not fill.
         */
        if (sscanf(line, "%1023s", partitionName) != 1) {
            continue;
        }
        uint32 len = (uint32)strlen(partitionName);
        if (len > maxPartitionLen) {
            maxPartitionLen = len;
        }
    }
    (void)pclose(fp);
    return maxPartitionLen;
}
/*
 * GetMaxPercentFromDirList
 *     Return the largest GetDiskUsageForPath() value over the paths in
 *     `v_linkPathInfo`, or 0 for an empty list.
 */
uint32 GetMaxPercentFromDirList(const vector<string> v_linkPathInfo)
{
    uint32 highestUsage = 0;
    for (const string &dirPath : v_linkPathInfo) {
        uint32 usage = GetDiskUsageForPath(dirPath.c_str());
        if (usage > highestUsage) {
            highestUsage = usage;
        }
    }
    return highestUsage;
}
uint32 GetDiskUsageForLinkPath(const char *pathName)
{
vector<string> v_linkPathInfo;
uint32 maxPartitionLen = GetMaxLenOfPartition();
const char *pgDirList[] = {"base", "global", "pg_xlog", "pg_tblspc"};
char entryObsolutePath[MAX_PATH_LEN];
char entryObsoluteRealPath[MAX_PATH_LEN];
char pgTblspcObsolutePath[MAX_PATH_LEN];
int rc = 0;
for (unsigned i = 0; i < sizeof(pgDirList) / sizeof(pgDirList[0]); i++) {
rc = memset_s(entryObsolutePath, MAX_PATH_LEN, 0, MAX_PATH_LEN);
securec_check_errno(rc, (void) rc);
rc = memset_s(entryObsoluteRealPath, MAX_PATH_LEN, 0, MAX_PATH_LEN);
securec_check_errno(rc, (void) rc);
snprintf_s(entryObsolutePath, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/%s", pathName, pgDirList[i]);
int size = readlink(entryObsolutePath, entryObsoluteRealPath, MAX_PATH_LEN - 1);
if (size > 0) {
for (unsigned ii = maxPartitionLen -1; ii < strlen(entryObsoluteRealPath); ii++) {
if (entryObsoluteRealPath[ii] == '/') {
entryObsoluteRealPath[ii] = '\0';
}
}
v_linkPathInfo.push_back(entryObsoluteRealPath);
}
}
rc = memset_s(pgTblspcObsolutePath, MAX_PATH_LEN, 0, MAX_PATH_LEN);
securec_check_errno(rc, (void) rc);
snprintf_s(pgTblspcObsolutePath, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/%s", pathName, "pg_tblspc");
DIR *dir;
struct dirent *entry;
if ((dir = opendir(pgTblspcObsolutePath)) == nullptr) {
return GetMaxPercentFromDirList(v_linkPathInfo);
}
while ((entry = readdir(dir)) != nullptr) {
if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0) {
continue;
}
rc = memset_s(entryObsolutePath, MAX_PATH_LEN, 0, MAX_PATH_LEN);
securec_check_errno(rc, (void) rc);
rc = memset_s(entryObsoluteRealPath, MAX_PATH_LEN, 0, MAX_PATH_LEN);
securec_check_errno(rc, (void) rc);
if (entry->d_type == DT_LNK) {
snprintf_s(entryObsolutePath, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/%s", pgTblspcObsolutePath,
entry->d_name);
int size = readlink(entryObsolutePath, entryObsoluteRealPath, MAX_PATH_LEN - 1);
if (size > 0) {
for (unsigned i = maxPartitionLen -1; i < strlen(entryObsoluteRealPath); i++) {
if (entryObsoluteRealPath[i] == '/'){
entryObsoluteRealPath[i] = '\0';
}
}
v_linkPathInfo.push_back(entryObsoluteRealPath);
}
}
}
(void)closedir(dir);
v_linkPathInfo.erase(unique(v_linkPathInfo.begin(), v_linkPathInfo.end()), v_linkPathInfo.end());
return GetMaxPercentFromDirList(v_linkPathInfo);
}
bool IsLinkPathDestoryedOrDamaged(const char *pathName)
{
const char *pgDirList[] = {"base", "global", "pg_xlog"};
char entryAbsolutePath[MAX_PATH_LEN];
char pgTblspcAbsolutePath[MAX_PATH_LEN];
errno_t rc = 0;
bool cdt;
for (unsigned i = 0; i < sizeof(pgDirList) / sizeof(pgDirList[0]); i++) {
rc = memset_s(entryAbsolutePath, MAX_PATH_LEN, 0, MAX_PATH_LEN);
securec_check_errno(rc, (void) rc);
rc = snprintf_s(entryAbsolutePath, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/%s", pathName, pgDirList[i]);
securec_check_intval(rc, (void)rc);
cdt = IsDirectoryDestoryed(entryAbsolutePath) || !agentCheckDisc(entryAbsolutePath);
if (cdt) {
write_runlog(ERROR, "disc writable test failed, %s.\n", entryAbsolutePath);
return true;
}
}
rc = memset_s(pgTblspcAbsolutePath, MAX_PATH_LEN, 0, MAX_PATH_LEN);
securec_check_errno(rc, (void) rc);
rc = snprintf_s(pgTblspcAbsolutePath, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/%s", pathName, "pg_tblspc");
securec_check_intval(rc, (void)rc);
DIR *dir;
struct dirent *entry;
if ((dir = opendir(pgTblspcAbsolutePath)) == nullptr) {
write_runlog(ERROR, "disc writable test failed, %s.\n", pgTblspcAbsolutePath);
return true;
}
while ((entry = readdir(dir)) != nullptr) {
if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0) {
continue;
}
rc = memset_s(entryAbsolutePath, MAX_PATH_LEN, 0, MAX_PATH_LEN);
securec_check_errno(rc, (void) rc);
if (entry->d_type == DT_LNK) {
rc = snprintf_s(entryAbsolutePath, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/%s",
pgTblspcAbsolutePath, entry->d_name);
securec_check_intval(rc, (void)rc);
cdt = IsDirectoryDestoryed(entryAbsolutePath) || !agentCheckDisc(entryAbsolutePath);
if (cdt) {
write_runlog(ERROR, "disc writable test failed, %s.\n", entryAbsolutePath);
(void)closedir(dir);
return true;
}
}
}
(void)closedir(dir);
return false;
}
/*
 * DiskUsageCheckMain
 *     Thread entry for the disk-usage checker. It registers with
 *     AddThreadActivity() and then loops forever:
 *       - while g_shutdownRequest is set, it only calls cm_sleep(5);
 *       - otherwise it checks the DN data paths (and CN data paths in
 *         ENABLE_MULTIPLE_NODES builds), refreshes its activity stamp and
 *         calls cm_sleep(1).
 */
void *DiskUsageCheckMain(void *arg)
{
    thread_name = "DiskUsageCheck";
    pthread_t tid = pthread_self();
    write_runlog(LOG, "Disk usage check thread start, threadid %lu.\n", tid);

    int activityIdx = -1;
    AddThreadActivity(&activityIdx, tid);

    while (true) {
        if (!g_shutdownRequest) {
#ifdef ENABLE_MULTIPLE_NODES
            CheckDiskForCNDataPath();
#endif
            CheckDiskForDNDataPath();
            UpdateThreadActivity(activityIdx);
            cm_sleep(1);
        } else {
            cm_sleep(5);
        }
    }
    return NULL;
}
/*
 * PGControlDataCheckMain
 *     Thread entry for the pg_control data checker. It loops forever. While
 *     a shutdown is requested, or the storage is not DMS/DSS based
 *     (g_isStorageWithDMSorDSS is false), it only calls cm_sleep(5).
 *     Otherwise it runs PGDataControlCheck() and calls cm_sleep(1).
 */
void *PGControlDataCheckMain(void *arg)
{
    thread_name = "PGDataControlCheck";
    pthread_t tid = pthread_self();
    write_runlog(LOG, "PG Control Data check thread start, threadid %lu.\n", tid);

    while (true) {
        bool shouldIdle = g_shutdownRequest || !g_isStorageWithDMSorDSS;
        if (shouldIdle) {
            cm_sleep(5);
        } else {
            PGDataControlCheck();
            cm_sleep(1);
        }
    }
    return NULL;
}
/*
 * FindDnIdxInCurNode
 *     Look up the local datanode index whose datanodeId equals `instId`.
 *     On success, stores it in *dnIdx and returns true. Otherwise logs an
 *     error prefixed with `str`, leaves *dnIdx untouched and returns false.
 */
bool FindDnIdxInCurNode(uint32 instId, uint32 *dnIdx, const char *str)
{
    uint32 pos = 0;
    while (pos < g_currentNode->datanodeCount && g_currentNode->datanode[pos].datanodeId != instId) {
        ++pos;
    }
    if (pos < g_currentNode->datanodeCount) {
        *dnIdx = pos;
        return true;
    }
    write_runlog(ERROR, "%s cannot find the instId(%u) in current node.\n", str, instId);
    return false;
}
/*
 * CmaGetResConfByResName
 *     Return the entry of g_resConf (among the first GetLocalResConfCount()
 *     entries) whose resName matches `resName`. Returns NULL, after logging
 *     an error, when there is no match.
 */
CmResConfList *CmaGetResConfByResName(const char *resName)
{
    CmResConfList *match = NULL;
    for (uint32 pos = 0; match == NULL && pos < GetLocalResConfCount(); ++pos) {
        if (strcmp(g_resConf[pos].resName, resName) == 0) {
            match = &g_resConf[pos];
        }
    }
    if (match == NULL) {
        write_runlog(ERROR, "in local res conf, can't find res(%s).\n", resName);
    }
    return match;
}
/*
 * GetParameterValueFromConfigFile
 *     Read the last "<paraName> = <value>" line of <file_path>/postgresql.conf
 *     (through a grep/awk pipeline) and fold it into
 *     g_onDemandRealTimeBuildStatus:
 *       - value on / ON / true / TRUE / 1 -> set bits 0x3 (read from file, enabled)
 *       - any other value                -> set bit 0x2 (read from file, disabled)
 *     The command is attempted up to MAX_RETRY_TIME times. If it never
 *     succeeds, the status is left unchanged and a "use the default" line is
 *     logged.
 *
 * Bit layout of g_onDemandRealTimeBuildStatus:
 *     | Get By SQL | Get By File | Value |
 *     |     ?      |    0x2      |  0x1  |
 * NOTE(review): the "Get By SQL" bit is not set here; presumably 0x4, set elsewhere — confirm.
 */
static void GetParameterValueFromConfigFile(const char* file_path, const char* paraName)
{
    char get_cmd[MAXPGPATH * 2];
    char result[NAMEDATALEN] = {0};
    int ret = snprintf_s(get_cmd,
        sizeof(get_cmd),
        sizeof(get_cmd) - 1,
        "grep \"^%s\" %s/postgresql.conf|awk -F= \'{print $2}\'|tail -1|grep -o \"[[:alnum:]]*\"",
        paraName,
        file_path);
    securec_check_intval(ret, (void)ret);

    for (int retry_cnt = 0; retry_cnt < MAX_RETRY_TIME; retry_cnt++) {
        if (!ExecuteCmdWithResult(get_cmd, result, NAMEDATALEN)) {
            continue;
        }
        /*
         * Drop the trailing newline of the command output. Checking the length
         * first avoids writing result[-1] (SIZE_MAX) when the output is empty.
         */
        size_t len = strlen(result);
        if (len > 0 && result[len - 1] == '\n') {
            result[len - 1] = '\0';
        }
        write_runlog(LOG, "Get value of %s: %s\n", paraName, result);
        bool enabled = strcmp(result, "on") == 0 || strcmp(result, "true") == 0 || strcmp(result, "1") == 0 ||
            strcmp(result, "ON") == 0 || strcmp(result, "TRUE") == 0;
        g_onDemandRealTimeBuildStatus |= enabled ? 0x3 : 0x2;
        return;
    }
    write_runlog(LOG, "The parameter %s is not found, use the default\n", paraName);
}
/*
 * check_datanode_realtime_build_status_by_file
 *     Fill reportMsg->local_status.realtime_build_status from the
 *     ss_enable_ondemand_realtime_build setting in <dataPath>/postgresql.conf.
 *     Returns without touching reportMsg when bit 0x2 of
 *     g_onDemandRealTimeBuildStatus shows the file was already read.
 */
void check_datanode_realtime_build_status_by_file(agent_to_cm_datanode_status_report *reportMsg, const char *dataPath)
{
    bool alreadyReadFromFile = (g_onDemandRealTimeBuildStatus & 0x2) != 0;
    if (alreadyReadFromFile) {
        return;
    }
    GetParameterValueFromConfigFile(dataPath, "ss_enable_ondemand_realtime_build");
    write_runlog(LOG, "ondemand_realtime_build_status by file is %d\n", g_onDemandRealTimeBuildStatus);
    reportMsg->local_status.realtime_build_status = (g_onDemandRealTimeBuildStatus & 0x1);
}