/*-------------------------------------------------------------------------
*
* nodeShareInputScan.cpp
*
* A Share Input Scan node is used to share the result of an operation in
* two different branches in the plan tree.
*
* These come in two variants: local, and cross-slice.
*
* Local shares
* ------------
*
* In local mode, all the consumers are in the same slice as the producer.
* In that case, there's no need to communicate across processes, so we
* rely entirely on data structures in backend-private memory to track the
* state.
*
* In local mode, there is no difference between producer and consumer
* nodes. In ExecInitShareInputScan(), the producer node stores the
* PlanState of the shared child node where all the nodes can find it.
* The first ExecShareInputScan() call initializes the store.
*
* A local-mode ShareInputScan is quite similar to PostgreSQL's CteScan,
* but there are some implementation differences. CteScan uses a special
* PARAM_EXEC entry to hold the shared state, while ShareInputScan uses
* an entry in es_sharenode instead.
*
* Cross-slice shares
* ------------------
*
* A cross-slice share works basically the same as a local one, except
* that the producing slice makes the underlying tuplestore available to
* other processes, by forcing it to be written to a file on disk. The
* first ExecShareInputScan() call in the producing slice materializes
* the whole tuplestore, and advertises that it's ready in shared memory.
* Consumer slices wait for that before trying to read the store.
*
* The producer and the consumers communicate the status of the scan using
* shared memory. There's a hash table in shared memory, containing a
* 'shareinput_Xslice_state' struct for each shared scan. The producer uses
* a condition variable (ready_done_cv) to wake up the consumers when the
* tuplestore is fully materialized, and the consumers use the same condition
* variable to inform the producer when they're done reading it. The producer
* slice keeps the underlying tuplestore open until all the consumers have
* finished.
*
* Portions Copyright (c) 2023 Huawei Technologies Co.,Ltd.
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/gausskernel/runtime/executor/nodeShareInputScan.cpp
*
* -------------------------------------------------------------------------
*/
#ifdef USE_SPQ
#include "postgres.h"
#include "access/xact.h"
#include "executor/executor.h"
#include "executor/node/nodeShareInputScan.h"
#include "miscadmin.h"
#include "storage/lock/lwlock.h"
#include "storage/lwlocknames.h"
#include "storage/shmem.h"
#include "utils/memutils.h"
#include "utils/resowner.h"
#include "utils/tuplestore.h"
#include "lib/ilist.h"
/*
 * SharedFileSet that holds the tuplestore files of every cross-slice
 * ShareInputScan. The pointer is set by ShareInputShmemInit() and handed
 * out (after init/attach) by get_shareinput_fileset().
 *
 * The SharedFileSet deletes any remaining files when the reference count
 * reaches zero, but we don't rely on that mechanism. All the files are
 * held in the same SharedFileSet, so it cannot be recycled until all
 * ShareInputScans in the system have finished, which might never happen if
 * new queries are started continuously. The shareinput_Xslice_state entries
 * are reference counted separately, and we clean up the files backing each
 * individual ShareInputScan whenever its reference count reaches zero.
 *
 * NOTE(review): release_shareinput_reference() only removes the hash entry;
 * in this file the per-scan store is closed via tuplestore_end() in
 * ExecEndShareInputScan() — confirm that is where the files get removed.
 */
static SharedFileSet *shareinput_Xslice_fileset;
/*
 * A pthread mutex / condition variable pair. One is embedded in every
 * shareinput_Xslice_state entry (as ready_done_cv) and is used to signal
 * "ready" from the producer and "done" from the consumers.
 */
typedef struct {
    pthread_mutex_t m_mutex; /* held while 'ready' / 'ndone' are changed and waited on */
    pthread_cond_t m_cond;   /* broadcast on "ready" and on the final "done" */
} ConditionVariable;
/*
 * In a cross-slice ShareinputScan, the producer and consumer processes
 * communicate using shared memory. There's a hash table containing one
 * 'shareinput_Xslice_state' for each in-progress shared input scan.
 *
 * The hash table itself, and the fields within every entry, are protected
 * by ShareInputScanLock. (Although some operations get away without the
 * lock, when the field is atomic and/or there's only one possible writer.)
 * NOTE(review): 'ready' and 'ndone' are actually updated under the entry's
 * ready_done_cv mutex, not ShareInputScanLock.
 *
 * All producers and consumers that participate in a shared scan hold
 * a reference to the 'shareinput_Xslice_state' entry of the scan, for
 * the whole lifecycle of the node from ExecInitShareInputScan() to
 * ExecEndShareInputScan(). The entry in the hash table is created by
 * the first participant that initializes, which is not necessarily the
 * producer! When the last participant releases the entry, it is removed
 * from the hash table.
 */

/* Hash key identifying one shared scan. */
typedef struct shareinput_tag {
    uint64 session_id; /* t_thrd.spq_ctx.spq_session_id of the participant */
    int32 share_id;    /* ShareInputScan share_id */
    int32 dop_id;      /* u_sess->stream_cxt.smp_id of the participant */
} shareinput_tag;
/* Shared-memory hash entry describing one in-progress cross-slice share. */
typedef struct shareinput_Xslice_state {
    shareinput_tag tag; /* hash key; dynahash expects the key at the start of the entry */
    int refcount;       /* number of participants currently holding a reference */
    bool ready;         /* set by the producer once the tuplestore is complete */
    int ndone;          /* number of consumers that have reported done */
    /*
     * ready_done_cv is used for signaling when the scan becomes "ready", and
     * when it becomes "done". The producer wakes up everyone waiting on this
     * condition variable when it sets ready = true. Also, when the last
     * consumer finishes the scan (ndone reaches nconsumers), it wakes up the
     * producer using this same condition variable.
     */
    ConditionVariable ready_done_cv;
} shareinput_Xslice_state;
/*
 * 'shareinput_Xslice_reference' represents a reference or "lease" to an
 * entry in the shared memory hash table, held by one ShareInputScan node.
 * It is intended to be used for garbage collection of the entries on
 * transaction abort.
 *
 * NOTE(review): in this file 'owner' is only recorded and 'node' is never
 * linked into any list, so no abort-time release happens here — confirm
 * where (or whether) that cleanup is implemented.
 */
typedef struct shareinput_Xslice_reference {
    int share_id;                          /* share_id this reference was taken for */
    shareinput_Xslice_state *xslice_state; /* pinned hash entry */
    ResourceOwner owner;                   /* CurrentResourceOwner when the reference was taken */
    dlist_node node;
} shareinput_Xslice_reference;
/*
 * For local (i.e. intra-slice) variants, we use a 'shareinput_local_state'
 * to track the status. It is analogous to 'shareinput_Xslice_state' used for
 * cross-slice scans, but we don't need to keep it in shared memory. These
 * are held in estate->es_sharenode, indexed by share_id.
 */
typedef struct shareinput_local_state {
    bool ready;    /* ts_state has been created (producer) or opened (consumer) */
    bool closed;   /* a consumer node in this slice has already reported done */
    int ndone;     /* NOTE(review): not updated anywhere in this file */
    int nsharers;  /* consumer nodes of this share executing in the current slice */
    /*
     * This points to the child node that's being shared. Set by
     * ExecInitShareInputScan() of the instance that has the child.
     */
    PlanState *childState;
    Tuplestorestate *ts_state; /* the store read by every node with this share_id */
} shareinput_local_state;
/* Shared-memory reference management and cross-slice IPC helpers, defined later in this file. */
static shareinput_Xslice_reference *get_shareinput_reference(int share_id);
static void release_shareinput_reference(shareinput_Xslice_reference *ref);
static void shareinput_create_bufname_prefix(char *p, int size, int share_id, int dop_id);
static void shareinput_writer_notifyready(shareinput_Xslice_reference *ref);
static void shareinput_reader_waitready(shareinput_Xslice_reference *ref);
static void shareinput_reader_notifydone(shareinput_Xslice_reference *ref, int nconsumers);
static void shareinput_writer_waitdone(shareinput_Xslice_reference *ref, int nconsumers);
* init_tuplestore_state
* Initialize the tuplestore state for the Shared node if the state
* is not initialized.
*/
static void init_tuplestore_state(ShareInputScanState *node)
{
EState *estate = node->ss.ps.state;
ShareInputScan *sisc = (ShareInputScan *)node->ss.ps.plan;
shareinput_local_state *local_state = node->local_state;
Tuplestorestate *ts;
int tsptrno;
TupleTableSlot *outerslot;
Assert(!node->isready);
Assert(node->ts_state == NULL);
Assert(node->ts_pos == -1);
if (!node->ref)
elog(ERROR, "cannot execute ShareInputScan that was not initialized");
if (!local_state->ready) {
if (t_thrd.spq_ctx.current_id == sisc->producer_slice_id || estate->es_plannedstmt->num_streams == 1) {
char rwfile_prefix[100];
ts = tuplestore_begin_heap(true,
false,
10);
shareinput_create_bufname_prefix(rwfile_prefix, sizeof(rwfile_prefix), sisc->share_id,
u_sess->stream_cxt.smp_id);
tuplestore_make_shared(ts, get_shareinput_fileset(), rwfile_prefix);
for (;;) {
outerslot = ExecProcNode(local_state->childState);
if (TupIsNull(outerslot))
break;
tuplestore_puttupleslot(ts, outerslot);
}
tuplestore_freeze(ts);
shareinput_writer_notifyready(node->ref);
tuplestore_rescan(ts);
} else {
* We are a consumer slice. Wait for the producer to create the
* tuplestore.
*/
char rwfile_prefix[100];
shareinput_reader_waitready(node->ref);
shareinput_create_bufname_prefix(rwfile_prefix, sizeof(rwfile_prefix), sisc->share_id,
u_sess->stream_cxt.smp_id);
ts = tuplestore_open_shared(get_shareinput_fileset(), rwfile_prefix);
}
local_state->ts_state = ts;
local_state->ready = true;
tsptrno = 0;
} else {
ts = local_state->ts_state;
tsptrno = tuplestore_alloc_read_pointer(ts, (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND));
tuplestore_select_read_pointer(ts, tsptrno);
tuplestore_rescan(ts);
}
node->ts_state = ts;
node->ts_pos = tsptrno;
node->isready = true;
}
* ExecShareInputScan
* Retrieve a tuple from the ShareInputScan
* ------------------------------------------------------------------
*/
TupleTableSlot *ExecShareInputScan(PlanState *pstate)
{
ShareInputScanState *node = castNode(ShareInputScanState, pstate);
ShareInputScan *sisc = (ShareInputScan *)pstate->plan;
EState *estate;
ScanDirection dir;
bool forward;
TupleTableSlot *slot;
* get state info from node
*/
estate = pstate->state;
dir = estate->es_direction;
forward = ScanDirectionIsForward(dir);
if (sisc->this_slice_id != t_thrd.spq_ctx.current_id && estate->es_plannedstmt->num_streams != 1)
elog(ERROR, "cannot execute alien Share Input Scan");
if (!node->isready)
init_tuplestore_state(node);
slot = node->ss.ps.ps_ResultTupleSlot;
Assert(!node->local_state->closed);
tuplestore_select_read_pointer(node->ts_state, node->ts_pos);
while (1) {
bool gotOK;
gotOK = tuplestore_gettupleslot(node->ts_state, forward, false, slot);
if (!gotOK)
return NULL;
return slot;
}
Assert(!"should not be here");
return NULL;
}
/*
 * ExecInitShareInputScan
 *		Create the executor state for a ShareInputScan node.
 *
 * Initializes the child plan (if this instance has one) and the result
 * slot. Unless this is EXPLAIN-only, it also finds or creates the
 * per-share_id shareinput_local_state in estate->es_sharenode and takes a
 * reference to the cross-slice shared-memory entry.
 * ------------------------------------------------------------------
 */
ShareInputScanState *ExecInitShareInputScan(ShareInputScan *node, EState *estate, int eflags)
{
    ShareInputScanState *sisstate;
    Plan *outerPlan;
    PlanState *childState;
    shareinput_local_state *local_state;
    Assert(innerPlan(node) == NULL);
    sisstate = makeNode(ShareInputScanState);
    sisstate->ss.ps.plan = (Plan *)node;
    sisstate->ss.ps.state = estate;
    sisstate->ss.ps.ExecProcNode = ExecShareInputScan;
    sisstate->ts_state = NULL;
    sisstate->ts_pos = -1;
    /*
     * init child node.
     * if outerPlan is NULL, this is no-op (so that the ShareInput node will be
     * only init-ed once).
     */
    /*
     * initialize child nodes
     *
     * Like a Material node, we shield the child node from the need to support
     * BACKWARD, or MARK/RESTORE.
     */
    eflags &= ~(EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK);
    outerPlan = outerPlan(node);
    childState = ExecInitNode(outerPlan, estate, eflags);
    outerPlanState(sisstate) = childState;
    Assert(node->scan.plan.qual == NULL);
    sisstate->ss.ps.qual = NULL;
    /*
     * Create expression context
     */
    ExecAssignExprContext(estate, &sisstate->ss.ps);
    /*
     * Initialize result slot and type.
     */
    ExecInitResultTupleSlot(estate, &sisstate->ss.ps);
    ExecAssignResultTypeFromTL(&sisstate->ss.ps);
    sisstate->ss.ps.ps_ProjInfo = NULL;
    /*
     * When doing EXPLAIN only, we won't actually execute anything, so don't
     * bother initializing the state. This isn't merely an optimization:
     * closing a cross-slice ShareInputScan waits for the consumers to finish,
     * but if we don't execute anything, it will hang forever.
     *
     * We could also exit here immediately if this is an "alien" node, i.e.
     * a node that doesn't execute in this slice, but we can't easily
     * detect that here.
     */
    if ((eflags & EXEC_FLAG_EXPLAIN_ONLY) != 0)
        return sisstate;
    /* Grow es_sharenode so that index share_id exists, padding with fresh local states. */
    while (list_length(estate->es_sharenode) <= node->share_id) {
        local_state = (shareinput_local_state *)palloc0(sizeof(shareinput_local_state));
        local_state->ready = false;
        estate->es_sharenode = lappend(estate->es_sharenode, local_state);
    }
    local_state = (shareinput_local_state *)list_nth(estate->es_sharenode, node->share_id);
    /*
     * only the consumer ShareInputScan nodes executed in current
     * slice are counted, since only consumers would increase
     * "ndone" in local_state, and compare "ndone" with "nsharers"
     * to judge whether to notify producer.
     *
     * NOTE(review): no code in this file increments local_state->ndone or
     * compares it with nsharers; confirm this counter is still needed.
     */
    if (t_thrd.spq_ctx.current_id == node->this_slice_id && t_thrd.spq_ctx.current_id != node->producer_slice_id)
        local_state->nsharers++;
    /* Only the instance that owns the child plan publishes it for the others. */
    if (childState)
        local_state->childState = childState;
    sisstate->local_state = local_state;
    sisstate->ref = get_shareinput_reference(node->share_id);
    return sisstate;
}
/*
 * ExecEndShareInputScan
 *		Shut down a ShareInputScan node.
 *
 * For a node that runs in this slice (or in a single-stream plan), the
 * producer first makes sure the shared tuplestore exists, so that waiting
 * consumers are not left hanging. It then blocks until sisc->nconsumers
 * consumers have reported done; each local_state reports done at most
 * once. After that, the shared-memory reference is released, the
 * tuplestore (if still open) is closed, and the child plan is shut down.
 * ------------------------------------------------------------------
 */
void ExecEndShareInputScan(ShareInputScanState *node)
{
    EState *estate = node->ss.ps.state;
    ShareInputScan *sisc = (ShareInputScan *)node->ss.ps.plan;
    shareinput_local_state *local_state = node->local_state;
    ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
    /* ref is not taken for EXPLAIN-only nodes (ExecInitShareInputScan returns early). */
    if (node->ref) {
        if (sisc->this_slice_id == t_thrd.spq_ctx.current_id || estate->es_plannedstmt->num_streams == 1) {
            /*
             * The producer needs to wait for all the consumers to finish.
             * Consumers signal the producer that they're done reading,
             * but are free to exit immediately after that.
             */
            if (t_thrd.spq_ctx.current_id == sisc->producer_slice_id) {
                if (!local_state->ready)
                    init_tuplestore_state(node);
                shareinput_writer_waitdone(node->ref, sisc->nconsumers);
            } else {
                /* 'closed' is shared by all nodes of this share_id, so only one of them reports done. */
                if (!local_state->closed) {
                    shareinput_reader_notifydone(node->ref, sisc->nconsumers);
                    local_state->closed = true;
                }
            }
        }
        release_shareinput_reference(node->ref);
        node->ref = NULL;
    }
    /* The store is shared through local_state; the first node to end closes it. */
    if (local_state && local_state->ts_state) {
        tuplestore_end(local_state->ts_state);
        local_state->ts_state = NULL;
    }
    /*
     * shutdown subplan. First scanner of underlying share input will
     * do the shutdown, all other scanners are no-op because outerPlanState
     * is NULL
     */
    ExecEndNode(outerPlanState(node));
}
* ExecReScanShareInputScan
* ------------------------------------------------------------------
*/
void ExecReScanShareInputScan(ShareInputScanState *node)
{
if (!node->isready)
init_tuplestore_state(node);
ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
Assert(node->ts_pos != -1);
tuplestore_select_read_pointer(node->ts_state, node->ts_pos);
tuplestore_rescan(node->ts_state);
}
/* ************************************************************************
 * IPC, for cross-slice variants.
 * ************************************************************************ */
* When creating a tuplestore file that will be accessed by
* multiple processes, shareinput_create_bufname_prefix() is used to
* construct the name for it.
*/
static void shareinput_create_bufname_prefix(char *p, int size, int share_id, int dop_id)
{
snprintf(p, size, "SIRW_%lu_%d_%d", t_thrd.spq_ctx.spq_session_id, share_id, u_sess->stream_cxt.smp_id);
}
/*
 * NOTE(review): MaxBackends is pinned to 1 here, so N_SHAREINPUT_SLOTS()
 * evaluates to 5 regardless of max_connections; the sizing rationale below
 * describes the intended scaling — confirm this override is deliberate.
 */
#define MaxBackends 1
/*
 * Initialization of the shared hash table for cross-slice communication.
 *
 * XXX: Use MaxBackends to size it, on the assumption that max_connections
 * will scale accordingly to query complexity. This is quite fuzzy, you could
 * create a query with tons of cross-slice ShareInputScans but only a few
 * slices, but that ought to be rare enough in practice. This isn't a hard
 * limit anyway, the hash table will use up any "slop" in shared memory if
 * needed. When even that is exhausted, get_shareinput_reference() raises
 * "out of cross-slice ShareInputScan slots".
 */
#define N_SHAREINPUT_SLOTS() (MaxBackends * 5)
/*
 * Amount of shared memory needed by ShareInputShmemInit(): the
 * notification hash table plus the SharedFileSet that holds the
 * tuplestore files.
 */
Size ShareInputShmemSize(void)
{
    Size size = hash_estimate_size(N_SHAREINPUT_SLOTS(), sizeof(shareinput_Xslice_state));

    /* ShareInputShmemInit() also allocates the SharedFileSet struct itself. */
    size = add_size(size, sizeof(SharedFileSet));
    return size;
}
/*
 * Create (or attach to) the shared state used by cross-slice
 * ShareInputScans: the SharedFileSet holding the tuplestore files, and the
 * "ShareInputScan notifications" hash table of shareinput_Xslice_state
 * entries.
 *
 * The hash-table pointer lives in t_thrd, so this also (re)attaches a
 * thread whose pointer is still NULL even when the shared structures
 * already exist.
 */
void ShareInputShmemInit(void)
{
    bool found = false;

    /*
     * This struct holds only the SharedFileSet. The hash table is allocated
     * separately by ShmemInitHash() below, so size the struct accordingly.
     */
    shareinput_Xslice_fileset = (SharedFileSet *)ShmemInitStruct("ShareInputScan", sizeof(SharedFileSet), &found);
    if (!found) {
        /*
         * Start from refcnt == 0 so that the first get_shareinput_fileset()
         * call runs SharedFileSetInit(). Don't rely on the allocator handing
         * back zeroed memory.
         */
        errno_t rc = memset_s(shareinput_Xslice_fileset, sizeof(SharedFileSet), 0, sizeof(SharedFileSet));
        securec_check(rc, "\0", "\0");
    }

    if (!found || t_thrd.shemem_ptr_cxt.shareinput_Xslice_hash == nullptr) {
        HASHCTL info;
        errno_t rc = memset_s(&info, sizeof(info), 0, sizeof(info));
        securec_check(rc, "\0", "\0");
        info.keysize = sizeof(shareinput_tag);
        info.entrysize = sizeof(shareinput_Xslice_state);
        t_thrd.shemem_ptr_cxt.shareinput_Xslice_hash = ShmemInitHash("ShareInputScan notifications",
            N_SHAREINPUT_SLOTS(), N_SHAREINPUT_SLOTS(), &info, HASH_ELEM | HASH_BLOBS);
    }
}
* Get reference to the SharedFileSet used to hold all the tuplestore files.
*
* This is exported so that it can also be used by the INITPLAN function
* tuplestores.
*/
SharedFileSet *get_shareinput_fileset(void)
{
LWLockAcquire(ShareInputScanLock, LW_EXCLUSIVE);
if (shareinput_Xslice_fileset->refcnt == 0)
SharedFileSetInit(shareinput_Xslice_fileset);
else
SharedFileSetAttach(shareinput_Xslice_fileset);
LWLockRelease(ShareInputScanLock);
return shareinput_Xslice_fileset;
}
* Get a reference to slot in shared memory for this shared scan.
*
* If the slot doesn't exist yet, it is created and initialized into
* "not ready" state.
*
* The reference is tracked by the current ResourceOwner, and will be
* automatically released on abort.
*/
static shareinput_Xslice_reference *get_shareinput_reference(int share_id)
{
shareinput_tag tag;
shareinput_Xslice_state *xslice_state;
bool found;
shareinput_Xslice_reference *ref;
ref = (shareinput_Xslice_reference *)palloc0(sizeof(shareinput_Xslice_reference));
LWLockAcquire(ShareInputScanLock, LW_EXCLUSIVE);
tag.session_id = t_thrd.spq_ctx.spq_session_id;
tag.share_id = share_id;
tag.dop_id = u_sess->stream_cxt.smp_id;
xslice_state = (shareinput_Xslice_state *)hash_search(t_thrd.shemem_ptr_cxt.shareinput_Xslice_hash, &tag,
HASH_ENTER_NULL, &found);
if (!found) {
if (xslice_state == NULL) {
pfree(ref);
ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of cross-slice ShareInputScan slots")));
}
xslice_state->refcount = 0;
xslice_state->ready = false;
xslice_state->ndone = 0;
pthread_mutex_init(&xslice_state->ready_done_cv.m_mutex, NULL);
pthread_cond_init(&xslice_state->ready_done_cv.m_cond, NULL);
}
xslice_state->refcount++;
ref->share_id = share_id;
ref->xslice_state = xslice_state;
ref->owner = t_thrd.utils_cxt.CurrentResourceOwner;
LWLockRelease(ShareInputScanLock);
return ref;
}
* Release reference to a shared scan.
*
* The reference count in the shared memory slot is decreased, and if
* it reaches zero, it is destroyed.
*/
static void release_shareinput_reference(shareinput_Xslice_reference *ref)
{
shareinput_Xslice_state *state = ref->xslice_state;
LWLockAcquire(ShareInputScanLock, LW_EXCLUSIVE);
if (state->refcount == 1) {
bool found;
(void)hash_search(t_thrd.shemem_ptr_cxt.shareinput_Xslice_hash, &state->tag, HASH_REMOVE, &found);
Assert(found);
} else
state->refcount--;
LWLockRelease(ShareInputScanLock);
pfree(ref);
}
* shareinput_reader_waitready
*
* Called by the reader (consumer) to wait for the writer (producer) to produce
* all the tuples and write them to disk.
*
* This is a blocking operation.
*/
static void shareinput_reader_waitready(shareinput_Xslice_reference *ref)
{
shareinput_Xslice_state *state = ref->xslice_state;
pthread_mutex_lock(&state->ready_done_cv.m_mutex);
if (!state->ready) {
pthread_cond_wait(&state->ready_done_cv.m_cond, &state->ready_done_cv.m_mutex);
}
pthread_mutex_unlock(&state->ready_done_cv.m_mutex);
}
* shareinput_writer_notifyready
*
* Called by the writer (producer) once it is done producing all tuples and
* writing them to disk. It notifies all the readers (consumers) that tuples
* are ready to be read from disk.
*/
static void shareinput_writer_notifyready(shareinput_Xslice_reference *ref)
{
shareinput_Xslice_state *state = ref->xslice_state;
Assert(!state->ready);
pthread_mutex_lock(&state->ready_done_cv.m_mutex);
state->ready = true;
pthread_cond_broadcast(&state->ready_done_cv.m_cond);
pthread_mutex_unlock(&state->ready_done_cv.m_mutex);
}
* shareinput_reader_notifydone
*
* Called by the reader (consumer) to notify the writer (producer) that
* it is done reading tuples from disk.
*
* This is a non-blocking operation.
*/
static void shareinput_reader_notifydone(shareinput_Xslice_reference *ref, int nconsumers)
{
shareinput_Xslice_state *state = ref->xslice_state;
int ndone;
pthread_mutex_lock(&state->ready_done_cv.m_mutex);
state->ndone++;
ndone = state->ndone;
if (ndone >= nconsumers)
pthread_cond_broadcast(&state->ready_done_cv.m_cond);
pthread_mutex_unlock(&state->ready_done_cv.m_mutex);
}
* shareinput_writer_waitdone
*
* Called by the writer (producer) to wait for the "done" notification from
* all readers (consumers).
*
* This is a blocking operation.
*/
static void shareinput_writer_waitdone(shareinput_Xslice_reference *ref, int nconsumers)
{
shareinput_Xslice_state *state = ref->xslice_state;
if (!state->ready)
elog(ERROR, "shareinput_writer_waitdone() called without creating the tuplestore");
int ndone;
pthread_mutex_lock(&state->ready_done_cv.m_mutex);
ndone = state->ndone;
if (ndone < nconsumers) {
pthread_cond_wait(&state->ready_done_cv.m_cond, &state->ready_done_cv.m_mutex);
}
pthread_mutex_unlock(&state->ready_done_cv.m_mutex);
if (ndone > nconsumers)
elog(WARNING, "%d sharers of ShareInputScan reported to be done, but only %d were expected", ndone, nconsumers);
}
#endif