* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* memprot.cpp
*
* The implementation of memory protection. We implement a mechanism to enforce
* a quota on the total memory consumption at query level.
*
* IDENTIFICATION
* src/common/backend/utils/mmgr/memprot.cpp
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "knl/knl_variable.h"
#include "postmaster/postmaster.h"
#include "miscadmin.h"
#include "utils/aset.h"
#include "utils/atomic.h"
#include "utils/memprot.h"
#include "executor/exec/execStream.h"
#include "tcop/tcopprot.h"
#include "pgstat.h"
#include "pgxc/pgxc.h"
#include "libcomm/libcomm.h"
#include "replication/dataprotocol.h"
#include "replication/walprotocol.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
volatile int32 shareTrackedMemChunks = 0;
int64 shareTrackedBytes = 0;
int64 storageTrackedBytes = 0;
int32 peakChunksSharedContext = 0;
unsigned int chunkSizeInBits = BITS_IN_MB;
int physicalMemQuotaInChunks = 0;
int eliminateErrorsMemoryBytes = 1 * 1024 * 512;
int32 maxChunksPerProcess = 0;
volatile int32 processMemInChunks = 200;
int32 peakChunksPerProcess = 0;
int32 comm_original_memory = 0;
int32 maxSharedMemory = 0;
int32 backendReservedMemInChunk = 0;
volatile int32 backendUsedMemInChunk = 0;
volatile int32 dynmicTrackedMemChunks = 200;
THR_LOCAL TimestampTz last_print_timestamp = 0;
* This is the virtual function table for Memory Functions
*/
MemoryProtectFuncDef GenericFunctions = {MemoryProtectFunctions::gs_memprot_malloc<MEM_THRD>,
MemoryProtectFunctions::gs_memprot_free<MEM_THRD>,
MemoryProtectFunctions::gs_memprot_realloc<MEM_THRD>,
MemoryProtectFunctions::gs_posix_memalign<MEM_THRD>};
MemoryProtectFuncDef SessionFunctions = {MemoryProtectFunctions::gs_memprot_malloc<MEM_SESS>,
MemoryProtectFunctions::gs_memprot_free<MEM_SESS>,
MemoryProtectFunctions::gs_memprot_realloc<MEM_SESS>,
MemoryProtectFunctions::gs_posix_memalign<MEM_SESS>};
MemoryProtectFuncDef SharedFunctions = {MemoryProtectFunctions::gs_memprot_malloc<MEM_SHRD>,
MemoryProtectFunctions::gs_memprot_free<MEM_SHRD>,
MemoryProtectFunctions::gs_memprot_realloc<MEM_SHRD>,
MemoryProtectFunctions::gs_posix_memalign<MEM_SHRD>};
extern uint64 g_searchserver_memory;
extern bool is_searchserver_api_load();
extern void* get_searchlet_resource_info(int* used_mem, int* peak_mem);
void gs_output_memory_info(void);
#ifdef MEMORY_CONTEXT_CHECKING
* inline function for memory enjection
*/
bool gs_memory_enjection(void)
{
if (MEMORY_FAULT_PERCENT && !t_thrd.xact_cxt.bInAbortTransaction && !t_thrd.int_cxt.CritSectionCount &&
!(AmPostmasterProcess()) && IsNormalProcessingMode() && t_thrd.utils_cxt.memNeedProtect &&
(gs_random() % MAX_MEMORY_FAULT_PERCENT <= MEMORY_FAULT_PERCENT)) {
return true;
}
return false;
}
#endif
* check if the node is on heavy memory status now?
* is strict is true, we'll do some pre-judgement.
*/
FORCE_INLINE bool gs_sysmemory_busy(int64 used, bool strict)
{
if (!GS_MP_INITED)
return false;
int64 percent = (processMemInChunks * 100) / maxChunksPerProcess;
int usedInChunk = used >> chunkSizeInBits;
if ((g_instance.wlm_cxt->stat_manager.comp_count > 1 || strict) && percent >= LOW_PROCMEM_MARK &&
percent <= HIGH_PROCMEM_MARK) {
if (usedInChunk >= LOW_WORKMEM_CHUNK &&
usedInChunk > (double)maxChunksPerProcess / 2000 *
((HIGH_PROCMEM_MARK - percent) * (HIGH_PROCMEM_MARK - percent) + 200))
return true;
}
if (percent > HIGH_PROCMEM_MARK && usedInChunk > Min(maxChunksPerProcess / 100, LOW_WORKMEM_CHUNK))
return true;
return false;
}
bool gs_sysmemory_avail(int64 requestedBytes)
{
if (!GS_MP_INITED)
return true;
int64 newsize = t_thrd.utils_cxt.trackedBytes + requestedBytes;
int32 newszChunk = (uint64)newsize >> chunkSizeInBits;
if (newszChunk > t_thrd.utils_cxt.trackedMemChunks &&
(processMemInChunks + newszChunk - t_thrd.utils_cxt.trackedMemChunks) > maxChunksPerProcess)
return false;
return true;
}
void gs_find_abnormal_memctx(MemoryContext context)
{
#ifndef ENABLE_MEMORY_CHECK
AllocSet aset = (AllocSet)context;
#else
AsanSet aset = (AsanSet)context;
#endif
if (context->type == T_SharedAllocSetContext || context->type == T_MemalignSharedAllocSetContext) {
if (aset->totalSpace > SELF_SHARED_MEMCTX_LIMITATION) {
write_stderr("----debug_query_id=%lu, WARNING: the shared memory context '%s' is using %d MB size larger "
"than %d MB.\n",
u_sess->debug_query_id,
context->name,
(int)(aset->totalSpace >> BITS_IN_MB),
SELF_SHARED_MEMCTX_LIMITATION >> BITS_IN_MB);
}
} else {
if (aset->totalSpace > (Size)(aset->maxSpaceSize + SELF_GENRIC_MEMCTX_LIMITATION)) {
write_stderr("----debug_query_id=%lu, WARNING: the common memory context '%s' is using %d MB size larger "
"than %d MB.\n",
u_sess->debug_query_id,
context->name,
(int)(aset->totalSpace >> BITS_IN_MB),
(int)(aset->maxSpaceSize >> BITS_IN_MB));
}
}
}
void gs_recursive_verify_memctx(MemoryContext context, bool is_shared)
{
MemoryContext child;
PG_TRY();
{
CHECK_FOR_INTERRUPTS();
if (is_shared) {
MemoryContextLock(context);
}
gs_find_abnormal_memctx(context);
for (child = context->firstchild; child != NULL; child = child->nextchild) {
if (child->is_shared == is_shared) {
gs_recursive_verify_memctx(child, is_shared);
}
}
}
PG_CATCH();
{
if (is_shared) {
MemoryContextUnlock(context);
}
PG_RE_THROW();
}
PG_END_TRY();
if (is_shared) {
MemoryContextUnlock(context);
}
}
void gs_memprot_reset_beyondchunk(void)
{
t_thrd.utils_cxt.beyondChunk = 0;
}
void gs_display_query_string(uint64 sessionid)
{
if (t_thrd.shemem_ptr_cxt.mySessionMemoryEntry != NULL &&
t_thrd.shemem_ptr_cxt.mySessionMemoryEntry->sessionid != sessionid) {
PgBackendStatus* beentry = pgstat_get_backend_single_entry(sessionid);
if (beentry != NULL && *(beentry->st_activity) != '\0') {
write_stderr(
"----debug_query_id=%lu, It is not the current session and beentry info : "
"datid<%u>, app_name<%s>, query_id<%lu>, tid<%lu>, lwtid<%d>, parent_sessionid<%lu>, thread_level<%d>, "
"query_string<%s>.\n",
u_sess->debug_query_id,
beentry->st_databaseid,
beentry->st_appname ? beentry->st_appname : "unnamed thread",
beentry->st_queryid,
beentry->st_procpid,
beentry->st_tid,
beentry->st_parent_sessionid,
beentry->st_thread_level,
beentry->st_activity);
}
}
}
* get the info from sessionMemoryEntry
*/
void gs_display_query_by_sessionentry(uint64* saveThreadid, uint64* maxThreadid)
{
* find all user information, get its complicated queries
*/
SessionLevelMemory *entry = NULL;
SessionLevelMemory *tmpentry = NULL;
SessionLevelMemory localEntry;
int entryIndex = 0;
int compCnt = 0;
int beyCnt = 0;
int beySize = 0;
int allEstMemory = 0;
int curCostMemory = 0;
int allCostMemory = 0;
int maxCurCostMemory = 0;
int maxEstCostMemory = 0;
int saveThreadCost = 0;
int saveThreadEst = 0;
for (entryIndex = 0; entryIndex < SessionMemoryArraySize; entryIndex++) {
entry = &(t_thrd.shemem_ptr_cxt.sessionMemoryArray[entryIndex]);
int ss_rc = memcpy_s(&localEntry, sizeof(localEntry), entry, sizeof(SessionLevelMemory));
securec_check(ss_rc, "\0", "\0");
if (entry->isValid == false || entry->iscomplex == 0)
continue;
tmpentry = &localEntry;
allEstMemory += tmpentry->estimate_memory;
curCostMemory = (int)((unsigned int)(tmpentry->queryMemInChunks - tmpentry->initMemInChunks)
<< (chunkSizeInBits - BITS_IN_MB));
allCostMemory += curCostMemory;
compCnt++;
if (curCostMemory > tmpentry->estimate_memory) {
if (tmpentry->estimate_memory)
beyCnt++;
if ((curCostMemory - tmpentry->estimate_memory) > beySize) {
beySize = (curCostMemory - tmpentry->estimate_memory);
*saveThreadid = tmpentry->sessionid;
saveThreadCost = curCostMemory;
saveThreadEst = tmpentry->estimate_memory;
}
}
if (curCostMemory > maxCurCostMemory) {
maxCurCostMemory = curCostMemory;
*maxThreadid = tmpentry->sessionid;
maxEstCostMemory = tmpentry->estimate_memory;
}
}
if (allEstMemory || curCostMemory)
write_stderr("----debug_query_id=%lu, Total estimated Memory is %d MB, total current cost Memory is %d MB, "
"the difference is %d MB."
"The count of complicated queries is %d and the count of uncontrolled queries is %d.\n",
u_sess->debug_query_id,
allEstMemory,
allCostMemory,
allCostMemory - allEstMemory,
compCnt,
beyCnt);
if (*saveThreadid == *maxThreadid) {
write_stderr("----debug_query_id=%lu, The abnormal query thread id %lu."
"It current used memory is %d MB and estimated memory is %d MB."
"It also is the query which costs the maximum memory.\n",
u_sess->debug_query_id,
*saveThreadid,
saveThreadCost,
saveThreadEst);
gs_display_query_string(*maxThreadid);
} else {
if (*saveThreadid) {
write_stderr("----debug_query_id=%lu, The abnormal query thread id %lu."
"It current used memory is %d MB and estimated memory is %d MB.\n",
u_sess->debug_query_id,
*saveThreadid,
saveThreadCost,
saveThreadEst);
gs_display_query_string(*saveThreadid);
}
if (*maxThreadid) {
write_stderr("----debug_query_id=%lu, The query which costs the maximum memory is %lu."
"It current used memory is %d MB and estimated memory is %d MB.\n",
u_sess->debug_query_id,
*maxThreadid,
maxCurCostMemory,
maxEstCostMemory);
gs_display_query_string(*maxThreadid);
}
}
}
void gs_display_uncontrolled_query(int* procIdx)
{
if (t_thrd.comm_cxt.LibcommThreadType != LIBCOMM_NONE || t_thrd.shemem_ptr_cxt.sessionMemoryArray == NULL)
return;
uint64 saveSessionid = 0;
uint64 maxSessionid = 0;
gs_display_query_by_sessionentry(&saveSessionid, &maxSessionid);
if (saveSessionid || maxSessionid) {
volatile PGPROC* proc = NULL;
int idx = 0;
PG_TRY();
{
HOLD_INTERRUPTS();
for (idx = 0; idx < (int)g_instance.proc_base->allProcCount; idx++) {
proc = g_instance.proc_base_all_procs[idx];
*procIdx = idx;
if (proc->sessMemorySessionid == saveSessionid || proc->sessMemorySessionid == maxSessionid) {
(void)syscalllockAcquire(&((PGPROC*)proc)->deleMemContextMutex);
if (NULL != proc->topmcxt)
gs_recursive_verify_memctx(proc->topmcxt, false);
(void)syscalllockRelease(&((PGPROC*)proc)->deleMemContextMutex);
}
}
RESUME_INTERRUPTS();
}
PG_CATCH();
{
if (*procIdx < (int)g_instance.proc_base->allProcCount) {
proc = g_instance.proc_base_all_procs[*procIdx];
(void)syscalllockRelease(&((PGPROC*)proc)->deleMemContextMutex);
}
PG_RE_THROW();
}
PG_END_TRY();
}
}
void gs_output_memory_info(void)
{
int max_dynamic_memory = (int)(maxChunksPerProcess << (chunkSizeInBits - BITS_IN_MB));
int dynamic_used_memory = processMemInChunks << (chunkSizeInBits - BITS_IN_MB);
int dynamic_peak_memory = (int)(peakChunksPerProcess << (chunkSizeInBits - BITS_IN_MB));
int dynamic_used_shrctx = (int)(shareTrackedMemChunks << (chunkSizeInBits - BITS_IN_MB));
int dynamic_peak_shrctx = (int)(peakChunksSharedContext << (chunkSizeInBits - BITS_IN_MB));
int max_sctpcomm_memory = comm_original_memory;
int sctpcomm_used_memory = (int)(gs_get_comm_used_memory() >> BITS_IN_MB);
int sctpcomm_peak_memory = (int)(gs_get_comm_peak_memory() >> BITS_IN_MB);
int comm_global_memctx = (gs_get_comm_context_memory() >> BITS_IN_MB);
int gpu_max_dynamic_memory = 0;
int gpu_dynamic_used_memory = 0;
int gpu_dynamic_peak_memory = 0;
int large_storage_memory = (int)(storageTrackedBytes >> BITS_IN_MB);
int procIdx = 0;
#ifdef ENABLE_MULTIPLE_NODES
if (is_searchserver_api_load()) {
void* mem_info = get_searchlet_resource_info(&gpu_dynamic_used_memory, &gpu_dynamic_peak_memory);
if (mem_info != NULL) {
gpu_max_dynamic_memory = g_searchserver_memory;
pfree(mem_info);
}
}
#endif
write_stderr("----debug_query_id=%lu, Memory information of whole process in MB:"
"max_dynamic_memory: %d, dynamic_used_memory: %d, dynamic_peak_memory: %d, "
"dynamic_used_shrctx: %d, dynamic_peak_shrctx: %d, "
"max_sctpcomm_memory: %d, sctpcomm_used_memory: %d, sctpcomm_peak_memory: %d, comm_global_memctx: %d, "
"gpu_max_dynamic_memory: %d, gpu_dynamic_used_memory: %d, gpu_dynamic_peak_memory: %d, "
"large_storage_memory: %d.\n",
u_sess->debug_query_id,
max_dynamic_memory,
dynamic_used_memory,
dynamic_peak_memory,
dynamic_used_shrctx,
dynamic_peak_shrctx,
max_sctpcomm_memory,
sctpcomm_used_memory,
sctpcomm_peak_memory,
comm_global_memctx,
gpu_max_dynamic_memory,
gpu_dynamic_used_memory,
gpu_dynamic_peak_memory,
large_storage_memory);
int real_shrctx_size = dynamic_used_shrctx - sctpcomm_used_memory;
if (real_shrctx_size > MAX_SHARED_MEMCTX_LIMITATION) {
write_stderr("----debug_query_id=%lu, WARNING: the memory used in shared memory context is %d beyond %d, "
"it may cause the out of memory.\n",
u_sess->debug_query_id,
real_shrctx_size,
MAX_SHARED_MEMCTX_LIMITATION);
gs_recursive_verify_memctx(g_instance.instance_context, true);
if (max_dynamic_memory != 0 && (real_shrctx_size > MAX_SHARED_MEMCTX_SIZE) &&
(real_shrctx_size * 100 / max_dynamic_memory) > 60) {
write_stderr(
"----debug_query_id=%lu, FATAL: share memory context is out of control!\n", u_sess->debug_query_id);
}
}
if (sctpcomm_used_memory > MAX_COMM_USED_SIZE) {
write_stderr("----debug_query_id=%lu, WARNING: the memory used in communication layer is beyond %d, "
"it may cause the out of memory.\n",
u_sess->debug_query_id,
MAX_COMM_USED_SIZE);
if (max_dynamic_memory != 0 && (sctpcomm_used_memory > max_sctpcomm_memory) &&
((sctpcomm_used_memory * 100 / max_dynamic_memory) > 60)) {
write_stderr(
"----debug_query_id=%lu, FATAL: communication memory is out of control!\n", u_sess->debug_query_id);
}
}
gs_display_uncontrolled_query(&procIdx);
}
template <bool flag>
void gs_memprot_failed(int64 sz, MemType type)
{
t_thrd.utils_cxt.beyondChunk += 50;
uint64 debug_query_id = (u_sess == NULL) ? 0 : u_sess->debug_query_id;
if (flag) {
if (type == MEM_THRD)
write_stderr(
"----debug_query_id=%lu, memory allocation failed due to reaching the database memory limitation."
" Current thread is consuming about %u MB, allocating %ld bytes.\n",
debug_query_id,
(uint32)t_thrd.utils_cxt.trackedMemChunks << (chunkSizeInBits - BITS_IN_MB),
sz);
else
write_stderr(
"----debug_query_id=%lu, memory allocation failed due to reaching the database memory limitation."
" Current session is consuming about %u MB, allocating %ld bytes.\n",
debug_query_id,
(uint32)u_sess->stat_cxt.trackedMemChunks << (chunkSizeInBits - BITS_IN_MB),
sz);
} else {
if (type == MEM_THRD)
write_stderr(
"----debug_query_id=%lu, FATAL: memory allocation failed due to reaching the OS memory limitation."
" Current thread is consuming about %u MB, allocating %ld bytes.\n",
debug_query_id,
(uint32)t_thrd.utils_cxt.trackedMemChunks << (chunkSizeInBits - BITS_IN_MB),
sz);
else
write_stderr(
"----debug_query_id=%lu, FATAL: memory allocation failed due to reaching the OS memory limitation."
" Current session is consuming about %u MB, allocating %ld bytes.\n",
debug_query_id,
(uint32)u_sess->stat_cxt.trackedMemChunks << (chunkSizeInBits - BITS_IN_MB),
sz);
write_stderr("----debug_query_id=%lu, Please check the sysctl configuration and GUC variable "
"g_instance.attr.attr_memory.max_process_memory.\n",
debug_query_id);
}
}
static bool MemoryIsNotEnough(int32 currentMem, int32 maxMem, bool needProtect)
{
if (currentMem < maxMem) {
return false;
}
if ((u_sess && u_sess->attr.attr_memory.disable_memory_protect) ||
!t_thrd.utils_cxt.memNeedProtect || !needProtect) {
return false;
}
if ((t_thrd.int_cxt.CritSectionCount != 0) ||
(AmPostmasterProcess()) || t_thrd.xact_cxt.bInAbortTransaction) {
return false;
}
if (currentMem >= (maxMem + t_thrd.utils_cxt.beyondChunk)) {
write_stderr("ERROR: memory alloc failed. Current role is: %d, maxMem is: %d, currMem is: %d.\n",
t_thrd.role, maxMem, currentMem);
return true;
}
return false;
}
* Reserve num of chunks for current thread. The reservation has to be
* valid against query level memory quota.
*/
template <MemType type>
static bool memTracker_ReserveMemChunks(int32 numChunksToReserve, bool needProtect)
{
int32 total = 0;
volatile int32 *currSize = NULL;
int32 *maxSize = NULL;
Assert(0 < numChunksToReserve);
if (t_thrd.utils_cxt.backend_reserved) {
currSize = &backendUsedMemInChunk;
maxSize = &backendReservedMemInChunk;
} else {
currSize = &processMemInChunks;
maxSize = &maxChunksPerProcess;
}
total = pg_atomic_add_fetch_u32((volatile uint32*)currSize, numChunksToReserve);
if (MemoryIsNotEnough(total, *maxSize, needProtect)) {
(void)pg_atomic_sub_fetch_u32((volatile uint32*)currSize, numChunksToReserve);
return false;
}
if (total < *maxSize) {
t_thrd.utils_cxt.beyondChunk = 0;
}
if (t_thrd.shemem_ptr_cxt.mySessionMemoryEntry && type != MEM_SHRD) {
total = gs_atomic_add_32(&(t_thrd.shemem_ptr_cxt.mySessionMemoryEntry->queryMemInChunks), numChunksToReserve);
if (t_thrd.shemem_ptr_cxt.mySessionMemoryEntry->peakChunksQuery < total)
gs_lock_test_and_set(&(t_thrd.shemem_ptr_cxt.mySessionMemoryEntry->peakChunksQuery), total);
}
if (peakChunksPerProcess < processMemInChunks + backendUsedMemInChunk) {
peakChunksPerProcess = processMemInChunks + backendUsedMemInChunk;
int threshold = PROCMEM_HIGHWATER_THRESHOLD >> (chunkSizeInBits - BITS_IN_MB);
if ((backendUsedMemInChunk + processMemInChunks) >
(backendReservedMemInChunk + maxChunksPerProcess + threshold)) {
TimestampTz current = GetCurrentTimestamp();
if (u_sess != NULL && TimestampDifferenceExceeds(last_print_timestamp, current, 60000)) {
uint32 processMemMB =
(uint32)(backendUsedMemInChunk + processMemInChunks) << (chunkSizeInBits - BITS_IN_MB);
uint32 reserveMemMB = (uint32)numChunksToReserve << (chunkSizeInBits - BITS_IN_MB);
write_stderr("WARNING: process memory allocation %u MB, pid %lu, "
"thread self memory %ld bytes, new %ld bytes allocated, statement(%s).\n",
processMemMB, t_thrd.proc_cxt.MyProcPid, t_thrd.utils_cxt.trackedBytes, (int64)reserveMemMB << BITS_IN_MB,
(t_thrd.postgres_cxt.debug_query_string != NULL) ? t_thrd.postgres_cxt.debug_query_string : "NULL");
last_print_timestamp = current;
}
}
}
return true;
}
* Reserve requestedBytes from the memory tracking system.
*
* We use BITS_IN_MB sized chunk as the unit to reserve the
* memory for newly requested bytes for performance reason.
* If the newly requested bytes can fit into the previous
* reserved chunk, it does not reserve a new chunk.
*/
template <MemType type>
bool memTracker_ReserveMem(int64 requestedBytes, bool needProtect)
{
bool status = true;
int64 tb = 0;
int32 tc = 0;
int32 needChunk = 0;
if (type == MEM_SHRD) {
tc = (uint64)shareTrackedBytes >> chunkSizeInBits;
tb = gs_atomic_add_64(&shareTrackedBytes, requestedBytes);
gs_lock_test_and_set(&shareTrackedMemChunks, tc);
} else if (type == MEM_THRD) {
tc = t_thrd.utils_cxt.trackedMemChunks;
t_thrd.utils_cxt.trackedBytes += requestedBytes;
tb = t_thrd.utils_cxt.trackedBytes;
} else {
tc = u_sess->stat_cxt.trackedMemChunks;
u_sess->stat_cxt.trackedBytes += requestedBytes;
tb = u_sess->stat_cxt.trackedBytes;
}
if (type != MEM_SHRD) {
tb = tb + eliminateErrorsMemoryBytes;
}
int32 newszChunk = (uint64)tb >> chunkSizeInBits;
if (newszChunk > tc) {
needChunk = newszChunk - tc;
status = memTracker_ReserveMemChunks<type>(needChunk, needProtect);
}
if (status == false) {
if (type == MEM_SHRD)
gs_atomic_add_64(&shareTrackedBytes, -requestedBytes);
else if (type == MEM_THRD)
t_thrd.utils_cxt.trackedBytes -= requestedBytes;
else
u_sess->stat_cxt.trackedBytes -= requestedBytes;
} else {
if (type == MEM_SHRD) {
(void)pg_atomic_add_fetch_u32((volatile uint32*)&shareTrackedMemChunks, needChunk);
if (shareTrackedMemChunks > peakChunksSharedContext)
peakChunksSharedContext = shareTrackedMemChunks;
} else {
if (type == MEM_THRD)
t_thrd.utils_cxt.trackedMemChunks = newszChunk;
else
u_sess->stat_cxt.trackedMemChunks = newszChunk;
if (needChunk != 0) {
(void)pg_atomic_add_fetch_u32((volatile uint32*)&dynmicTrackedMemChunks, needChunk);
}
t_thrd.utils_cxt.basedBytesInQueryLifeCycle += requestedBytes;
if(t_thrd.utils_cxt.basedBytesInQueryLifeCycle > t_thrd.utils_cxt.peakedBytesInQueryLifeCycle)
t_thrd.utils_cxt.peakedBytesInQueryLifeCycle = t_thrd.utils_cxt.basedBytesInQueryLifeCycle;
}
}
return status;
}
* Releases "reduction" number of chunks to the query.
*/
template <MemType type>
static void memTracker_ReleaseMemChunks(int reduction)
{
int total = 0;
Assert(0 <= reduction);
if (t_thrd.utils_cxt.backend_reserved) {
total = pg_atomic_sub_fetch_u32((volatile uint32*)&backendUsedMemInChunk, reduction);
} else {
total = pg_atomic_sub_fetch_u32((volatile uint32*)&processMemInChunks, reduction);
}
if (t_thrd.shemem_ptr_cxt.mySessionMemoryEntry && type != MEM_SHRD) {
total = gs_atomic_add_32(&(t_thrd.shemem_ptr_cxt.mySessionMemoryEntry->queryMemInChunks), -reduction);
}
}
* Releases requested size bytes memory.
*
* For performance reason this method accumulates free requests until it has
* enough bytes to free a whole chunk.
*/
template <MemType type>
void memTracker_ReleaseMem(int64 toBeFreedRequested)
{
int32 tc = 0;
int64 tb = 0;
* We need this adjustment as gaussdb may request to free more memory than it reserved, apparently
* because a bug somewhere that tries to release memory for allocations made before the memory
* tracking system was initialized.
*/
if (type == MEM_SHRD)
tb = shareTrackedBytes;
else if (type == MEM_THRD)
tb = t_thrd.utils_cxt.trackedBytes;
else
tb = u_sess->stat_cxt.trackedBytes;
int64 toBeFreed = Min(tb, toBeFreedRequested);
if (0 == toBeFreed) {
return;
}
if (type == MEM_SHRD) {
tc = (uint64)shareTrackedBytes >> chunkSizeInBits;
tb = gs_atomic_add_64(&shareTrackedBytes, -toBeFreed);
gs_lock_test_and_set(&shareTrackedMemChunks, tc);
} else if (type == MEM_THRD) {
tc = t_thrd.utils_cxt.trackedMemChunks;
t_thrd.utils_cxt.trackedBytes -= toBeFreed;
t_thrd.utils_cxt.basedBytesInQueryLifeCycle -= toBeFreed;
tb = t_thrd.utils_cxt.trackedBytes;
} else {
tc = u_sess->stat_cxt.trackedMemChunks;
u_sess->stat_cxt.trackedBytes -= toBeFreed;
tb = u_sess->stat_cxt.trackedBytes;
t_thrd.utils_cxt.basedBytesInQueryLifeCycle -= toBeFreed;
}
if (type != MEM_SHRD) {
tb = tb + eliminateErrorsMemoryBytes;
}
int newszChunk = (uint64)tb >> chunkSizeInBits;
if (newszChunk < tc) {
int reduction = tc - newszChunk;
memTracker_ReleaseMemChunks<type>(reduction);
if (type == MEM_SHRD)
(void)pg_atomic_sub_fetch_u32((volatile uint32*)&shareTrackedMemChunks, reduction);
else {
if (type == MEM_THRD)
t_thrd.utils_cxt.trackedMemChunks = newszChunk;
else
u_sess->stat_cxt.trackedMemChunks = newszChunk;
(void)pg_atomic_sub_fetch_u32((volatile uint32*)&dynmicTrackedMemChunks, reduction);
}
int total = shareTrackedMemChunks + dynmicTrackedMemChunks;
gs_lock_test_and_set(&processMemInChunks, total);
}
}
int getSessionMemoryUsageMB()
{
int used = 0;
if (t_thrd.shemem_ptr_cxt.mySessionMemoryEntry) {
used = (unsigned int)(t_thrd.shemem_ptr_cxt.mySessionMemoryEntry->queryMemInChunks -
t_thrd.shemem_ptr_cxt.mySessionMemoryEntry->initMemInChunks)
<< (chunkSizeInBits - BITS_IN_MB);
}
return used;
}
* Memory allocation for sz bytes. If memory quota is enabled, it uses gs_malloc_internal to
* reserve the chunk and allocate memory.
*/
template <MemType mem_type>
void* MemoryProtectFunctions::gs_memprot_malloc(Size sz, bool needProtect)
{
if (!t_thrd.utils_cxt.gs_mp_inited)
return malloc(sz);
void* ptr = NULL;
bool status = memTracker_ReserveMem<mem_type>(sz, needProtect);
if (status == true) {
ptr = malloc(sz);
if (ptr == NULL) {
memTracker_ReleaseMem<mem_type>(sz);
gs_memprot_failed<false>(sz, mem_type);
return NULL;
}
return ptr;
}
gs_memprot_failed<true>(sz, mem_type);
return NULL;
}
template <MemType mem_type>
void MemoryProtectFunctions::gs_memprot_free(void* ptr, Size sz)
{
free(ptr);
ptr = NULL;
if (t_thrd.utils_cxt.gs_mp_inited)
memTracker_ReleaseMem<mem_type>(sz);
}
template <MemType mem_type>
void* MemoryProtectFunctions::gs_memprot_realloc(void* ptr, Size sz, Size newsz, bool needProtect)
{
Assert(GS_MP_INITED);
void* ret = NULL;
if ((newsz > 0) && memTracker_ReserveMem<mem_type>(newsz, needProtect)) {
memTracker_ReleaseMem<mem_type>(sz);
ret = realloc(ptr, newsz);
if (ret == NULL) {
memTracker_ReleaseMem<mem_type>(newsz);
gs_memprot_failed<false>(newsz, mem_type);
return NULL;
}
return ret;
}
gs_memprot_failed<true>(newsz, mem_type);
return NULL;
}
template <MemType mem_type>
int MemoryProtectFunctions::gs_posix_memalign(void** memptr, Size alignment, Size sz, bool needProtect)
{
if (!t_thrd.utils_cxt.gs_mp_inited)
return posix_memalign(memptr, alignment, sz);
int ret = 0;
bool status = memTracker_ReserveMem<mem_type>(sz, needProtect);
if (status == true) {
ret = posix_memalign(memptr, alignment, sz);
if (ret) {
memTracker_ReleaseMem<mem_type>(sz);
gs_memprot_failed<false>(sz, mem_type);
return ret;
}
return ret;
}
gs_memprot_failed<true>(sz, mem_type);
return ENOMEM;
}
* reseve memory for mmap of compressed table
* @tparam mem_type MEM_SHRD is supported only
* @param sz reserved size(bytes)
* @param needProtect
* @return success or not
*/
template <MemType type>
bool MemoryProtectFunctions::gs_memprot_reserve(Size sz, bool needProtect)
{
if (type != MEM_SHRD) {
return false;
}
return memTracker_ReserveMem<type>(sz, needProtect);
}
* release the momery allocated by gs_memprot_reserve
* @tparam type MEM_SHRD is supported only
* @param sz free size(bytes)
*/
template <MemType type>
void MemoryProtectFunctions::gs_memprot_release(Size sz)
{
if (type != MEM_SHRD) {
return;
}
memTracker_ReleaseMem<type>(sz);
}
void gs_memprot_thread_init(void)
{
if (maxChunksPerProcess) {
t_thrd.utils_cxt.gs_mp_inited = true;
}
}
void gs_memprot_reserved_backend(int avail_mem)
{
int reserved_mem = 0;
const int wal_thread_count = 5;
int reserved_thread_count = g_instance.attr.attr_network.ReservedBackends +
NUM_CMAGENT_PROCS + wal_thread_count +
NUM_DCF_CALLBACK_PROCS +
NUM_DMS_CALLBACK_PROCS +
g_instance.attr.attr_storage.max_wal_senders;
reserved_mem += reserved_thread_count * 10;
ereport(LOG, (errmsg("reserved memory for backend threads is: %d MB", reserved_mem)));
int64 wal_mem = 0;
int64 data_sender_size = 1 + sizeof(DataPageMessageHeader) + WS_MAX_SEND_SIZE;
wal_mem += data_sender_size * g_instance.attr.attr_storage.max_wal_senders;
int64 wal_sender_size = 1 + sizeof(WalDataMessageHeader) + (int)WS_MAX_SEND_SIZE;
wal_mem += wal_sender_size * g_instance.attr.attr_storage.max_wal_senders;
int64 wal_receiver_size = g_instance.attr.attr_storage.WalReceiverBufSize * 1024;
wal_mem += offsetof(WalRcvCtlBlock, walReceiverBuffer) + wal_receiver_size;
ereport(LOG, (errmsg("reserved memory for WAL buffers is: %ld MB", wal_mem >> BITS_IN_MB)));
reserved_mem += (uint64)wal_mem >> BITS_IN_MB;
backendReservedMemInChunk = reserved_mem;
maxChunksPerProcess = ((unsigned int)avail_mem >> BITS_IN_KB) - reserved_mem;
ereport(LOG, (errmsg("Set max backend reserve memory is: %d MB, max dynamic memory is: %d MB",
backendReservedMemInChunk, maxChunksPerProcess)));
}
void gs_memprot_init(Size size)
{
if (g_instance.attr.attr_memory.enable_memory_limit && maxChunksPerProcess == 0) {
maxSharedMemory = size >> BITS_IN_MB;
#ifdef ENABLE_MULTIPLE_NODES
Assert(g_instance.attr.attr_sql.udf_memory_limit >= UDF_DEFAULT_MEMORY);
int udf_memory = g_instance.attr.attr_sql.udf_memory_limit - UDF_DEFAULT_MEMORY;
#else
int udf_memory = 0;
#endif
int avail_mem = g_instance.attr.attr_memory.max_process_memory - g_instance.attr.attr_storage.cstore_buffers -
#ifdef ENABLE_HTAP
g_instance.attr.attr_memory.max_imcs_cache -
#endif
udf_memory - (size >> BITS_IN_KB);
if (avail_mem < MIN_PROCESS_LIMIT) {
ereport(WARNING,
(errmsg(
"Failed to initialize the memory protect for "
"g_instance.attr.attr_storage.cstore_buffers (%d Mbytes) or shared memory (%lu Mbytes) is larger.",
g_instance.attr.attr_storage.cstore_buffers >> BITS_IN_KB,
size >> BITS_IN_MB)));
backendReservedMemInChunk = 0;
maxChunksPerProcess = 0;
return;
}
gs_memprot_reserved_backend(avail_mem);
comm_original_memory = (g_instance.attr.attr_network.comm_usable_memory >> BITS_IN_KB);
ereport(LOG,
(errmsg("shared memory %lu Mbytes, memory context %d Mbytes, max process memory %d Mbytes",
(size >> BITS_IN_MB),
(maxChunksPerProcess + backendReservedMemInChunk),
(g_instance.attr.attr_memory.max_process_memory >> BITS_IN_KB))));
if (maxChunksPerProcess < (MIN_PROCESS_LIMIT >> BITS_IN_KB)) {
backendReservedMemInChunk = 0;
maxChunksPerProcess = 0;
t_thrd.utils_cxt.gs_mp_inited = false;
} else {
t_thrd.utils_cxt.gs_mp_inited = true;
}
#ifdef ENABLE_HTAP
double percent = g_instance.attr.attr_memory.htap_borrow_mem_percent / 100;
double pre_occupy = (double)g_instance.attr.attr_memory.max_imcs_cache * percent;
#else
double pre_occupy = 0;
#endif
g_instance.attr.attr_memory.avail_borrow_mem = g_instance.attr.attr_memory.max_borrow_memory - pre_occupy;
if (g_instance.attr.attr_memory.avail_borrow_mem <= 0) {
ereport(WARNING, (errmsg("max_imcs_cache * htap_borrow_mem_percent is larger than max_borrow_memory")));
}
}
}
void gs_memprot_process_gpu_memory(uint32 size)
{
uint32 i = size >> (chunkSizeInBits - BITS_IN_MB);
if (t_thrd.utils_cxt.gs_mp_inited == true) {
uint32 remain_size = maxChunksPerProcess >> 1;
if (remain_size < i) {
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("Failed to initilize the memory(%uM) of search server, "
"maybe it exceed the half of maxChunksPerProcess(%dM).",
i,
maxChunksPerProcess)));
}
maxChunksPerProcess -= i;
}
}