*
* misc.c
*
*
* Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/backend/utils/adt/misc.c
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "knl/knl_variable.h"
#include <sys/file.h>
#include <signal.h>
#include <dirent.h>
#include <math.h>
#include "access/sysattr.h"
#include "catalog/catalog.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_tablespace.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/tablespace.h"
#include "commands/user.h"
#include "funcapi.h"
#include "instruments/gs_stat.h"
#include "miscadmin.h"
#include "parser/keywords.h"
#include "pgstat.h"
#include "postmaster/syslogger.h"
#include "rewrite/rewriteHandler.h"
#include "replication/replicainternal.h"
#include "storage/smgr/fd.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/file/fio_device.h"
#include "utils/lsyscache.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/timestamp.h"
#include "gssignal/gs_signal.h"
#include "workload/workload.h"
#ifdef PGXC
#include "pgxc/pgxc.h"
#endif
extern void cancel_backend(ThreadId pid);
#define atooid(x) ((Oid)strtoul((x), NULL, 10))
* current_database()
* Expose the current database to the user
*/
Datum current_database(PG_FUNCTION_ARGS)
{
Name db;
db = (Name)palloc(NAMEDATALEN);
namestrcpy(db, get_and_check_db_name(u_sess->proc_cxt.MyDatabaseId, true));
PG_RETURN_NAME(db);
}
* current_query()
* Expose the current query to the user (useful in stored procedures)
* We might want to use ActivePortal->sourceText someday.
*/
Datum current_query(PG_FUNCTION_ARGS)
{
if (t_thrd.postgres_cxt.debug_query_string) {
char *mask_string = maskPassword(t_thrd.postgres_cxt.debug_query_string);
if (mask_string == NULL) {
mask_string = (char *)t_thrd.postgres_cxt.debug_query_string;
}
PG_RETURN_TEXT_P(cstring_to_text(mask_string));
} else {
PG_RETURN_NULL();
}
}
* Send a signal to another backend.
*
* The signal is delivered for users with sysadmin privilege or the member of gs_role_signal_backend role
* or the owner of the database or the same user as the backend being signaled.
* For "dangerous" signals, an explicit check for superuser needs to be done prior to calling this function.
*
* Returns 0 on success, 1 on general failure, and 2 on permission error.
* In the event of a general failure (return code 1), a warning message will
* be emitted. For permission errors, doing that is the responsibility of
* the caller.
*/
#define SIGNAL_BACKEND_SUCCESS 0
#define SIGNAL_BACKEND_ERROR 1
#define SIGNAL_BACKEND_NOPERMISSION 2
static int pg_signal_backend(ThreadId pid, int sig, bool checkPermission)
{
PGPROC* proc = NULL;
if (checkPermission && !superuser()) {
* Since the user is not superuser, check for matching roles. Trust
* that BackendPidGetProc will return NULL if the pid isn't valid,
* even though the check for whether it's a backend process is below.
* The IsBackendPid check can't be relied on as definitive even if it
* was first. The process might end between successive checks
* regardless of their order. There's no way to acquire a lock on an
* arbitrary process to prevent that. But since so far all the callers
* of this mechanism involve some request for ending the process
* anyway, that it might end on its own first is not a problem.
*/
proc = BackendPidGetProc(pid);
if (proc == NULL) {
return SIGNAL_BACKEND_NOPERMISSION;
} else {
bool rolePermission = is_member_of_role(GetUserId(), DEFAULT_ROLE_SIGNAL_BACKENDID) &&
(proc->roleId != BOOTSTRAP_SUPERUSERID && !is_role_persistence(proc->roleId));
if (!pg_database_ownercheck(proc->databaseId, u_sess->misc_cxt.CurrentUserId) &&
proc->roleId != GetUserId() && !rolePermission) {
return SIGNAL_BACKEND_NOPERMISSION;
}
}
}
if (!IsBackendPid(pid)) {
* This is just a warning so a loop-through-resultset will not abort
* if one backend terminated on its own during the run.
*/
ereport(WARNING, (errmsg("PID %lu is not a gaussdb server thread", pid)));
return SIGNAL_BACKEND_ERROR;
}
* Can the process we just validated above end, followed by the pid being
* recycled for a new process, before reaching here? Then we'd be trying
* to kill the wrong thing. Seems near impossible when sequential pid
* assignment and wraparound is used. Perhaps it could happen on a system
* where pid re-use is randomized. That race condition possibility seems
* too unlikely to worry about.
*/
if (gs_signal_send(pid, sig)) {
ereport(WARNING, (errmsg("could not send signal to process %lu: %m", pid)));
return SIGNAL_BACKEND_ERROR;
}
return SIGNAL_BACKEND_SUCCESS;
}
uint64 get_query_id_beentry(ThreadId tid)
{
uint64 queryId = 0;
PgBackendStatus* beentry = t_thrd.shemem_ptr_cxt.BackendStatusArray + BackendStatusArray_size - 1;
PgBackendStatus* localentry = (PgBackendStatus*)palloc0(sizeof(PgBackendStatus));
* We go through BackendStatusArray from back to front, because
* in thread pool mode, both an active session and its thread pool worker
* will have the same procpid in their entry and in most cases what we want
* is session's entry, which will be returned first in such searching direction.
*/
for (int i = 1; i <= BackendStatusArray_size; i++) {
* Follow the protocol of retrying if st_changecount changes while we
* copy the entry, or if it's odd. (The check for odd is needed to
* cover the case where we are able to completely copy the entry while
* the source backend is between increment steps.) We use a volatile
* pointer here to ensure the compiler doesn't try to get cute.
*/
if (gs_stat_encap_status_info(localentry, beentry)) {
if (localentry->st_procpid == tid) {
queryId = localentry->st_queryid;
break;
}
}
beentry--;
}
pfree_ext(localentry->st_appname);
pfree_ext(localentry->st_clienthostname);
pfree_ext(localentry->st_conninfo);
pfree_ext(localentry->st_activity);
pfree(localentry);
return queryId;
}
* Signal to cancel a backend process.
* This is allowed if you have sysadmin privilege or have the same user as the process being canceled
* or you are the member of gs_role_signal_backend role or the owner of the database.
*/
Datum pg_cancel_backend(PG_FUNCTION_ARGS)
{
int r = 0;
ThreadId tid = PG_GETARG_INT64(0);
* It is forbidden to kill backend in the online expansion to protect
* the lock session from being interrupted by external applications.
*/
if (u_sess->attr.attr_sql.enable_online_ddl_waitlock && !u_sess->attr.attr_common.xc_maintenance_mode)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), (errmsg("kill backend is prohibited during online expansion."))));
r = pg_signal_backend(tid, SIGINT, true);
if (r == SIGNAL_BACKEND_NOPERMISSION) {
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("must have sysadmin privilege or a member of the gs_role_signal_backend role or the "
"owner of the database or the same user to cancel queries running in other server processes"))));
}
if (t_thrd.proc && t_thrd.proc->workingVersionNum >= 92060) {
uint64 query_id = get_query_id_beentry(tid);
(void)gs_close_all_stream_by_debug_id(query_id);
}
PG_RETURN_BOOL(r == SIGNAL_BACKEND_SUCCESS);
}
Datum pg_cancel_session(PG_FUNCTION_ARGS)
{
int r = -1;
ThreadId tid = PG_GETARG_INT64(0);
uint64 sid = PG_GETARG_INT64(1);
if (tid == sid) {
if (u_sess->attr.attr_sql.enable_online_ddl_waitlock && !u_sess->attr.attr_common.xc_maintenance_mode)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("kill backend is prohibited during online expansion."))));
r = pg_signal_backend(tid, SIGINT, true);
if (r == SIGNAL_BACKEND_NOPERMISSION)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("must be system admin or have the same role to cancel queries running in other server "
"processes"))));
if (t_thrd.proc && t_thrd.proc->workingVersionNum >= 92060) {
uint64 query_id = get_query_id_beentry(tid);
(void)gs_close_all_stream_by_debug_id(query_id);
}
} else if (ENABLE_THREAD_POOL) {
ThreadPoolSessControl *sess_ctrl = g_threadPoolControler->GetSessionCtrl();
int ctrl_idx = sess_ctrl->FindCtrlIdxBySessId(sid);
r = sess_ctrl->SendSignal((int)ctrl_idx, SIGINT);
}
PG_RETURN_BOOL(r == 0);
}
* Signal to cancel queries running in backends connected to demoted GTM.
* This is allowed if you are a superuser.
*/
Datum pg_cancel_invalid_query(PG_FUNCTION_ARGS)
{
#ifndef ENABLE_MULTIPLE_NODES
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("unsupported proc in single node mode.")));
PG_RETURN_NULL();
#else
if (!superuser()) {
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("must be system admin to cancel invalid queries running in all server processes"))));
PG_RETURN_BOOL(false);
} else {
proc_cancel_invalid_gtm_lite_conn();
PG_RETURN_BOOL(true);
}
#endif
}
int kill_backend(ThreadId tid, bool checkPermission)
{
* It is forbidden to kill backend in the online expansion to protect
* the lock session from being interrupted by external applications.
*/
if (u_sess->attr.attr_sql.enable_online_ddl_waitlock && !u_sess->attr.attr_common.xc_maintenance_mode)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), (errmsg("kill backend is prohibited during online expansion."))));
int r = pg_signal_backend(tid, SIGTERM, checkPermission);
if (r == SIGNAL_BACKEND_NOPERMISSION) {
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("must have sysadmin privilege or a member of the gs_role_signal_backend role or "
"the owner of the database or the same user to terminate other backend"))));
}
if (t_thrd.proc && t_thrd.proc->workingVersionNum >= 92060) {
uint64 query_id = get_query_id_beentry(tid);
(void)gs_close_all_stream_by_debug_id(query_id);
}
return r;
}
* Signal to terminate a backend process.
* This is allowed for users have sysadmin privilege or a member of the gs_role_signal_backend role or
* the owner of the database or the same user as the process being terminated.
*/
Datum pg_terminate_backend(PG_FUNCTION_ARGS)
{
ThreadId tid = PG_GETARG_INT64(0);
int r = kill_backend(tid, true);
PG_RETURN_BOOL(r == SIGNAL_BACKEND_SUCCESS);
}
Datum pg_terminate_session(PG_FUNCTION_ARGS)
{
ThreadId tid = PG_GETARG_INT64(0);
uint64 sid = PG_GETARG_INT64(1);
int r = -1;
if (tid == sid) {
r = kill_backend(tid, true);
} else if (ENABLE_THREAD_POOL) {
ThreadPoolSessControl *sess_ctrl = g_threadPoolControler->GetSessionCtrl();
int ctrl_idx = sess_ctrl->FindCtrlIdxBySessId(sid);
r = sess_ctrl->SendSignal((int)ctrl_idx, SIGTERM);
}
PG_RETURN_BOOL(r == 0);
}
Datum pg_terminate_active_session_socket(PG_FUNCTION_ARGS)
{
ThreadId tid = PG_GETARG_INT64(0);
uint64 sid = PG_GETARG_INT64(1);
if (tid <= 0 || sid <= 0) {
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("Invalid params, tid or sessionid must have real number")));
}
if (!initialuser()) {
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be initial user can use this function")));
}
ereport(LOG, (errmsg("pg_terminate_active_session_socket: tid %lu and sessionid %lu", tid, sid)));
if (ENABLE_THREAD_POOL && tid != sid) {
int count = g_threadPoolControler->GetSessionCtrl()->terminate_session_socket(tid, sid);
if (count == 0) {
ereport(WARNING, (errmsg("tid %lu and sessionid %lu do not match with valid active session", tid, sid)));
PG_RETURN_BOOL(false);
}
}
ProcArrayStruct *array_ptr = g_instance.proc_array_idx;
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
for (int index = 0; index < array_ptr->numProcs; index++) {
int pg_proc_no = array_ptr->pgprocnos[index];
volatile PGPROC *proc = g_instance.proc_base_all_procs[pg_proc_no];
VirtualTransactionId vxid;
GET_VXID_FROM_PGPROC(vxid, *proc);
if (proc->pid != tid) {
continue;
}
(void)SendProcSignal(tid, PROCSIG_COMM_CLOSE_ACTIVE_SESSION_SOCKET, vxid.backendId);
ereport(LOG,
(errmsg("pg_terminate_active_session_socket: send signal to tid %lu and sessionid %lu", tid, sid)));
break;
}
LWLockRelease(ProcArrayLock);
PG_RETURN_BOOL(true);
}
* function name: pg_wlm_jump_queue
* description : wlm jump the queue with thread id.
*/
Datum pg_wlm_jump_queue(PG_FUNCTION_ARGS)
{
ThreadId tid = PG_GETARG_INT64(0);
if (g_instance.wlm_cxt->dynamic_workload_inited)
dywlm_client_jump_queue(&g_instance.wlm_cxt->MyDefaultNodeGroup.climgr, tid);
else
WLMJumpQueue(&g_instance.wlm_cxt->MyDefaultNodeGroup.parctl, tid);
PG_RETURN_BOOL(true);
}
* function name: gs_wlm_switch_cgroup
* description : wlm switch cgroup to new cgroup with session id.
*/
Datum gs_wlm_switch_cgroup(PG_FUNCTION_ARGS)
{
uint64 sess_id = PG_GETARG_INT64(0);
char* cgroup = PG_GETARG_CSTRING(1);
bool ret = false;
if (g_instance.attr.attr_common.enable_thread_pool) {
ereport(WARNING, (errmsg("Cannot switch control group in thread pool mode.")));
} else if (g_instance.wlm_cxt->gscgroup_init_done > 0) {
if (IS_PGXC_COORDINATOR && !superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("the user have no right to "
"change cgroup for session [%lu]!",
sess_id)));
if (IS_PGXC_COORDINATOR || IsConnFromCoord())
ret = WLMAjustCGroupByCNSessid(&g_instance.wlm_cxt->MyDefaultNodeGroup, sess_id, cgroup);
} else
ereport(WARNING, (errmsg("Switch failed because of cgroup without initialization.")));
PG_RETURN_BOOL(ret);
}
* function name: gs_wlm_node_recover
* description : recover node for dynamic workload.
*/
Datum gs_wlm_node_recover(PG_FUNCTION_ARGS)
{
bool isForce = PG_GETARG_BOOL(0);
int count = 0;
if (!superuser()) {
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser account to recover node for wlm")));
}
while (!WLMIsInfoInit()) {
CHECK_FOR_INTERRUPTS();
++count;
sleep(1);
if (count >= 3) {
ereport(WARNING, (errmsg("node recovering for dynamic workload failed.")));
PG_RETURN_BOOL(true);
}
}
if (!g_instance.wlm_cxt->dynamic_workload_inited) {
ereport(NOTICE, (errmsg("dynamic workload disable, need not recover.")));
} else {
dywlm_node_recover(isForce);
}
PG_RETURN_BOOL(true);
}
* function name: gs_wlm_node_recover
* description : recover node for dynamic workload.
*/
Datum gs_wlm_node_clean(PG_FUNCTION_ARGS)
{
char* nodename = PG_GETARG_CSTRING(0);
bool ret = false;
if (!superuser()) {
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser account to clean node for wlm")));
}
if (!g_instance.wlm_cxt->dynamic_workload_inited) {
ereport(NOTICE, (errmsg("dynamic workload disable, need not clean.")));
} else {
(void)dywlm_server_clean(nodename);
ret = true;
}
PG_RETURN_BOOL(ret);
}
* function name: gs_wlm_rebuild_user_resource_pool
* description : rebuild user and resource pool htab.
*/
Datum gs_wlm_rebuild_user_resource_pool(PG_FUNCTION_ARGS)
{
if (!superuser()) {
aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_PROC,
"gs_wlm_rebuild_user_resource_pool");
}
if (!BuildUserRPHash())
ereport(LOG, (errmsg("call 'gs_wlm_rebuild_user_resource_pool' to build user hash failed")));
else {
ereport(LOG, (errmsg("call 'gs_wlm_rebuild_user_resource_pool' to build user hash finished")));
}
PG_RETURN_BOOL(true);
}
* @Description: readjust user total space
* @IN roleid: user oid
* @Return: success or not
* @See also:
*/
Datum gs_wlm_readjust_user_space(PG_FUNCTION_ARGS)
{
Oid roleid = PG_GETARG_OID(0);
int count = 0;
while (!WLMIsInfoInit()) {
CHECK_FOR_INTERRUPTS();
++count;
sleep(5);
if (count >= 3)
PG_RETURN_TEXT_P(cstring_to_text("Exec Failed"));
}
WLMReadjustAllUserSpace(roleid);
PG_RETURN_TEXT_P(cstring_to_text("Exec Success"));
}
* @Description: readjust user total space through username
* @IN username: user name
* @Return: success or not
* @See also:
*/
Datum gs_wlm_readjust_user_space_through_username(PG_FUNCTION_ARGS)
{
char* username = PG_GETARG_CSTRING(0);
int count = 0;
while (!WLMIsInfoInit()) {
CHECK_FOR_INTERRUPTS();
++count;
sleep(5);
if (count >= 3)
PG_RETURN_TEXT_P(cstring_to_text("Exec Failed"));
}
WLMReadjustUserSpaceThroughAllDbs(username);
PG_RETURN_TEXT_P(cstring_to_text("Exec Success"));
}
* @Description: readjust user total space with reset flag,
* if flag is true, means calculate used space from zero,
* if flag is false, means calculate used space one by one.
* @IN username: user name
* @Return: success or not
* @See also:
*/
Datum gs_wlm_readjust_user_space_with_reset_flag(PG_FUNCTION_ARGS)
{
char* username = PG_GETARG_CSTRING(0);
bool isReset = PG_GETARG_BOOL(1);
int count = 0;
while (!WLMIsInfoInit()) {
CHECK_FOR_INTERRUPTS();
++count;
sleep(5);
if (count >= 3)
PG_RETURN_TEXT_P(cstring_to_text("Exec Failed"));
}
WLMReadjustUserSpaceByNameWithResetFlag(username, isReset);
PG_RETURN_TEXT_P(cstring_to_text("Exec Success"));
}
* function name: gs_get_role_name
* description :get role name with user oid.
*/
Datum gs_get_role_name(PG_FUNCTION_ARGS)
{
Oid roleid = PG_GETARG_OID(0);
char rolname[NAMEDATALEN] = {0};
if (!superuser() && !isMonitoradmin(GetUserId())) {
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied to get role name"),
errhint("Must be system admin or monitor admin can get role name.")));
}
if (OidIsValid(roleid))
(void)GetRoleName(roleid, rolname, sizeof(rolname));
PG_RETURN_TEXT_P(cstring_to_text(rolname));
}
* Signal to reload the database configuration
*/
Datum pg_reload_conf(PG_FUNCTION_ARGS)
{
if (!superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), (errmsg("must be system admin to signal the postmaster"))));
if (gs_signal_send(PostmasterPid, SIGHUP)) {
ereport(WARNING, (errmsg("failed to send signal to postmaster: %m")));
PG_RETURN_BOOL(false);
}
PG_RETURN_BOOL(true);
}
* Rotate log file
*/
Datum pg_rotate_logfile(PG_FUNCTION_ARGS)
{
if (!superuser())
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), (errmsg("must be system admin to rotate log files"))));
if (!g_instance.attr.attr_common.Logging_collector) {
ereport(WARNING, (errmsg("rotation not possible because log collection not active")));
PG_RETURN_BOOL(false);
}
SendPostmasterSignal(PMSIGNAL_ROTATE_LOGFILE);
PG_RETURN_BOOL(true);
}
typedef struct {
char* location;
DIR* dirdesc;
} ts_db_fctx;
Datum pg_tablespace_databases(PG_FUNCTION_ARGS)
{
FuncCallContext* funcctx = NULL;
struct dirent* de = NULL;
ts_db_fctx* fctx = NULL;
int location_len, ss_rc;
if (SRF_IS_FIRSTCALL()) {
MemoryContext oldcontext;
Oid tablespaceOid = PG_GETARG_OID(0);
funcctx = SRF_FIRSTCALL_INIT();
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
fctx = (ts_db_fctx*)palloc(sizeof(ts_db_fctx));
if (tablespaceOid == GLOBALTABLESPACE_OID) {
fctx->dirdesc = NULL;
ereport(WARNING, (errmsg("global tablespace never has databases")));
} else {
if (tablespaceOid == DEFAULTTABLESPACE_OID) {
location_len = (int)strlen(DEFTBSDIR) + 1;
fctx->location = (char*)palloc(location_len);
ss_rc = sprintf_s(fctx->location, (size_t)location_len, "%s", DEFTBSDIR);
} else if (ENABLE_DSS) {
location_len = (int)strlen(TBLSPCDIR) + 1 + OIDCHARS + 1 +
(int)strlen(TABLESPACE_VERSION_DIRECTORY) + 1;
fctx->location = (char*)palloc(location_len);
ss_rc = sprintf_s(fctx->location,
location_len,
"%s/%u/%s",
TBLSPCDIR,
tablespaceOid,
TABLESPACE_VERSION_DIRECTORY);
} else {
#ifdef PGXC
location_len = (int)strlen(TBLSPCDIR) + 1 + OIDCHARS + 1 +
(int)strlen(TABLESPACE_VERSION_DIRECTORY) + 1 +
(int)strlen(g_instance.attr.attr_common.PGXCNodeName) + 1;
fctx->location = (char*)palloc(location_len);
ss_rc = sprintf_s(fctx->location,
location_len,
"%s/%u/%s_%s",
TBLSPCDIR,
tablespaceOid,
TABLESPACE_VERSION_DIRECTORY,
g_instance.attr.attr_common.PGXCNodeName);
#else
location_len = (int)strlen(TBLSPCDIR) + 1 + OIDCHARS + 1 +
(int)strlen(TABLESPACE_VERSION_DIRECTORY) + 1;
fctx->location = (char*)palloc(location_len);
ss_rc = sprintf_s(fctx->location,
location_len,
"%s/%u/%s",
TBLSPCDIR,
tablespaceOid,
TABLESPACE_VERSION_DIRECTORY);
#endif
}
securec_check_ss(ss_rc, "\0", "\0");
fctx->dirdesc = AllocateDir(fctx->location);
if (fctx->dirdesc == NULL) {
if (errno != ENOENT)
ereport(ERROR,
(errcode_for_file_access(), errmsg("could not open directory \"%s\": %m", fctx->location)));
ereport(WARNING, (errmsg("%u is not a tablespace OID", tablespaceOid)));
}
}
funcctx->user_fctx = fctx;
MemoryContextSwitchTo(oldcontext);
}
funcctx = SRF_PERCALL_SETUP();
fctx = (ts_db_fctx*)funcctx->user_fctx;
if (fctx->dirdesc == NULL)
SRF_RETURN_DONE(funcctx);
while ((de = ReadDir(fctx->dirdesc, fctx->location)) != NULL) {
char* subdir = NULL;
DIR* dirdesc = NULL;
Oid datOid = atooid(de->d_name);
if (!datOid)
continue;
int sub_len = (int)strlen(fctx->location) + 1 + (int)strlen(de->d_name) + 1;
subdir = (char*)palloc(sub_len);
ss_rc = sprintf_s(subdir, sub_len, "%s/%s", fctx->location, de->d_name);
securec_check_ss(ss_rc, "\0", "\0");
dirdesc = AllocateDir(subdir);
while ((de = ReadDir(dirdesc, subdir)) != NULL) {
if (strcmp(de->d_name, ".") != 0 && strcmp(de->d_name, "..") != 0)
break;
}
FreeDir(dirdesc);
pfree_ext(subdir);
if (de == NULL)
continue;
SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(datOid));
}
FreeDir(fctx->dirdesc);
SRF_RETURN_DONE(funcctx);
}
* pg_tablespace_location - get location for a tablespace
*/
Datum pg_tablespace_location(PG_FUNCTION_ARGS)
{
Oid tablespaceOid = PG_GETARG_OID(0);
char sourcepath[MAXPGPATH];
char targetpath[MAXPGPATH];
int rllen;
* It's useful to apply this function to pg_class.reltablespace, wherein
* zero means "the database's default tablespace". So, rather than
* throwing an error for zero, we choose to assume that's what is meant.
*/
tablespaceOid = ConvertToRelfilenodeTblspcOid(tablespaceOid);
* Return empty string for the cluster's default tablespaces. If the
* currrent user is not superuser, we should not expose the absolute
* path to the client.
*/
if (tablespaceOid == DEFAULTTABLESPACE_OID || tablespaceOid == GLOBALTABLESPACE_OID || !superuser()) {
PG_RETURN_TEXT_P(cstring_to_text(""));
}
#if defined(HAVE_READLINK) || defined(WIN32)
* Find the location of the tablespace by reading the symbolic link that
* is in pg_tblspc/<oid>.
*/
errno_t ss_rc = snprintf_s(sourcepath, sizeof(sourcepath), sizeof(sourcepath) - 1,
"%s/%u", TBLSPCDIR, tablespaceOid);
securec_check_ss(ss_rc, "\0", "\0");
rllen = readlink(sourcepath, targetpath, sizeof(targetpath));
if (rllen < 0)
ereport(
ERROR, (errcode(ERRCODE_FILE_READ_FAILED), errmsg("could not read symbolic link \"%s\": %m", sourcepath)));
else if ((unsigned int)(rllen) >= sizeof(targetpath))
ereport(ERROR, (errcode(ERRCODE_NAME_TOO_LONG), errmsg("symbolic link \"%s\" target is too long", sourcepath)));
targetpath[rllen] = '\0';
size_t dataDirLength = strlen(t_thrd.proc_cxt.DataDir);
if (0 == strncmp(targetpath, t_thrd.proc_cxt.DataDir, dataDirLength) && (size_t)rllen > dataDirLength &&
targetpath[dataDirLength] == '/') {
* The position is not '/' when skip t_thrd.proc_cxt.DataDir. the relative location can't start from '/'
* We only need get the relative location, so remove the common prefix
*/
int pos = (t_thrd.proc_cxt.DataDir[strlen(t_thrd.proc_cxt.DataDir)] == '/')
? strlen(t_thrd.proc_cxt.DataDir)
: (strlen(t_thrd.proc_cxt.DataDir) + 1);
pos += strlen(PG_LOCATION_DIR) + 1;
errno_t rc = memmove_s(targetpath, MAXPGPATH, targetpath + pos, strlen(targetpath) - pos);
securec_check(rc, "\0", "\0");
targetpath[strlen(targetpath) - pos] = '\0';
}
PG_RETURN_TEXT_P(cstring_to_text(targetpath));
#else
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("tablespaces are not supported on this platform")));
PG_RETURN_NULL();
#endif
}
* pg_sleep - delay for N seconds
*/
Datum pg_sleep(PG_FUNCTION_ARGS)
{
float8 secs = PG_GETARG_FLOAT8(0);
float8 endtime;
* We break the requested sleep into segments of no more than 1 second, to
* put an upper bound on how long it will take us to respond to a cancel
* or die interrupt. (Note that pg_usleep is interruptible by signals on
* some platforms but not others.) Also, this method avoids exposing
* pg_usleep's upper bound on allowed delays.
*
* By computing the intended stop time initially, we avoid accumulation of
* extra delay across multiple sleeps. This also ensures we won't delay
* less than the specified time if pg_usleep is interrupted by other
* signals such as SIGHUP.
*/
#ifdef HAVE_INT64_TIMESTAMP
#define GetNowFloat() ((float8)GetCurrentTimestamp() / 1000000.0)
#else
#define GetNowFloat() GetCurrentTimestamp()
#endif
endtime = GetNowFloat() + secs;
for (;;) {
float8 delay;
CHECK_FOR_INTERRUPTS();
delay = endtime - GetNowFloat();
if (delay >= 1.0)
pg_usleep(1000000L);
else if (delay > 0.0)
pg_usleep((long)ceil(delay * 1000000.0));
else
break;
}
PG_RETURN_VOID();
}
Datum pg_get_nodename(PG_FUNCTION_ARGS)
{
if (g_instance.attr.attr_common.PGXCNodeName == NULL)
g_instance.attr.attr_common.PGXCNodeName = "Error Node";
PG_RETURN_TEXT_P(cstring_to_text(g_instance.attr.attr_common.PGXCNodeName));
}
Datum pg_get_keywords(PG_FUNCTION_ARGS)
{
FuncCallContext* funcctx = NULL;
if (SRF_IS_FIRSTCALL()) {
MemoryContext oldcontext;
TupleDesc tupdesc;
funcctx = SRF_FIRSTCALL_INIT();
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
tupdesc = CreateTemplateTupleDesc(3, false);
TupleDescInitEntry(tupdesc, (AttrNumber)1, "word", TEXTOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber)2, "catcode", CHAROID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber)3, "catdesc", TEXTOID, -1, 0);
funcctx->attinmeta = TupleDescGetAttInMetadata(tupdesc);
MemoryContextSwitchTo(oldcontext);
}
funcctx = SRF_PERCALL_SETUP();
if (funcctx->call_cntr < (uint32)ScanKeywords.num_keywords) {
char* values[3];
HeapTuple tuple;
values[0] = (char *)GetScanKeyword(funcctx->call_cntr,
&ScanKeywords);
switch (ScanKeywordCategories[funcctx->call_cntr]) {
case UNRESERVED_KEYWORD:
values[1] = "U";
values[2] = _("unreserved");
break;
case COL_NAME_KEYWORD:
values[1] = "C";
values[2] = _("unreserved (cannot be function or type name)");
break;
case TYPE_FUNC_NAME_KEYWORD:
values[1] = "T";
values[2] = _("reserved (can be function or type name)");
break;
case RESERVED_KEYWORD:
values[1] = "R";
values[2] = _("reserved");
break;
default:
values[1] = NULL;
values[2] = NULL;
break;
}
tuple = BuildTupleFromCStrings(funcctx->attinmeta, values);
SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
}
SRF_RETURN_DONE(funcctx);
}
* Return the type of the argument.
*/
Datum pg_typeof(PG_FUNCTION_ARGS)
{
PG_RETURN_OID(get_fn_expr_argtype(fcinfo->flinfo, 0));
}
* Implementation of the COLLATE FOR expression; returns the collation
* of the argument.
*/
Datum pg_collation_for(PG_FUNCTION_ARGS)
{
Oid typeId;
Oid collid;
typeId = get_fn_expr_argtype(fcinfo->flinfo, 0);
if (!typeId)
PG_RETURN_NULL();
if (!type_is_collatable(typeId) && typeId != UNKNOWNOID)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("collations are not supported by type %s", format_type_be(typeId))));
collid = PG_GET_COLLATION();
if (!collid)
PG_RETURN_NULL();
PG_RETURN_TEXT_P(cstring_to_text(generate_collation_name(collid)));
}
Datum pg_test_err_contain_err(PG_FUNCTION_ARGS)
{
u_sess->utils_cxt.test_err_type = PG_GETARG_INT32(0);
switch (u_sess->utils_cxt.test_err_type) {
case 1: {
PG_TRY();
{
ereport(ERROR, (errcode(ERRCODE_DIAGNOSTICS_EXCEPTION), errmsg_internal("ERROR RETHROW")));
}
PG_CATCH();
{
PG_RE_THROW();
}
PG_END_TRY();
break;
}
case 2: {
PG_TRY();
{
ereport(ERROR, (errcode(ERRCODE_DIAGNOSTICS_EXCEPTION), errmsg_internal("ERROR")));
}
PG_CATCH();
{
ereport(ERROR, (errcode(ERRCODE_DIAGNOSTICS_EXCEPTION), errmsg_internal("ERROR CATCH")));
}
PG_END_TRY();
break;
}
case 3: {
PG_TRY();
{
ereport(ERROR, (errcode(ERRCODE_DIAGNOSTICS_EXCEPTION), errmsg_internal("ERR ERR RETHOW")));
}
PG_CATCH();
{
PG_RE_THROW();
}
PG_END_TRY();
break;
}
case 4: {
PG_TRY();
{
ereport(ERROR, (errcode(ERRCODE_DIAGNOSTICS_EXCEPTION), errmsg_internal("ERR ERR RETHOW")));
}
PG_CATCH();
{
ereport(ERROR, (errcode(ERRCODE_DIAGNOSTICS_EXCEPTION), errmsg_internal("ERR ERR CATCH")));
}
PG_END_TRY();
break;
}
default:
break;
}
PG_RETURN_VOID();
}
void cancel_backend(ThreadId pid)
{
int sig_return = 0;
sig_return = pg_signal_backend(pid, SIGINT, true);
if (sig_return == SIGNAL_BACKEND_NOPERMISSION) {
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("fail to drop the user"),
errhint("fail to cancel backend process for privilege")));
}
}
* SQL wrapper around RelationGetReplicaIndex().
*/
Datum pg_get_replica_identity_index(PG_FUNCTION_ARGS)
{
Oid reloid = PG_GETARG_OID(0);
Oid idxoid;
Relation rel;
rel = heap_open(reloid, AccessShareLock);
idxoid = RelationGetReplicaIndex(rel);
heap_close(rel, AccessShareLock);
if (OidIsValid(idxoid))
PG_RETURN_OID(idxoid);
else
PG_RETURN_NULL();
}
Datum b_database_row_count(PG_FUNCTION_ARGS)
{
PG_RETURN_INT64(u_sess->statement_cxt.last_row_count);
}
* pg_relation_is_updatable - determine which update events the specified
* relation supports.
*
* This relies on relation_is_updatable() in rewriteHandler.c, which see
* for additional information.
*/
Datum pg_relation_is_updatable(PG_FUNCTION_ARGS)
{
Oid reloid = PG_GETARG_OID(0);
bool include_triggers = PG_GETARG_BOOL(1);
PG_RETURN_INT32(relation_is_updatable(reloid, include_triggers, NULL));
}
* pg_column_is_updatable - determine whether a column is updatable
*
* This function encapsulates the decision about just what
* information_schema.columns.is_updatable actually means. It's not clear
* whether deletability of the column's relation should be required, so
* we want that decision in C code where we could change it without initdb.
*/
Datum pg_column_is_updatable(PG_FUNCTION_ARGS)
{
Oid reloid = PG_GETARG_OID(0);
AttrNumber attnum = PG_GETARG_INT16(1);
AttrNumber col = attnum - FirstLowInvalidHeapAttributeNumber;
bool include_triggers = PG_GETARG_BOOL(2);
int events;
if (attnum <= 0)
PG_RETURN_BOOL(false);
events = relation_is_updatable(reloid, include_triggers, bms_make_singleton(col));
#define REQ_EVENTS ((1 << CMD_UPDATE) | (1 << CMD_DELETE))
PG_RETURN_BOOL((events & REQ_EVENTS) == REQ_EVENTS);
}