*
* portalcmds.cpp
* Utility commands affecting portals (that is, SQL cursor commands)
*
* Note: see also tcop/pquery.c, which implements portal operations for
* the FE/BE protocol. This module uses pquery.c for some operations.
* And both modules depend on utils/mmgr/portalmem.c, which controls
* storage management for portals (but doesn't run any queries in them).
*
*
* Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd.
* Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/gausskernel/optimizer/commands/portalcmds.cpp
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "knl/knl_variable.h"
#include <limits.h>
#include "access/xact.h"
#include "commands/portalcmds.h"
#include "executor/executor.h"
#include "executor/tstoreReceiver.h"
#include "distributelayer/streamCore.h"
#include "pgxc/execRemote.h"
#include "tcop/pquery.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#ifdef PGXC
#include "pgxc/pgxc.h"
#endif
* PerformCursorOpen
* Execute SQL DECLARE CURSOR command.
*
* The query has already been through parse analysis, rewriting, and planning.
* When it gets here, it looks like a SELECT PlannedStmt, except that the
* utilityStmt field is set.
*/
void PerformCursorOpen(PlannedStmt* stmt, ParamListInfo params, const char* queryString, bool isTopLevel)
{
DeclareCursorStmt* cstmt = (DeclareCursorStmt*)stmt->utilityStmt;
Portal portal;
MemoryContext oldContext;
if (cstmt == NULL || !IsA(cstmt, DeclareCursorStmt))
ereport(
ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("PerformCursorOpen called for non-cursor query")));
* Disallow empty-string cursor name (conflicts with protocol-level
* unnamed portal).
*/
if (!cstmt->portalname || cstmt->portalname[0] == '\0')
ereport(ERROR, (errcode(ERRCODE_INVALID_CURSOR_NAME), errmsg("invalid cursor name: must not be empty")));
* If this is a non-holdable cursor, we require that this statement has
* been executed inside a transaction block (or else, it would have no
* user-visible effect).
*/
if (!(cstmt->options & CURSOR_OPT_HOLD))
RequireTransactionChain(isTopLevel, "DECLARE CURSOR");
* Create a portal and copy the plan and queryString into its memory.
*/
portal = CreatePortal(cstmt->portalname, false, false);
#ifdef PGXC
* Consume the command id of the command creating the cursor
*/
if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
GetCurrentCommandId(true);
#endif
oldContext = MemoryContextSwitchTo(PortalGetHeapMemory(portal));
stmt = (PlannedStmt*)copyObject(stmt);
stmt->utilityStmt = NULL;
queryString = pstrdup(queryString);
PortalDefineQuery(portal,
NULL,
queryString,
"SELECT",
list_make1(stmt),
NULL);
* Also copy the outer portal's parameter list into the inner portal's
* memory context. We want to pass down the parameter values in case we
* had a command like
* DECLARE c CURSOR FOR SELECT ... WHERE foo = $1
* This will have been parsed using the outer parameter set and the
* parameter value needs to be preserved for use when the cursor is
* executed.
* ----------
*/
params = copyParamList(params);
(void)MemoryContextSwitchTo(oldContext);
* Set up options for portal.
*
* If the user didn't specify a SCROLL type, allow or disallow scrolling
* based on whether it would require any additional runtime overhead to do
* so. Also, we disallow scrolling for FOR UPDATE cursors.
*/
portal->cursorOptions = cstmt->options;
if (!(portal->cursorOptions & (CURSOR_OPT_SCROLL | CURSOR_OPT_NO_SCROLL))) {
if (stmt->rowMarks == NIL && ExecSupportsBackwardScan(stmt->planTree))
portal->cursorOptions |= CURSOR_OPT_SCROLL;
else
portal->cursorOptions |= CURSOR_OPT_NO_SCROLL;
}
if (portal->cursorOptions & CURSOR_OPT_HOLD) {
portal->cursorHoldUserId = u_sess->misc_cxt.CurrentUserId;
portal->cursorHoldSecRestrictionCxt = u_sess->misc_cxt.SecurityRestrictionContext;
}
if (u_sess->pgxc_cxt.gc_fdw_snapshot) {
PushActiveSnapshot(u_sess->pgxc_cxt.gc_fdw_snapshot);
}
* Start execution, inserting parameters if any.
*/
PortalStart(portal, params, 0, GetActiveSnapshot());
if (u_sess->stream_cxt.global_obj != NULL) {
portal->streamInfo.ResetEnvForCursor();
}
if (u_sess->pgxc_cxt.gc_fdw_snapshot) {
PopActiveSnapshot();
}
Assert(portal->strategy == PORTAL_ONE_SELECT);
* We're done; the query won't actually be run until PerformPortalFetch is
* called.
*/
}
* nprocessed = -1 if fetch error
*/
void cursorFetchEndHook(FetchStmt* stmt, int nprocessed)
{
if (stmt->ismove) {
return;
}
if (u_sess->hook_cxt.fetchStatusHook) {
((FetchStatusHook)(u_sess->hook_cxt.fetchStatusHook))(nprocessed > 0 ? 0 : -1);
}
if (u_sess->hook_cxt.rowcountHook) {
((RowcountHook)(u_sess->hook_cxt.rowcountHook))(nprocessed > 0 ? nprocessed : 0);
}
}
* PerformPortalFetch
* Execute SQL FETCH or MOVE command.
*
* stmt: parsetree node for command
* dest: where to send results
* completionTag: points to a buffer of size COMPLETION_TAG_BUFSIZE
* in which to store a command completion status string.
*
* completionTag may be NULL if caller doesn't want a status string.
*/
void PerformPortalFetch(FetchStmt* stmt, DestReceiver* dest, char* completionTag)
{
Portal portal;
long nprocessed;
* Disallow empty-string cursor name (conflicts with protocol-level
* unnamed portal).
*/
if (!stmt->portalname || stmt->portalname[0] == '\0') {
cursorFetchEndHook(stmt, -1);
ereport(ERROR, (errcode(ERRCODE_INVALID_CURSOR_NAME), errmsg("invalid cursor name: must not be empty")));
}
portal = GetPortalByName(stmt->portalname);
if (!PortalIsValid(portal)) {
cursorFetchEndHook(stmt, -1);
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_CURSOR), errmsg("cursor \"%s\" does not exist", stmt->portalname)));
return;
}
if (stmt->ismove)
dest = None_Receiver;
bool savedIsAllowCommitRollback;
bool needResetErrMsg = stp_disable_xact_and_set_err_msg(&savedIsAllowCommitRollback, STP_XACT_OPEN_FOR);
PG_TRY();
{
nprocessed = PortalRunFetch(portal, stmt->direction, stmt->howMany, dest);
}
PG_CATCH();
{
cursorFetchEndHook(stmt, -1);
PG_RE_THROW();
}
PG_END_TRY();
stp_reset_xact_state_and_err_msg(savedIsAllowCommitRollback, needResetErrMsg);
if (completionTag != NULL) {
int rc = snprintf_s(completionTag,
COMPLETION_TAG_BUFSIZE,
COMPLETION_TAG_BUFSIZE - 1,
"%s %ld",
stmt->ismove ? "MOVE" : "FETCH",
nprocessed);
securec_check_ss(rc, "", "");
}
cursorFetchEndHook(stmt, nprocessed);
}
* PerformPortalClose
* Close a cursor.
*/
void PerformPortalClose(const char* name)
{
Portal portal;
if (name == NULL) {
PortalHashTableDeleteAll();
return;
}
* Disallow empty-string cursor name (conflicts with protocol-level
* unnamed portal).
*/
if (name[0] == '\0')
ereport(ERROR, (errcode(ERRCODE_INVALID_CURSOR_NAME), errmsg("invalid cursor name: must not be empty")));
* get the portal from the portal name
*/
portal = GetPortalByName(name);
if (!PortalIsValid(portal)) {
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_CURSOR), errmsg("cursor \"%s\" does not exist", name)));
return;
}
* Note: PortalCleanup is called as a side-effect, if not already done.
*/
PortalDrop(portal, false);
}
* PortalCleanup
*
* Clean up a portal when it's dropped. This is the standard cleanup hook
* for portals.
*
* Note: if portal->status is PORTAL_FAILED, we are probably being called
* during error abort, and must be careful to avoid doing anything that
* is likely to fail again.
*/
void PortalCleanup(Portal portal)
{
QueryDesc* queryDesc = NULL;
* sanity checks
*/
AssertArg(PortalIsValid(portal));
AssertArg(portal->cleanup == PortalCleanup);
* Shut down executor, if still running. We skip this during error abort,
* since other mechanisms will take care of releasing executor resources,
* and we can't be sure that ExecutorEnd itself wouldn't fail.
*/
queryDesc = PortalGetQueryDesc(portal);
if (queryDesc != NULL) {
* Reset the queryDesc before anything else. This prevents us from
* trying to shut down the executor twice, in case of an error below.
* The transaction abort mechanisms will take care of resource cleanup
* in such a case.
*/
portal->queryDesc = NULL;
if (portal->status != PORTAL_FAILED) {
ResourceOwner saveResourceOwner;
saveResourceOwner = t_thrd.utils_cxt.CurrentResourceOwner;
PG_TRY();
{
#ifndef ENABLE_MULTIPLE_NODES
if (IS_STREAM_PORTAL) {
portal->streamInfo.AttachToSession();
}
#endif
t_thrd.utils_cxt.CurrentResourceOwner = portal->resowner;
ExecutorFinish(queryDesc);
ExecutorEnd(queryDesc);
#if !defined(ENABLE_MULTIPLE_NODES) && !defined(USE_SPQ)
* estate is under the queryDesc, and stream threads use it.
* we should wait all stream threads exit to cleanup queryDesc.
*/
if (IS_STREAM_PORTAL) {
StreamNodeGroup::ReleaseStreamGroup(true);
portal->streamInfo.Reset();
}
#else
if (t_thrd.spq_ctx.spq_role == ROLE_UTILITY && IS_STREAM_PORTAL) {
StreamNodeGroup::ReleaseStreamGroup(true);
portal->streamInfo.Reset();
}
#endif
FreeQueryDesc(queryDesc);
}
PG_CATCH();
{
t_thrd.utils_cxt.CurrentResourceOwner = saveResourceOwner;
PG_RE_THROW();
}
PG_END_TRY();
t_thrd.utils_cxt.CurrentResourceOwner = saveResourceOwner;
}
}
}
#ifdef ENABLE_MULTIPLE_NODES
static bool check_remote_plan(Plan* plan)
{
if (plan == NULL)
return false;
if (IsA(plan, RemoteQuery) || IsA(plan, VecRemoteQuery)) {
RemoteQuery* remote_query = (RemoteQuery*)plan;
if (remote_query->exec_type != EXEC_ON_COORDS)
return true;
}
return check_remote_plan(plan->lefttree) || check_remote_plan(plan->righttree);
}
static bool PortalCheckRemotePlan(PlannedStmt* plannedstmt)
{
bool is_remote_plan = false;
if (IsA(plannedstmt, PlannedStmt)) {
if (check_remote_plan(plannedstmt->planTree)) {
is_remote_plan = true;
}
}
return is_remote_plan;
}
#endif
* PersistHoldablePortal
*
* Prepare the specified Portal for access outside of the current
* transaction. When this function returns, all future accesses to the
* portal must be done via the Tuplestore (not by invoking the
* executor).
*/
void PersistHoldablePortal(Portal portal, bool is_rollback)
{
QueryDesc* queryDesc = PortalGetQueryDesc(portal);
Portal saveActivePortal;
ResourceOwner saveResourceOwner;
MemoryContext savePortalContext;
MemoryContext oldcxt;
Oid saveUserid;
Oid saveOldUserid;
int saveSecContext;
* If we're preserving a holdable portal, we had better be inside the
* transaction that originally created it.
*/
Assert(portal->createSubid != InvalidSubTransactionId);
Assert(queryDesc != NULL);
* Caller must have created the tuplestore already.
*/
Assert(portal->holdContext != NULL);
Assert(portal->holdStore != NULL);
* Before closing down the executor, we must copy the tupdesc into
* long-term memory, since it was created in executor memory.
*/
oldcxt = MemoryContextSwitchTo(portal->holdContext);
portal->tupDesc = CreateTupleDescCopy(portal->tupDesc);
MemoryContextSwitchTo(oldcxt);
* Check for improper portal use, and mark portal active.
*/
MarkPortalActive(portal);
* Set up global portal context pointers.
*/
saveActivePortal = ActivePortal;
saveResourceOwner = t_thrd.utils_cxt.CurrentResourceOwner;
savePortalContext = t_thrd.mem_cxt.portal_mem_cxt;
saveOldUserid = GetOldUserId(false);
if (OidIsValid(portal->cursorHoldUserId)) {
GetUserIdAndSecContext(&saveUserid, &saveSecContext);
SetOldUserId(saveUserid, false);
SetUserIdAndSecContext(portal->cursorHoldUserId,
portal->cursorHoldSecRestrictionCxt | SECURITY_LOCAL_USERID_CHANGE | SENDER_LOCAL_USERID_CHANGE);
}
PG_TRY();
{
ActivePortal = portal;
t_thrd.utils_cxt.CurrentResourceOwner = portal->resowner;
t_thrd.mem_cxt.portal_mem_cxt = PortalGetHeapMemory(portal);
MemoryContextSwitchTo(t_thrd.mem_cxt.portal_mem_cxt);
PushActiveSnapshot(queryDesc->snapshot);
if (IsA(queryDesc->planstate, StreamState)) {
* Record current position when transaction commit for cursor with stream plan,
* so that subsequent absolute FETCHs can be processed properly.
*/
portal->commitPortalPos = portal->portalPos;
portal->portalPos = 0;
} else {
* Rewind the executor: we need to store the entire result set in the
* tuplestore, so that subsequent backward FETCHs can be processed.
*/
portal->commitPortalPos = 0;
ExecutorRewind(queryDesc);
}
* Change the destination to output to the tuplestore. Note we tell
* the tuplestore receiver to detoast all data passed through it.
*/
queryDesc->dest = CreateDestReceiver(DestTuplestore);
SetTuplestoreDestReceiverParams(queryDesc->dest, portal->holdStore, portal->holdContext, true);
ExecutorRun(queryDesc, ForwardScanDirection, 0L);
#ifdef ENABLE_MULTIPLE_NODES
bool is_remote_plan = PortalCheckRemotePlan(queryDesc->plannedstmt);
#endif
(*queryDesc->dest->rDestroy)(queryDesc->dest);
queryDesc->dest = NULL;
if (is_rollback && queryDesc->estate->have_current_xact_date) {
portal->have_rollback_transaction = true;
} else {
portal->have_rollback_transaction = false;
}
* Now shut down the inner executor.
*/
portal->queryDesc = NULL;
ExecutorFinish(queryDesc);
ExecutorEnd(queryDesc);
#ifndef ENABLE_MULTIPLE_NODES
if (IS_STREAM_PORTAL) {
portal->streamInfo.AttachToSession();
StreamNodeGroup::ReleaseStreamGroup(true, STREAM_COMPLETE);
portal->streamInfo.Reset();
}
#endif
FreeQueryDesc(queryDesc);
* Set the position in the result set: ideally, this could be
* implemented by just skipping straight to the tuple # that we need
* to be at, but the tuplestore API doesn't support that. So we start
* at the beginning of the tuplestore and iterate through it until we
* reach where we need to be. (Fortunately, the
* typical case is that we're supposed to be at or near the start of
* the result set, so this isn't as bad as it sounds.)
*/
MemoryContextSwitchTo(portal->holdContext);
* This code, previously used only in cursor hold mode, is reused when we add commit/rollback features;
* distributed or single nodes will call tuplestore_rescan to rewind the active read pointer to start,
* but singlenode needs to call tuplestore_advance separately to mark the data that has been read before the deletion
*/
#ifdef ENABLE_MULTIPLE_NODES
if ((portal->cursorOptions & CURSOR_OPT_HOLD) || (!is_remote_plan)) {
#endif
if (portal->atEnd) {
while (tuplestore_advance(portal->holdStore, true))
;
} else {
long store_pos;
if (portal->posOverflow)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("could not reposition held cursor")));
tuplestore_rescan(portal->holdStore);
for (store_pos = 0; store_pos < portal->portalPos; store_pos++) {
if (!tuplestore_advance(portal->holdStore, true))
ereport(ERROR, (errcode(ERRCODE_UNEXPECTED_CHUNK_VALUE), errmsg("unexpected end of tuple stream")));
}
}
#ifdef ENABLE_MULTIPLE_NODES
}
else {
tuplestore_rescan(portal->holdStore);
}
#endif
SetOldUserId(saveOldUserid, false);
}
PG_CATCH();
{
MarkPortalFailed(portal);
ActivePortal = saveActivePortal;
t_thrd.utils_cxt.CurrentResourceOwner = saveResourceOwner;
t_thrd.mem_cxt.portal_mem_cxt = savePortalContext;
if (OidIsValid(portal->cursorHoldUserId)) {
SetUserIdAndSecContext(saveUserid, saveSecContext);
}
SetOldUserId(saveOldUserid, false);
PG_RE_THROW();
}
PG_END_TRY();
MemoryContextSwitchTo(oldcxt);
portal->status = PORTAL_READY;
ActivePortal = saveActivePortal;
t_thrd.utils_cxt.CurrentResourceOwner = saveResourceOwner;
t_thrd.mem_cxt.portal_mem_cxt = savePortalContext;
if (OidIsValid(portal->cursorHoldUserId)) {
SetUserIdAndSecContext(saveUserid, saveSecContext);
}
PopActiveSnapshot();
* We can now release any subsidiary memory of the portal's heap context;
* we'll never use it again. The executor already dropped its context,
* but this will clean up anything that glommed onto the portal's heap via
* t_thrd.mem_cxt.portal_mem_cxt.
*/
MemoryContextDeleteChildren(PortalGetHeapMemory(portal), NULL);
}