/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* settings.cpp
* Defines GUC options for parallel recovery.
*
* IDENTIFICATION
* src/gausskernel/storage/access/transam/parallel_recovery/settings.cpp
*
* -------------------------------------------------------------------------
*/
#include <stdio.h>
#include <unistd.h>
#ifdef __USE_NUMA
#include <numa.h>
#endif
#include "postgres.h"
#include "knl/knl_variable.h"
#include "utils/guc.h"
#include "commands/copy.h"
#include "access/multi_redo_settings.h"
#include "access/multi_redo_api.h"
#include "threadpool/threadpool_controler.h"
/* Forward declaration: derives the effective recovery parallelism from a requested hint (defined below). */
static uint32 ComputeRecoveryParallelism(int);
/* Forward declaration: returns the CPU count used to bound recovery parallelism (defined below). */
static uint32 GetCPUCount();
/* NOTE(review): bufSize is not referenced anywhere in the visible part of this file - confirm it is still needed. */
static const int bufSize = 1024;
void ConfigRecoveryParallelism()
{
char buf[16];
if (g_instance.attr.attr_storage.recovery_parse_workers > 1) {
g_instance.comm_cxt.predo_cxt.redoType = EXTREME_REDO;
g_instance.attr.attr_storage.batch_redo_num = g_instance.attr.attr_storage.recovery_parse_workers;
uint32 total_recovery_parallelism = g_instance.attr.attr_storage.batch_redo_num * 2 +
g_instance.attr.attr_storage.recovery_redo_workers_per_paser_worker *
g_instance.attr.attr_storage.batch_redo_num +
TRXN_REDO_MANAGER_NUM + TRXN_REDO_WORKER_NUM + XLOG_READER_NUM;
if (IsOndemandExtremeRtoMode()) {
total_recovery_parallelism = total_recovery_parallelism + ONDEMAND_AUXILIARY_WORKER_NUM +
g_instance.attr.attr_storage.batch_redo_num;
}
sprintf_s(buf, sizeof(buf), "%u", total_recovery_parallelism);
ereport(LOG, (errmsg("ConfigRecoveryParallelism, parse workers:%d, "
"redo workers per parse worker:%d, total workernums is %u",
g_instance.attr.attr_storage.recovery_parse_workers,
g_instance.attr.attr_storage.recovery_redo_workers_per_paser_worker,
total_recovery_parallelism)));
g_supportHotStandby = g_instance.attr.attr_storage.EnableHotStandby;
SetConfigOption("recovery_parallelism", buf, PGC_POSTMASTER, PGC_S_OVERRIDE);
} else if (g_instance.attr.attr_storage.max_recovery_parallelism > 1) {
g_instance.comm_cxt.predo_cxt.redoType = PARALLEL_REDO;
uint32 true_max_recovery_parallelism = g_instance.attr.attr_storage.max_recovery_parallelism;
if (true_max_recovery_parallelism > MOST_FAST_RECOVERY_LIMIT) {
true_max_recovery_parallelism = MOST_FAST_RECOVERY_LIMIT;
}
sprintf_s(buf, sizeof(buf), "%u", ComputeRecoveryParallelism(true_max_recovery_parallelism));
ereport(LOG, (errmodule(MOD_REDO), errcode(ERRCODE_LOG),
errmsg("ConfigRecoveryParallelism, true_max_recovery_parallelism:%u, "
"max_recovery_parallelism:%d",
true_max_recovery_parallelism, g_instance.attr.attr_storage.max_recovery_parallelism)));
SetConfigOption("recovery_parallelism", buf, PGC_POSTMASTER, PGC_S_OVERRIDE);
}
}
/*
 * Turn a requested parallelism (hint) into the value actually used.
 *
 * hint <= 0 selects the default share of the CPUs; a positive hint is
 * limited to the capped maximum. The result is never less than 1.
 * The chosen value is also written to the server log.
 */
static uint32 ComputeRecoveryParallelism(int hint)
{
    /*
     * Reciprocal of the shares of CPU used for recovery.  The idea is that
     * using the default value the standby is able to keep up with the master
     * (assuming the standby and the master use the same hardware), while on
     * machines with fewer CPUs, the user is able to boost up recovery
     * performance by using more CPUs.  A capped maximum is used to protect
     * users from setting huge values.
     *
     * The default is to use 1/32 of all CPUs.  On beefy machines, the capped
     * maximum is 1/4 of all CPUs.  On smaller machines, the capped maximum
     * is the number of CPUs or 8, whichever is smaller.
     */
    static const uint32 DEFAULT_CPU_SHARE = 32;
    static const uint32 MAX_CPU_SHARE = 4;
    static const uint32 MIN_ALLOWED_MAX_PARALLELISM = 8;

    /* Queried on every call; the value is a plain local and is not cached. */
    const uint32 cpuCount = GetCPUCount();
    const uint32 quarterOfCpus = cpuCount / MAX_CPU_SHARE;

    /* Capped maximum: all CPUs on tiny machines, otherwise at least 8 and at most 1/4 of the CPUs. */
    uint32 cappedMax = quarterOfCpus;
    if (cpuCount < MIN_ALLOWED_MAX_PARALLELISM) {
        cappedMax = cpuCount;
    } else if (quarterOfCpus < MIN_ALLOWED_MAX_PARALLELISM) {
        cappedMax = MIN_ALLOWED_MAX_PARALLELISM;
    }

    uint32 chosen;
    if (hint <= 0) {
        chosen = cpuCount / DEFAULT_CPU_SHARE;
    } else {
        chosen = (((uint32)hint) < cappedMax) ? (uint32)hint : cappedMax;
    }

    /* Integer division above can yield 0 on small machines; always run at least one worker. */
    if (chosen < 1) {
        chosen = 1;
    }

    ereport(LOG, (errmodule(MOD_REDO), errcode(ERRCODE_LOG),
                  errmsg("Recovery parallelism, cpu count = %u, max = %d, actual = %u", cpuCount, hint,
                         chosen)));
    return chosen;
}
/*
 * Return the number of processors currently online as reported by
 * sysconf(_SC_NPROCESSORS_ONLN).
 *
 * Falls back to DEFAULT_CPU_COUNT when the query is not available on this
 * platform or when sysconf() does not report a positive value.
 */
static uint32 GetCPUCount()
{
    static const uint32 DEFAULT_CPU_COUNT = 64;
#ifdef _SC_NPROCESSORS_ONLN
    /*
     * sysconf() returns -1 on error. Casting that straight to uint32 would
     * report ~4 billion CPUs and inflate the parallelism cap computed by the
     * caller, so only positive results are accepted.
     */
    const long onlineCpus = sysconf(_SC_NPROCESSORS_ONLN);
    if (onlineCpus > 0) {
        return (uint32)onlineCpus;
    }
#endif
    return DEFAULT_CPU_COUNT;
}
/*
 * Parse the redo_bind_cpu_attr setting into `control`.
 *
 * The text before the first ':' selects the mode (compared by prefix,
 * case-insensitively after lower-casing):
 *   "nobind"   - bindType = REDO_NO_CPU_BIND, nothing else is parsed.
 *   "cpubind"  - bindType = REDO_CPU_BIND; the remainder is handed to
 *                ThreadPoolControler::ParseRangeStr to fill isBindCpuArr.
 *   "nodebind" - bindType = REDO_NODE_BIND; the remainder is handed to
 *                ThreadPoolControler::ParseRangeStr to fill isBindNumaArr.
 * Any other mode, or a range for which ParseRangeStr returns 0, is reported
 * with ereport(FATAL).
 *
 * Returns without touching `control` when the attribute or its first token
 * is NULL. control->totalCpuNum / totalNumaNum are read here, so they must
 * be set by the caller beforehand.
 */
void ParseBindCpuInfo(RedoCpuBindControl *control)
{
    /* NOTE(review): attr is never released in this function - confirm whether TrimStrQuote returns an owned copy. */
    char* attr = TrimStrQuote(g_instance.attr.attr_storage.redo_bind_cpu_attr, true);
    if (attr == NULL) {
        return;
    }

    char* psave = NULL;
    const char* pdelimiter = ":";
    char* ptoken = TrimStr(strtok_r(attr, pdelimiter, &psave));
    /* Check for a missing token before lower-casing it, so that NULL is never handed to pg_strtolower. */
    if (ptoken == NULL) {
        return;
    }
    ptoken = pg_strtolower(ptoken);

    int bindNum = 0;
    if (strncmp("nobind", ptoken, strlen("nobind")) == 0) {
        control->bindType = REDO_NO_CPU_BIND;
        pfree_ext(ptoken);
        return;
    } else if (strncmp("cpubind", ptoken, strlen("cpubind")) == 0) {
        control->bindType = REDO_CPU_BIND;
        control->isBindCpuArr = (bool*)palloc0(sizeof(bool) * control->totalCpuNum);
        /* psave is strtok_r's save pointer; it is passed on as the range text following the ':'. */
        bindNum = ThreadPoolControler::ParseRangeStr(psave, control->isBindCpuArr, control->totalCpuNum, "cpubind");
    } else if (strncmp("nodebind", ptoken, strlen("nodebind")) == 0) {
        control->bindType = REDO_NODE_BIND;
        /*
         * NOTE(review): the array is sized by totalCpuNum while ParseRangeStr is
         * bounded by totalNumaNum - confirm this over-allocation is intended.
         */
        control->isBindNumaArr = (bool*)palloc0(sizeof(bool) * control->totalCpuNum);
        bindNum = ThreadPoolControler::ParseRangeStr(psave, control->isBindNumaArr, control->totalNumaNum, "nodebind");
    } else {
        ereport(FATAL, (errcode(ERRCODE_OPERATE_INVALID_PARAM), errmsg("Invalid attribute for multi redo."),
            errdetail("redo bind config Only support 'nobind', 'cpubind', and 'nodebind'.")));
    }

    /* A bind mode was requested but no usable CPU / NUMA node was selected. */
    if (bindNum == 0) {
        ereport(FATAL, (errcode(ERRCODE_OPERATE_INVALID_PARAM), errmsg("Invalid attribute for multi redo."),
            errdetail("Can not find valid CPU for thread binding, there are two possible reasons:\n"
                      "1. These CPUs are not active, use lscpu to check On-line CPU(s) list.\n"
                      "2. The process has been bind to other CPUs and there is no intersection,"
                      "use taskset -pc to check process CPU bind info.\n")));
    }
    pfree_ext(ptoken);
}
/*
 * Decide whether `cpuid` (belonging to NUMA node `numaid`) may be used for
 * redo thread binding.
 *
 * The CPU must be selected by the active bind mode (isBindNumaArr[numaid]
 * for REDO_NODE_BIND, isBindCpuArr[cpuid] for REDO_CPU_BIND), be flagged in
 * isMcsCpuArr, and be a member of contrl->cpuSet. Any other bind type
 * yields false. No bounds checking is done on the indexes.
 */
bool CpuCanConfiged(RedoCpuBindControl *contrl, int cpuid, int numaid)
{
    bool pickedByConfig;
    if (contrl->bindType == REDO_NODE_BIND) {
        pickedByConfig = contrl->isBindNumaArr[numaid];
    } else if (contrl->bindType == REDO_CPU_BIND) {
        pickedByConfig = contrl->isBindCpuArr[cpuid];
    } else {
        return false;
    }

    /* Same short-circuit order as a chained &&: config selection, then MCS flag, then cpuSet membership. */
    if (!pickedByConfig) {
        return false;
    }
    if (!contrl->isMcsCpuArr[cpuid]) {
        return false;
    }
    return CPU_ISSET(cpuid, &contrl->cpuSet) != 0;
}
/*
 * Build the list and cpu_set_t of CPUs that redo threads will be bound to.
 *
 * Fetches the MCS CPU flags and the active CPU list from ThreadPoolControler,
 * appends every active CPU accepted by CpuCanConfiged() to contrl->cpuArr
 * (advancing contrl->activeCpuNum), logs the resulting count, and finally
 * rebuilds contrl->configCpuSet from cpuArr.
 *
 * If no active CPU is reported the function returns after setting only
 * isMcsCpuArr. activeCpuNum is not reset here; entries are appended at its
 * current value.
 */
void ConfigBindCpuInfo(RedoCpuBindControl *contrl)
{
    contrl->isMcsCpuArr = ThreadPoolControler::GetMcsCpuInfo(contrl->totalCpuNum);

    int activeEntryCnt = 0;
    NumaCpuId *activeEntries = (NumaCpuId*)palloc0(sizeof(NumaCpuId) * contrl->totalCpuNum);
    ThreadPoolControler::GetActiveCpu(activeEntries, &activeEntryCnt);
    if (activeEntryCnt == 0) {
        pfree_ext(activeEntries);
        return;
    }

    contrl->cpuArr = (int*)palloc0(sizeof(int) * contrl->totalCpuNum);
    for (int idx = 0; idx < activeEntryCnt; ++idx) {
        const NumaCpuId *entry = &activeEntries[idx];
        int candidateCpu = entry->cpuId;
        int candidateNode = entry->numaId;
        if (!CpuCanConfiged(contrl, candidateCpu, candidateNode)) {
            continue;
        }
        contrl->cpuArr[contrl->activeCpuNum] = candidateCpu;
        contrl->activeCpuNum++;
    }
    pfree_ext(activeEntries);

    ereport(LOG, (errmsg("ConfigBindCpuInfo redo bind cpu num: %d.", contrl->activeCpuNum)));

    /* Mirror the accepted CPU list into the affinity mask used when binding threads. */
    CPU_ZERO(&contrl->configCpuSet);
    int slot = 0;
    while (slot < contrl->activeCpuNum) {
        CPU_SET(contrl->cpuArr[slot], &contrl->configCpuSet);
        ++slot;
    }
}
/*
 * Tell whether the parsed redo CPU-bind configuration can be applied.
 *
 * Returns true only when all of the following hold:
 *   - the build defines __USE_NUMA,
 *   - the bind type is not REDO_NO_CPU_BIND,
 *   - both totalCpuNum and totalNumaNum are non-zero,
 *   - numa_max_node() + 1 is greater than 1 and equals totalNumaNum.
 * Every rejected case other than REDO_NO_CPU_BIND emits a WARNING; builds
 * without __USE_NUMA return false silently.
 */
bool CheckRedoBindConfigValid(RedoCpuBindControl *contrl)
{
    if (contrl->bindType == REDO_NO_CPU_BIND) {
        return false;
    }
    if (contrl->totalNumaNum == 0 || contrl->totalCpuNum == 0) {
        /* Argument order follows the message text: CPU count first, NUMA count second. */
        ereport(WARNING, (errmsg("CheckRedoBindConfigValid: Fail to read cpu num(%d) or numa num(%d).",
            contrl->totalCpuNum, contrl->totalNumaNum)));
        return false;
    }
#ifdef __USE_NUMA
    int numaNodeNum = numa_max_node() + 1;
    if (numaNodeNum <= 1) {
        ereport(WARNING, (errmsg("CheckRedoBindConfigValid No multiple NUMA nodes available: %d.", numaNodeNum)));
        return false;
    } else if (contrl->totalNumaNum != numaNodeNum) {
        /* The node count seen by libnuma disagrees with the count gathered earlier. */
        ereport(WARNING, (errmsg("Cannot activate NUMA distribute because NUMA nodes are unavailable.")));
        return false;
    } else {
        return true;
    }
#endif
    return false;
}
void ProcessRedoCpuBindInfo()
{
RedoCpuBindControl *contrl = &g_instance.comm_cxt.predo_cxt.redoCpuBindcontrl;
ThreadPoolControler::GetInstanceBind(&contrl->cpuSet);
ThreadPoolControler::GetCpuAndNumaNum(&contrl->totalCpuNum, &contrl->totalNumaNum);
ParseBindCpuInfo(contrl);
if (CheckRedoBindConfigValid(contrl)) {
ConfigBindCpuInfo(contrl);
}
}
/*
 * Pin the calling thread to the configured redo CPU set.
 *
 * Only acts in builds with __USE_NUMA, for STARTUP and PAGEREDO threads, and
 * when at least one CPU was configured (activeCpuNum > 0). A failing
 * pthread_setaffinity_np() is reported as a WARNING and otherwise ignored.
 */
void BindRedoThreadToSpecifiedCpu(knl_thread_role thread_role)
{
#ifdef __USE_NUMA
    if (thread_role != STARTUP && thread_role != PAGEREDO) {
        return;
    }

    RedoCpuBindControl *bindCtl = &g_instance.comm_cxt.predo_cxt.redoCpuBindcontrl;
    if (bindCtl->activeCpuNum <= 0) {
        return;
    }

    int rc = pthread_setaffinity_np(gs_thread_self(), sizeof(cpu_set_t), &bindCtl->configCpuSet);
    if (rc == 0) {
        return;
    }
    ereport(WARNING, (errmsg("Fail to attach %d thread %lu active cpu num %d, %d", (int)thread_role,
                             gs_thread_self(), bindCtl->activeCpuNum, rc)));
#endif
}