/* -------------------------------------------------------------------------
 *
 * proc.h
 *	  per-process shared memory data structures
 *
 *
 * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/include/storage/proc.h
 *
 * -------------------------------------------------------------------------
 */
#ifndef _PROC_H_
#define _PROC_H_
#include "access/clog.h"
#include "access/xlog.h"
#include "datatype/timestamp.h"
#include "storage/latch.h"
#include "storage/lock/lock.h"
#include "storage/lock/pg_sema.h"
#include "threadpool/threadpool.h"
#include "replication/dataqueuedefs.h"
#include "utils/syscall_lock.h"
#include "pgtime.h"
#include "gtm/gtm_c.h"
#include "alarm/alarm.h"
#include "utils/atomic.h"
#include "utils/snapshot.h"
#include "access/multi_redo_settings.h"
#include "c.h"
/*
 * Each backend advertises up to PGPROC_MAX_CACHED_SUBXIDS TransactionIds
 * for non-aborted subtransactions of its current top transaction. These
 * have to be treated as running XIDs by other backends.
 *
 * We also keep track of whether the cache overflowed (i.e., the transaction
 * has generated at least one subtransaction that didn't fit in the cache).
 * If none of the caches have overflowed, we can assume that an XID that's not
 * listed anywhere in the PGPROC array is not a running transaction. Else we
 * have to look at pg_subtrans.
 */
/* Maximum number of subtransaction XIDs a backend advertises in its cache. */
#define PGPROC_MAX_CACHED_SUBXIDS 64
/* NOTE(review): presumably the initial capacity of XidCache.xids -- confirm against the allocation code. */
#define PGPROC_INIT_CACHED_SUBXIDS 64
/* Subtransaction XID cache; embedded in PGPROC as the "subxids" member. */
struct XidCache {
/* NOTE(review): presumably the currently allocated capacity of xids[] -- confirm. */
int maxNumber;
/* array holding the cached subtransaction XIDs */
TransactionId* xids;
};
/*
 * Vacuum-related flag bits. Each flag occupies one bit so that several can
 * be OR-ed together (presumably in PGXACT.vacuumFlags -- confirm).
 */
#define PROC_IS_AUTOVACUUM (1 << 0) /* bit 0 */
#define PROC_IN_VACUUM (1 << 1)     /* bit 1 */
#define PROC_IN_ANALYZE (1 << 2)    /* bit 2 */
/* Both "currently vacuuming / analyzing" state bits taken together. */
#define PROC_VACUUM_STATE_MASK (PROC_IN_ANALYZE | PROC_IN_VACUUM)
/*
 * Flags reused to mark a data-redistribution xact during online expansion.
 * We do not want to introduce a new field in PGXACT for data redistribution,
 * because that would increase sizeof(PGXACT) and possibly make it no longer
 * fit into a CPU cache line. Please see the comments below for PGXACT.
 */
/* Further one-bit flags, sharing the flag space of the PROC_IN_* vacuum bits. */
#define PROC_IS_REDIST (1 << 4)           /* bit 4: data-redistribution xact */
#define PROC_IN_LOGICAL_DECODING (1 << 5) /* bit 5 */
/* Two-state "in use" marker values (not bit flags). */
#define XACT_NOT_IN_USE 0
#define XACT_IN_USE 1
/*
 * We allow a small number of "weak" relation locks (AccessShareLock,
 * RowShareLock, RowExclusiveLock) to be recorded in the PGPROC structure
 * rather than the main lock table. This eases contention on the lock
 * manager LWLocks. See storage/lmgr/README for additional details.
 */
/*
 * Fast-path lock sizing.
 *
 * FP_LOCK_SLOTS_PER_BACKEND  number of fast-path slots per backend, read from
 *                            the instance configuration on every expansion.
 * FP_LOCK_SLOTS_PER_LOCKBIT  number of slots tracked by one fpLockBits word.
 * FP_LOCKBIT_NUM             number of fpLockBits words needed to cover all
 *                            slots (slot count divided by slots-per-word,
 *                            rounded up).
 *
 * NOTE(review): the arithmetic is unsigned, so a slot count of 0 would wrap in
 * the "- 1"; callers presumably guarantee a nonzero configuration -- confirm.
 */
#define FP_LOCK_SLOTS_PER_BACKEND \
    ((uint32)g_instance.attr.attr_storage.num_internal_lock_partitions[FASTPATH_PART])
#define FP_LOCK_SLOTS_PER_LOCKBIT 20
#define FP_LOCKBIT_NUM (1 + ((FP_LOCK_SLOTS_PER_BACKEND - 1) / FP_LOCK_SLOTS_PER_LOCKBIT))

/* Clear the first FP_LOCKBIT_NUM words of (proc)->fpLockBits, in ascending order. */
#define FAST_PATH_SET_LOCKBITS_ZERO(proc)        \
    do {                                         \
        uint32 _fpWord = 0;                      \
        while (_fpWord < FP_LOCKBIT_NUM) {       \
            (proc)->fpLockBits[_fpWord] = 0;     \
            _fpWord++;                           \
        }                                        \
    } while (0)
/*
 * Identifies the object a fast-path lock slot refers to; compared
 * member-by-member by FAST_PATH_TAG_EQUALS. PGPROC.fpRelId points at an
 * array of these.
 */
typedef struct FastPathTag {
/* NOTE(review): by name, the database the relation belongs to -- confirm. */
uint32 dbid;
/* NOTE(review): by name, the relation being locked -- confirm. */
uint32 relid;
/* NOTE(review): by name, the partition of that relation -- confirm. */
uint32 partitionid;
} FastPathTag;
/*
 * Nonzero iff the two tags agree in dbid, relid and partitionid.
 * Both arguments are struct values (not pointers) and each may be evaluated
 * up to three times, so they must be free of side effects.
 */
#define FAST_PATH_TAG_EQUALS(tag1, tag2)     \
    ((tag1).dbid == (tag2).dbid &&           \
     (tag1).relid == (tag2).relid &&         \
     (tag1).partitionid == (tag2).partitionid)
/*
 * An invalid pgprocno. Must be larger than the maximum number of PGPROC
 * structures we could possibly have. See comments for MAX_BACKENDS.
 */
#define INVALID_PGPROCNO PG_INT32_MAX
/*
 * Each backend has a PGPROC struct in shared memory. There is also a list of
 * currently-unused PGPROC structs that will be reallocated to new backends.
 *
 * links: list link for any list the PGPROC is in. When waiting for a lock,
 * the PGPROC is linked into that lock's waitProcs queue. A recycled PGPROC
 * is linked into ProcGlobal's freeProcs list.
 *
 * Note: twophase.c also sets up a dummy PGPROC struct for each currently
 * prepared transaction. These PGPROCs appear in the ProcArray data structure
 * so that the prepared transactions appear to be still running and are
 * correctly shown as holding locks. A prepared transaction PGPROC can be
 * distinguished from a real one at need by the fact that it has pid == 0.
 * The semaphore and lock-activity fields in a prepared-xact PGPROC are unused,
 * but its myProcLocks[] lists are valid.
 */
struct PGPROC {
SHM_QUEUE links;
PGSemaphoreData sem;
int waitStatus;
Latch procLatch;
LocalTransactionId lxid;
* being executed by this proc, if running;
* else InvalidLocalTransactionId */
TransactionId snapXmax;
* getting our snapshot. */
CommitSeqNo snapCSN;
CommitSeqNo commitCSN;
XLogRecPtr exrto_read_lsn;
XLogRecPtr exrto_min;
TimestampTz exrto_gen_snap_time;
* While in hot standby mode, shows that a conflict signal has been sent
* for the current transaction. Set/cleared while holding ProcArrayLock,
* though not required. Accessed without lock, if needed.
*/
bool recoveryConflictPending;
LWLock* subxidsLock;
struct XidCache subxids;
ThreadId pid;
* session id in mySessionMemoryEntry
* stream works share SessionMemoryEntry with their parent sessions,
* so sessMemorySessionid is their parent's as well.
*/
ThreadId sessMemorySessionid;
uint64 sessionid;
GlobalSessionId globalSessionId;
int logictid;
TransactionId gtt_session_frozenxid;
int pgprocno;
int nodeno;
int backendSlot;
BackendId backendId;
Oid databaseId;
Oid roleId;
uint32 workingVersionNum;
bool lwWaiting;
uint8 lwWaitMode;
bool lwIsVictim;
dlist_node lwWaitLink;
LOCK* waitLock;
PROCLOCK* waitProcLock;
LOCKMODE waitLockMode;
LOCKMASK heldLocks;
* lock object by this backend */
* Info to allow us to wait for synchronous replication, if needed.
* waitLSN is InvalidXLogRecPtr if not waiting; set only by user backend.
* syncRepState must not be touched except by owning process or WALSender.
* syncRepLinks used only while holding SyncRepLock.
*/
XLogRecPtr waitLSN;
int syncRepState;
bool syncRepInCompleteQueue;
SHM_QUEUE syncRepLinks;
XLogRecPtr syncSetConfirmedLSN;
XLogRecPtr waitPaxosLSN;
int syncPaxosState;
SHM_QUEUE syncPaxosLinks;
DataQueuePtr waitDataSyncPoint;
int dataSyncRepState;
SHM_QUEUE dataSyncRepLinks;
MemoryContext topmcxt;
char myProgName[64];
pg_time_t myStartTime;
syscalllock deleMemContextMutex;
int64* usedMemory;
bool procArrayGroupMember;
pg_atomic_uint32 procArrayGroupNext;
* latest transaction id among the transaction's main XID and
* subtransactions
*/
TransactionId procArrayGroupMemberXid;
int procArrayGroupSubXactNXids;
TransactionId* procArrayGroupSubXactXids;
* lastestXid amoung subtransaction's xid and it's committed children's,
* which can be detemined whether the group member is a subtransaction or
* transaction. procArrayGroupSubXactLatestXid != 0 only when the group
* memeber is subtransaction.
*/
TransactionId procArrayGroupSubXactLatestXid;
bool snapshotGroupMember;
pg_atomic_uint32 snapshotGroupNext;
volatile Snapshot snapshotGroup;
TransactionId xminGroup;
TransactionId xmaxGroup;
TransactionId globalxminGroup;
volatile TransactionId replicationSlotXminGroup;
volatile TransactionId replicationSlotCatalogXminGroup;
bool clogGroupMember;
pg_atomic_uint32 clogGroupNext;
TransactionId clogGroupMemberXid;
CLogXidStatus clogGroupMemberXidStatus;
* group member */
int64 clogGroupMemberPage;
* transaction id of clog group member */
XLogRecPtr clogGroupMemberLsn;
* group member */
#ifdef __aarch64__
bool xlogGroupMember;
pg_atomic_uint32 xlogGroupNext;
XLogRecData* xlogGrouprdata;
XLogRecPtr xlogGroupfpw_lsn;
XLogRecPtr* xlogGroupProcLastRecPtr;
XLogRecPtr* xlogGroupXactLastRecEnd;
void* xlogGroupCurrentTransactionState;
XLogRecPtr* xlogGroupRedoRecPtr;
void* xlogGroupLogwrtResult;
XLogRecPtr xlogGroupReturntRecPtr;
TimeLineID xlogGroupTimeLineID;
bool* xlogGroupDoPageWrites;
bool xlogGroupIsFPW;
uint64 snap_refcnt_bitmap;
#endif
bool exrto_reload_cache;
volatile GtmHostIndex my_gtmhost;
GtmHostIndex suggested_gtmhost;
pg_atomic_uint32 signal_cancel_gtm_conn_flag;
LWLock* backendLock;
uint64 *fpLockBits;
FastPathTag *fpRelId;
bool fpVXIDLock;
LocalTransactionId fpLocalTransactionId;
* lock */
PROCLOCK* blockProcLock;
* A knl_thrd_context pointer to find this proc's t_thrd.
*
* NOTE: Only be valid/used for lock waiter now. There is no concurrency risk for other lock
* releaser, who must be holding the sepcified partition lock of lockmgr's shared hash table
* to traverse the lock wait queue, to visit this thread since the lock wait queue would be
* destoryed before thread exits.
*/
void *waitLockThrd;
char *dw_unaligned_buf;
char *dw_buf;
volatile bool flush_new_dw;
volatile int32 dw_pos;
* Support for lock groups. Use LockHashPartitionLockByProc on the group
* leader to get the LWLock protecting these fields.
*/
PGPROC *lockGroupLeader;
dlist_head lockGroupMembers;
dlist_node lockGroupLink;
* All PROCLOCK objects for locks held or awaited by this backend are
* linked into one of these lists, according to the partition number of
* their lock.
*/
SHM_QUEUE myProcLocks[1];
};
/*
 * Byte count subtracted from PG_CACHE_LINE_SIZE to size PGXACT's trailing
 * padding[] array (aarch64 builds only).
 * NOTE(review): presumably has to be kept in step with the PGXACT field
 * layout whenever a field is added or resized -- confirm.
 */
#define PGXACT_PAD_OFFSET 55
/*
 * Prior to PostgreSQL 9.2, the fields below were stored as part of the
 * PGPROC. However, benchmarking revealed that packing these particular
 * members into a separate array as tightly as possible sped up GetSnapshotData
 * considerably on systems with many CPU cores, by reducing the number of
 * cache lines needing to be fetched. Thus, think very carefully before adding
 * anything else here.
 */
typedef struct PGXACT {
GTM_TransactionHandle handle;
TransactionId xid;
* executed by this proc, if running and XID
* is assigned; else InvalidTransactionId */
TransactionId prepare_xid;
TransactionId xmin;
* starting our xact, excluding LAZY VACUUM:
* vacuum must not remove tuples deleted by
* xid >= xmin ! */
CommitSeqNo csn_min;
CommitSeqNo csn_dr;
TransactionId next_xid;
int nxids;
uint8 vacuumFlags;
uint32 needToSyncXid;
* In this window, we can get CSN but TransactionIdIsInProgress returns true,
* So we need to sync at this window.
*/
bool delayChkpt;
* previously called InCommit */
CommandId cid;
#ifdef __aarch64__
char padding[PG_CACHE_LINE_SIZE - PGXACT_PAD_OFFSET];
#endif
} PGXACT;
/* Byte count subtracted from PG_CACHE_LINE_SIZE to size PROC_HDR's trailing pad[] (aarch64 builds only). */
#define PROC_HDR_PAD_OFFSET 112
/* Number of PGPROC slots for CM agent; length of PROC_HDR.cmAgentAllProcs[] and a term of GLOBAL_ALL_PROCS. */
#define NUM_CMAGENT_PROCS (10)
/*
 * There is one ProcGlobal struct for the whole database cluster.
 */
/*
 * PROC_HDR -- cluster-wide header for the PGPROC machinery: the PGPROC and
 * PGXACT arrays, the heads of the free lists, group-list heads, and latches
 * of well-known threads. Comments marked "by name" are inferred from the
 * member name only.
 */
typedef struct PROC_HDR {
/* array of pointers to all PGPROC structures */
PGPROC** allProcs;
/* array of PGXACT structures */
PGXACT* allPgXact;
/* length of the allProcs array (by name) */
uint32 allProcCount;
/* number of entries that are not prepared-transaction PGPROCs (by name) */
uint32 allNonPreparedProcCount;
/* head of the list of free PGPROC structures (recycled PGPROCs are linked here) */
PGPROC* freeProcs;
/* heads of separate free lists, one per class of thread (by name) */
PGPROC* externalFreeProcs;
PGPROC* dmsFreeProcs;
PGPROC* autovacFreeProcs;
PGPROC* cmAgentFreeProcs;
/* all PGPROCs set aside for CM agent; NUM_CMAGENT_PROCS entries */
PGPROC* cmAgentAllProcs[NUM_CMAGENT_PROCS];
PGPROC* pgjobfreeProcs;
PGPROC* bgworkerFreeProcs;
/* first member of the ProcArray group list (by name; pairs with PGPROC.procArrayGroupNext) */
pg_atomic_uint32 procArrayGroupFirst;
/* first member of the snapshot group list (by name; pairs with PGPROC.snapshotGroupNext) */
pg_atomic_uint32 snapshotGroupFirst;
/* latches of well-known threads (by name) */
Latch* walwriterLatch;
Latch* walwriterauxiliaryLatch;
Latch* checkpointerLatch;
Latch* pgwrMainThreadLatch;
Latch* cbmwriterLatch;
Latch* cbmrealwriterLatch;
volatile Latch* ShareStoragexlogCopyerLatch;
volatile Latch* BarrierPreParseLatch;
/* NOTE(review): presumably the current spinlock spins_per_delay estimate -- confirm */
int spins_per_delay;
/* the startup process's PGPROC, its thread id, and the buffer id it waits to pin (by name) */
PGPROC* startupProc;
ThreadId startupProcPid;
int startupBufferPinWaitBufId;
#ifdef __aarch64__
/* trailing pad sized from PG_CACHE_LINE_SIZE and PROC_HDR_PAD_OFFSET */
char pad[PG_CACHE_LINE_SIZE - PROC_HDR_PAD_OFFSET];
#endif
} PROC_HDR;
/*
 * We set aside some extra PGPROC structures for auxiliary processes,
 * i.e. things that aren't full-fledged backends but need shmem access.
 *
 * Background writer, checkpointer and WAL writer run during normal operation.
 * Startup process and WAL receiver also consume 2 slots, but WAL writer is
 * launched only after startup has exited, so we only need 4 slots.
 *
 * PGXC needs another slot for the pool manager process.
 */
/* Maximum number of page writer threads; a term of NUM_MULTI_AUX_PROC. */
const int MAX_PAGE_WRITER_THREAD_NUM = 17;
/* Maximum number of compaction threads; smaller when ENABLE_LITE_MODE is defined. */
#ifndef ENABLE_LITE_MODE
const int MAX_COMPACTION_THREAD_NUM = 100;
#else
const int MAX_COMPACTION_THREAD_NUM = 10;
#endif
/*
 * PGPROC slots reserved for auxiliary thread kinds that can have several
 * instances. MAX_RECOVERY_THREAD_NUM is not defined in this file, and the
 * thread-pool group count is read from g_instance at each expansion.
 */
#define NUM_MULTI_AUX_PROC \
(MAX_PAGE_WRITER_THREAD_NUM + \
MAX_RECOVERY_THREAD_NUM + \
g_instance.shmem_cxt.ThreadPoolGroupNum + \
MAX_COMPACTION_THREAD_NUM \
)
/* All auxiliary PGPROC slots; NUM_SINGLE_AUX_PROC is not defined in this file. */
#define NUM_AUXILIARY_PROCS (NUM_SINGLE_AUX_PROC + NUM_MULTI_AUX_PROC)
/* NOTE(review): by name, a buffer length for connection info strings -- confirm at the use sites. */
#define CONNINFOLEN (64)
/* PGPROC slots for DCF callbacks: dcf_max_workers when enable_dcf is set, otherwise 0. */
#define NUM_DCF_CALLBACK_PROCS \
(g_instance.attr.attr_storage.dcf_attr.enable_dcf ? \
g_instance.attr.attr_storage.dcf_attr.dcf_max_workers : 0)
#define NUM_DMS_REFORM_CALLLBACK_PROCS (5)
#define NUM_DMS_LSNR_CALLBACK_PROC (1)
#define NUM_DMS_SMON_CALLBACK_PROC (2)
#define NUM_DMS_PARALLEL_CALLBACK_PROC (g_instance.attr.attr_storage.dms_attr.parallel_thread_num <= 1 ? 0 : \
g_instance.attr.attr_storage.dms_attr.parallel_thread_num)
#define NUM_DMS_CKPT_NOTIFY_TASK_RATIO (1.0f / 32)
#define NUM_DMS_CLEAN_EDP_TASK_RATIO (1.0f / 32)
#define NUM_DMS_DERIVED_TASK_RATIO (1.0f / 8)
#define NUM_DMS_RECV_WORK_THREAD_RATIO (1.0f / 4)
#define NUM_DMS_WORK_THREAD_PRIO_0_2 (4)
#define NUM_DMS_WORK_THREAD_PRIO_3 Max(1, (uint32)(NUM_DMS_WORK_THREAD_PROCS * NUM_DMS_CKPT_NOTIFY_TASK_RATIO))
#define NUM_DMS_WORK_THREAD_PRIO_4 Max(1, (uint32)(NUM_DMS_WORK_THREAD_PROCS * NUM_DMS_CLEAN_EDP_TASK_RATIO))
#define NUM_DMS_WORK_THREAD_PRIO_5 Max(1, (uint32)(NUM_DMS_WORK_THREAD_PROCS * NUM_DMS_DERIVED_TASK_RATIO))
#define NUM_DMS_RECV_THREAD_PRIO_0_2 (3)
#define NUM_DMS_RECV_THREAD_PRIO_3 Max(1, (uint32)(NUM_DMS_WORK_THREAD_PRIO_3 * NUM_DMS_RECV_WORK_THREAD_RATIO))
#define NUM_DMS_RECV_THREAD_PRIO_4 Max(1, (uint32)(NUM_DMS_WORK_THREAD_PRIO_4 * NUM_DMS_RECV_WORK_THREAD_RATIO))
#define NUM_DMS_RECV_THREAD_PRIO_5 Max(1, (uint32)(NUM_DMS_WORK_THREAD_PRIO_5 * NUM_DMS_RECV_WORK_THREAD_RATIO))
#define NUM_DMS_RECV_THREAD_PRIO_6 \
Max(1, (uint32)((NUM_DMS_WORK_THREAD_PROCS - NUM_DMS_WORK_THREAD_PRIO_0_2 - \
NUM_DMS_WORK_THREAD_PRIO_3 - NUM_DMS_WORK_THREAD_PRIO_4 - NUM_DMS_WORK_THREAD_PRIO_5) * \
NUM_DMS_RECV_WORK_THREAD_RATIO))
#define NUM_DMS_SENDER_MONITOR_THREAD (1)
#define NUM_DMS_RECV_THREAD_CNT \
(NUM_DMS_RECV_THREAD_PRIO_0_2 + NUM_DMS_RECV_THREAD_PRIO_3 + NUM_DMS_RECV_THREAD_PRIO_4 + \
NUM_DMS_RECV_THREAD_PRIO_5 + NUM_DMS_RECV_THREAD_PRIO_6 + NUM_DMS_SENDER_MONITOR_THREAD)
#define NUM_DMS_MAX_WORK_THREAD_PROCS (g_instance.attr.attr_storage.dms_attr.work_thread_pool_max_cnt)
#define NUM_DMS_WORK_SCHEDULER_PROC (1)
#define NUM_DMS_RDMA_THREAD_PROCS (g_instance.attr.attr_storage.dms_attr.work_thread_pool_max_cnt != 0 ? \
NUM_DMS_MAX_WORK_THREAD_PROCS * 2 + NUM_DMS_WORK_SCHEDULER_PROC : \
g_instance.attr.attr_storage.dms_attr.work_thread_count * 2)
#define NUM_DMS_WORK_THREAD_PROCS (g_instance.attr.attr_storage.dms_attr.work_thread_pool_max_cnt != 0 ? \
NUM_DMS_MAX_WORK_THREAD_PROCS + NUM_DMS_WORK_SCHEDULER_PROC : \
g_instance.attr.attr_storage.dms_attr.work_thread_count)
#define NUM_DMS_CALLBACK_PROCS \
(g_instance.attr.attr_storage.dms_attr.enable_dms ? \
(NUM_DMS_RECV_THREAD_CNT + \
((!strcasecmp(g_instance.attr.attr_storage.dms_attr.interconnect_type, "TCP"))? \
NUM_DMS_WORK_THREAD_PROCS : NUM_DMS_RDMA_THREAD_PROCS) + \
NUM_DMS_LSNR_CALLBACK_PROC + \
NUM_DMS_SMON_CALLBACK_PROC + \
NUM_DMS_PARALLEL_CALLBACK_PROC + \
NUM_DMS_REFORM_CALLLBACK_PROCS ) : 0)
/*
 * Total number of PGPROC slots: regular backends, CM agent, auxiliary
 * threads, DCF and DMS callback threads, and prepared-transaction slots
 * (max_prepared_xacts per two-phase partition).
 */
#define GLOBAL_ALL_PROCS \
    (g_instance.shmem_cxt.MaxBackends + \
     NUM_CMAGENT_PROCS + NUM_AUXILIARY_PROCS + NUM_DCF_CALLBACK_PROCS + \
     NUM_DMS_CALLBACK_PROCS + \
     (g_instance.attr.attr_storage.max_prepared_xacts * NUM_TWOPHASE_PARTITIONS))

/* Session slot counts: twice MaxBackends, plus the reserved backend ids. */
#define GLOBAL_MAX_SESSION_NUM (2 * g_instance.shmem_cxt.MaxBackends)
#define GLOBAL_RESERVE_SESSION_NUM (g_instance.shmem_cxt.MaxReserveBackendId)
#define MAX_SESSION_SLOT_COUNT (GLOBAL_MAX_SESSION_NUM + GLOBAL_RESERVE_SESSION_NUM)

/* Backend slots: one per session slot with the thread pool enabled, else one per backend. */
#define MAX_BACKEND_SLOT \
    (g_instance.attr.attr_common.enable_thread_pool ? MAX_SESSION_SLOT_COUNT : g_instance.shmem_cxt.MaxBackends)

/*
 * 24 * 60 * 60 = 86400 (presumably one day in seconds -- confirm the unit at
 * the use sites). The expansion is parenthesised so that it behaves as a
 * single value inside larger expressions such as "x / MAX_SESSION_TIMEOUT".
 */
#define MAX_SESSION_TIMEOUT (24 * 60 * 60)

/* Number of backend status entries: backend slots plus auxiliary threads. */
#define BackendStatusArray_size (MAX_BACKEND_SLOT + NUM_AUXILIARY_PROCS)
#define GSC_MAX_BACKEND_SLOT (g_instance.shmem_cxt.MaxBackends + MAX_SESSION_SLOT_COUNT)
/*
 * Alarm check routine: takes the alarm and its additional parameters and
 * returns an AlarmCheckResult.
 * NOTE(review): by name it reports connection overload; the criteria are in
 * the definition, not visible here.
 */
extern AlarmCheckResult ConnectionOverloadChecker(Alarm* alarm, AlarmAdditionalParam* additionalParam);
/*
 * Function Prototypes
 */
/*
 * The group headings below are derived from the function names and
 * signatures only; the definitions live outside this header.
 */
/* Shared-memory sizing and initialisation of ProcGlobal and of the calling thread's PGPROC. */
extern int ProcGlobalSemas(void);
extern Size ProcGlobalShmemSize(void);
extern void InitNuma(void);
extern void InitProcGlobal(void);
extern void InitProcess(void);
extern void InitProcessPhase2(void);
extern void InitAuxiliaryProcess(void);
/* Acquire / release helpers for a pthread mutex passed in as procBaseLock. */
extern void ProcBaseLockAcquire(pthread_mutex_t *procBaseLock);
extern void ProcBaseLockRelease(pthread_mutex_t *procBaseLock);
extern int GetAuxProcEntryIndex(int baseIdx);
extern void PublishStartupProcessInformation(void);
extern bool HaveNFreeProcs(int n);
extern void ProcReleaseLocks(bool isCommit);
/* Connection accounting. */
extern int GetUsedConnectionCount(void);
extern int GetUsedInnerToolConnCount(void);
/* Lock wait queue handling. The default argument on ProcLockWakeup requires a C++ compiler. */
extern void ProcQueueInit(PROC_QUEUE* queue);
extern int ProcSleep(LOCALLOCK* locallock, LockMethod lockMethodTable, bool allow_con_update, int waitSec);
extern PGPROC* ProcWakeup(PGPROC* proc, int waitStatus);
extern void ProcLockWakeup(LockMethod lockMethodTable, LOCK* lock, const PROCLOCK* proclock = NULL);
extern void ProcBlockerUpdate(PGPROC *waiterProc, PROCLOCK *blockerProcLock, const char* lockMode, bool isLockHolder);
extern bool IsWaitingForLock(void);
extern void LockErrorCleanup(void);
/* Waiting for, and sending, a wakeup to the proc with the given thread id. */
extern void ProcWaitForSignal(void);
extern void ProcSendSignal(ThreadId pid);
/*
 * Timeout alarms: enable / disable / pause / resume, plus the signal handler.
 * NOTE(review): "delayms" is presumably a delay in milliseconds -- confirm.
 */
extern TimestampTz GetStatementFinTime();
extern bool enable_sig_alarm(int delayms, bool is_statement_timeout);
extern bool enable_lockwait_sig_alarm(int delayms);
extern bool enable_session_sig_alarm(int delayms);
extern bool enable_idle_in_transaction_session_sig_alarm(int delayms);
extern bool enable_query_plan_sig_alarm(int delayms);
extern bool disable_session_sig_alarm(void);
extern bool disable_idle_in_transaction_session_sig_alarm(void);
extern bool disable_sig_alarm(bool is_statement_timeout, int waitSec = 0);
extern bool pause_sig_alarm(bool is_statement_timeout);
extern bool resume_sig_alarm(bool is_statement_timeout);
extern void handle_sig_alarm(SIGNAL_ARGS);
/* Standby variants of the alarm functions. */
extern bool enable_standby_sig_alarm(TimestampTz now, TimestampTz fin_time, bool deadlock_only);
extern bool disable_standby_sig_alarm(void);
extern void handle_standby_sig_alarm(SIGNAL_ARGS);
/* Mapping between logic thread ids and thread ids. */
extern ThreadId getThreadIdFromLogicThreadId(int logictid);
extern int getLogicThreadIdFromThreadId(ThreadId tid);
extern bool IsRedistributionWorkerProcess(void);
extern void PgStatCMAThreadStatus();
/* Declared without "extern", unlike its neighbours; linkage is the same either way. */
void CancelBlockedRedistWorker(LOCK* lock, LOCKMODE lockmode);
/* Lock group membership (see the lockGroup* members of PGPROC). */
extern void BecomeLockGroupLeader(void);
extern void BecomeLockGroupMember(PGPROC *leader);
extern int GetThreadPoolStreamProcNum(void);
#endif
#endif