*
* groupmgr.c
* Routines to support manipulation of the pgxc_group catalog
* This includes support for DDL on objects NODE GROUP
*
* Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 2010-2012 Postgres-XC Development Group
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "knl/knl_variable.h"
#include "miscadmin.h"
#include "access/heapam.h"
#include "access/tableam.h"
#include "catalog/catalog.h"
#include "catalog/dependency.h"
#include "catalog/heap.h"
#include "catalog/indexing.h"
#include "catalog/pg_database.h"
#include "catalog/pg_type.h"
#include "catalog/pgxc_node.h"
#include "catalog/pgxc_group.h"
#include "catalog/pgxc_class.h"
#include "catalog/pg_shdepend.h"
#include "catalog/pg_depend.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_proc.h"
#include "commands/dbcommands.h"
#include "libpq/libpq-fe.h"
#include "nodes/parsenodes.h"
#include "nodes/makefuncs.h"
#include "optimizer/planner.h"
#include "pgxc/groupmgr.h"
#include "pgxc/locator.h"
#include "pgxc/pgxc.h"
#include "postmaster/autovacuum.h"
#include "utils/builtins.h"
#include "utils/rel.h"
#include "utils/rel_gs.h"
#include "utils/syscache.h"
#include "utils/lsyscache.h"
#include "utils/array.h"
#include "utils/snapmgr.h"
#include "utils/acl.h"
#include "utils/elog.h"
#include "access/xact.h"
#include "access/sysattr.h"
#include "utils/fmgroids.h"
#include "catalog/pg_authid.h"
#include "pgxc/execRemote.h"
#include "commands/user.h"
#include "commands/sequence.h"
#include "pgxc/pgxcnode.h"
#include "tcop/utility.h"
#include "storage/proc.h"
#include "utils/elog.h"
#include "utils/snapmgr.h"
#include "utils/knl_relcache.h"
#define CHAR_BUF_SIZE 512
#define BUCKET_MAP_SIZE 32
#pragma GCC diagnostic ignored "-Wunused-function"
* The element data structure of bucketmap cache, where store the bucketmap that
* palloc-ed form t_thrd.top_mem_cxt, it is created on its first being used
*/
typedef struct BucketMapCache {
Oid groupoid;
ItemPointerData ctid;
char* groupname;
uint2* bucketmap;
int bucketcnt;
} NodeGroupBucketMap;
static void BucketMapCacheAddEntry(Oid groupoid, Datum groupanme_datum, Datum bucketmap_datum, ItemPointer ctid);
static void BucketMapCacheRemoveEntry(Oid groupoid);
#define BUCKETMAP_MODE_DEFAULT 0
#define BUCKETMAP_MODE_REMAP 1
static oidvector* get_group_member_ref(HeapTuple tup, bool* need_free);
static void generateConsistentHashBucketmap(CreateGroupStmt* stmt, oidvector* nodes_array, Relation pgxc_group_rel,
HeapTuple tuple, uint2* bucket_ptr, int bucketmap_mode = BUCKETMAP_MODE_DEFAULT);
static void contractBucketMap(Relation rel, HeapTuple old_group, oidvector* new_nodes, uint2* bucket_ptr);
static void getBucketsFromBucketList(List* bucketlist, int member_count, uint2* bucket_ptr);
static void GetTablesByDatabase(const char* database_name, char* query, size_t size);
void PgxcCheckOid(Oid noid, char* node_name);
void PgxcOpenGroupRelation(Relation* pgxc_group_rel);
static void set_current_installation_nodegroup(const char* group_name)
{
MemoryContext oldcontext =
MemoryContextSwitchTo(THREAD_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_OPTIMIZER));
t_thrd.pgxc_cxt.current_installation_nodegroup = pstrdup(group_name);
MemoryContextSwitchTo(oldcontext);
}
static void set_current_redistribution_nodegroup(const char* group_name)
{
MemoryContext oldcontext =
MemoryContextSwitchTo(THREAD_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_OPTIMIZER));
t_thrd.pgxc_cxt.current_redistribution_nodegroup = pstrdup(group_name);
MemoryContextSwitchTo(oldcontext);
}
static int nodeinfo_cmp(const void* a, const void* b)
{
const nodeinfo* node1 = (const nodeinfo*)a;
const nodeinfo* node2 = (const nodeinfo*)b;
if (node1->deleted && !node2->deleted) {
return 1;
}
if (!node1->deleted && node2->deleted) {
return -1;
}
if (node1->old_buckets_num > node2->old_buckets_num) {
return 1;
}
if (node1->old_buckets_num < node2->old_buckets_num) {
return -1;
}
return 0;
}
static void getBucketsFromBucketList(List* bucketlist, int member_count, uint2* bucket_ptr)
{
int bucket_num = list_length(bucketlist);
int* buckets_count = NULL;
ListCell* cell = NULL;
int i = 0;
int max_buckets;
int min_buckets;
if (bucket_num != BUCKETDATALEN)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OPERATION),
errmsg("Get buckets failed.reason:the buckets number(%d) is not correct(%d).",
bucket_num,
BUCKETDATALEN)));
buckets_count = (int*)palloc0(member_count * sizeof(int));
foreach (cell, bucketlist) {
uint2 b = (uint2)intVal(lfirst(cell));
if (b >= member_count)
ereport(
ERROR, (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), errmsg("Bucket id(%d:%d) out of range.", i, b)));
bucket_ptr[i++] = b;
buckets_count[b]++;
}
Assert(i == BUCKETDATALEN);
max_buckets = min_buckets = buckets_count[0];
for (i = 0; i < member_count; i++) {
if (max_buckets < buckets_count[i]) {
max_buckets = buckets_count[i];
}
if (min_buckets > buckets_count[i]) {
min_buckets = buckets_count[i];
}
if (buckets_count[i] == 0) {
ereport(ERROR, (errcode(ERRCODE_INVALID_OPERATION), errmsg("Node(%d) has no buckets on it.", i)));
}
}
if (max_buckets > min_buckets + 1)
ereport(ERROR,
(errcode(ERRCODE_SYSTEM_ERROR),
errmsg(
"Buckets distribution is not even(max_buckets: %d, min_buckets: %d).", max_buckets, min_buckets)));
pfree(buckets_count);
}
* input:
* rel, old_group, new_nodes
* output:
* bucket_ptr
*/
static void contractBucketMap(Relation rel, HeapTuple old_group, oidvector* new_nodes, uint2* bucket_ptr)
{
bool isNull = false;
int i, j, k;
oidvector* old_nodes = NULL;
text* old_bucket_str = NULL;
uint2* old_bucket_ptr = NULL;
int bktlen = 0;
nodeinfo* node_array = NULL;
bool need_free = false;
old_nodes = get_group_member_ref(old_group, &need_free);
if (old_nodes->dim1 <= new_nodes->dim1)
ereport(
ERROR, (errcode(ERRCODE_INVALID_OPERATION), errmsg("new node group contains more nodes than old group.")));
old_bucket_ptr = (uint2*)palloc0(BUCKETDATALEN * sizeof(uint2));
old_bucket_str = (text*)heap_getattr(old_group, Anum_pgxc_group_buckets, RelationGetDescr(rel), &isNull);
if (isNull)
ereport(ERROR, (errcode(ERRCODE_SYSTEM_ERROR), errmsg("can't get old group buckets.")));
text_to_bktmap(old_bucket_str, old_bucket_ptr, &bktlen);
node_array = (nodeinfo*)palloc0(old_nodes->dim1 * sizeof(nodeinfo));
for (i = 0; i < old_nodes->dim1; i++) {
node_array[i].node_id = old_nodes->values[i];
node_array[i].old_node_index = i;
node_array[i].old_buckets_num = 0;
#ifdef USE_ASSERT_CHECKING
node_array[i].new_buckets_num = 0;
#endif
}
for (i = 0; i < BUCKETDATALEN; i++)
node_array[old_bucket_ptr[i]].old_buckets_num++;
j = 0;
for (i = 0; i < new_nodes->dim1; i++) {
Oid nodeid = new_nodes->values[i];
bool found = false;
while (j < old_nodes->dim1) {
if (nodeid == node_array[j].node_id) {
node_array[j].deleted = false;
node_array[j].new_node_index = i;
found = true;
j++;
break;
} else {
node_array[j].deleted = true;
j++;
}
}
if (!found)
ereport(
ERROR, (errcode(ERRCODE_INVALID_OPERATION), errmsg("new node group contains nodes not in old group.")));
}
while (j < old_nodes->dim1)
node_array[j++].deleted = true;
qsort((void*)node_array, old_nodes->dim1, sizeof(nodeinfo), nodeinfo_cmp);
#ifdef USE_ASSERT_CHECKING
for (i = 0; i < new_nodes->dim1; i++)
Assert(!node_array[i].deleted);
for (i = new_nodes->dim1; i < old_nodes->dim1; i++)
Assert(node_array[i].deleted);
Assert(node_array[0].old_buckets_num + 1 >= node_array[new_nodes->dim1 - 1].old_buckets_num);
#endif
k = 0;
for (i = 0; i < BUCKETDATALEN; i++) {
for (j = 0; j < new_nodes->dim1; j++) {
if (old_bucket_ptr[i] == node_array[j].old_node_index)
break;
}
if (j == new_nodes->dim1) {
bucket_ptr[i] = node_array[k].new_node_index;
#ifdef USE_ASSERT_CHECKING
node_array[k].new_buckets_num++;
#endif
k = (k + 1) % (new_nodes->dim1);
} else {
bucket_ptr[i] = node_array[j].new_node_index;
#ifdef USE_ASSERT_CHECKING
node_array[j].new_buckets_num++;
#endif
}
}
#ifdef USE_ASSERT_CHECKING
k = BUCKETDATALEN / (new_nodes->dim1);
for (i = 0; i < new_nodes->dim1; i++) {
Assert(node_array[i].new_buckets_num == k || node_array[i].new_buckets_num == k + 1);
}
#endif
if (need_free)
pfree_ext(old_nodes);
}
* get_group_kind - Get group_kind filed from heap tuple.
*
*/
static char get_group_kind(HeapTuple tup)
{
bool isNull = true;
bool is_null = true;
Datum group_datum;
group_datum = SysCacheGetAttr(PGXCGROUPOID, tup, Anum_pgxc_group_is_installation, &isNull);
if (!isNull && DatumGetBool(group_datum)) {
return PGXC_GROUPKIND_INSTALLATION;
}
group_datum = SysCacheGetAttr(PGXCGROUPOID, tup, Anum_pgxc_group_kind, &is_null);
if (is_null)
return PGXC_GROUPKIND_NODEGROUP;
if (!isNull && DatumGetChar(group_datum) == PGXC_GROUPKIND_INSTALLATION)
return PGXC_GROUPKIND_NODEGROUP;
return DatumGetChar(group_datum);
}
* get_bucket_string - Get bucket string from bucket array.
*
*/
static char* get_bucket_string(uint2* bucket_ptr)
{
int i;
errno_t rc = 0;
char* bucket_str = NULL;
char one_bucket_str[CHAR_BUF_SIZE] = {0};
bucket_str = (char*)palloc0(BUCKETSTRLEN);
rc = snprintf_truncated_s(bucket_str, BUCKETSTRLEN, "%d", bucket_ptr[0]);
securec_check_ss(rc, "\0", "\0");
for (i = 1; i < BUCKETDATALEN; i++) {
rc = snprintf_s(one_bucket_str, sizeof(one_bucket_str), CHAR_BUF_SIZE - 1, ",%d", bucket_ptr[i]);
securec_check_ss(rc, "\0", "\0");
rc = strncat_s(bucket_str, BUCKETSTRLEN, one_bucket_str, strlen(one_bucket_str));
securec_check(rc, "\0", "\0");
}
return bucket_str;
}
* get_group_member_ref - Get group_member oidvector pointer from heap tuple.
*
* The return value is oidvector pointer, don't need to free.
*/
static oidvector* get_group_member_ref(HeapTuple tup, bool* need_free)
{
Datum oid_nodes_datum;
bool isNull = true;
oidvector* gmember = NULL;
*need_free = false;
oid_nodes_datum = SysCacheGetAttr(PGXCGROUPOID, tup, Anum_pgxc_group_members, &isNull);
if (isNull != false) {
ereport(ERROR,
(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("group_members is null for tuple %u", HeapTupleGetOid(tup))));
}
gmember = (oidvector*)PG_DETOAST_DATUM(oid_nodes_datum);
if (gmember != (oidvector*)DatumGetPointer(oid_nodes_datum))
*need_free = true;
return gmember;
}
* get_group_member - Get group_member filed from node group.
*
* The return value is oidvector pointer, caller need to free oidvector by pfree.
*/
static oidvector* get_group_member(Oid group_oid)
{
HeapTuple tup;
bool isNull = false;
oidvector* gmember = NULL;
tup = SearchSysCache(PGXCGROUPOID, ObjectIdGetDatum(group_oid), 0, 0, 0);
if (!HeapTupleIsValid(tup))
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("PGXC Group %u: group not defined", group_oid)));
gmember = (oidvector*)PG_DETOAST_DATUM_COPY(SysCacheGetAttr(PGXCGROUPOID, tup, Anum_pgxc_group_members, &isNull));
ReleaseSysCache(tup);
return gmember;
}
* exist_logic_cluster - judge whether pgxc_group has logic cluster.
*
* The rel parameter must be pgxc_group relation.
*/
static bool exist_logic_cluster(Relation rel)
{
char group_kind;
HeapTuple tuple;
TableScanDesc scan;
bool exist = false;
scan = tableam_scan_begin(rel, SnapshotNow, 0, NULL);
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
while (tuple) {
group_kind = get_group_kind(tuple);
if (group_kind == PGXC_GROUPKIND_LCGROUP) {
exist = true;
break;
}
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
}
tableam_scan_end(scan);
return exist;
}
static int cmp_node(Oid p1, Oid p2)
{
NameData node1 = {{0}};
NameData node2 = {{0}};
return strcmp(get_pgxc_nodename(p1, &node1), get_pgxc_nodename(p2, &node2));
}
* exclude_nodes
*
* exclude oids in excluded from oids in nodes.
* oids in nodes and excluded must be in ascending sort.
*
* for example:
* nodes is [1,2,3,4,5,6,7], excluded is [2,4,5,7]
* we should get the result: [1,3,6]
* if some oids in excluded is not in nodes, for example:
* nodes is [1,2,3,4,5,6,7], excluded is [3,9]
* we should get the result: [1,2,4,5,6,7]
* If excluded vector includes nodes vector, return NULL;
*/
static oidvector* exclude_nodes(oidvector* nodes, oidvector* excluded)
{
int i, j, k;
oidvector* result = NULL;
if (nodes == NULL)
return NULL;
if (excluded == NULL)
return buildoidvector(nodes->values, nodes->dim1);
Oid* oid_array = (Oid*)palloc(nodes->dim1 * sizeof(Oid));
for (i = 0, k = 0; i < nodes->dim1; i++) {
for (j = 0; j < excluded->dim1; j++) {
if (nodes->values[i] == excluded->values[j])
break;
}
if (j == excluded->dim1) {
oid_array[k++] = nodes->values[i];
}
}
result = buildoidvector(oid_array, k);
pfree(oid_array);
return result;
}
* oidvector_eq - compare oidvector.
*
* If oid1 == oid2, return true, else return false;
*/
static bool oidvector_eq(oidvector* oid1, oidvector* oid2)
{
int i;
if (oid1->dim1 != oid2->dim1)
return false;
for (i = 0; i < oid1->dim1; i++) {
if (oid1->values[i] != oid2->values[i])
return false;
}
return true;
}
* oidvector_add - Add one oidvector to another.
*
* Caller need to free the return oidvector.
*/
static oidvector* oidvector_add(oidvector* nodes, oidvector* added)
{
oidvector* result = NULL;
errno_t rc = 0;
int count = nodes->dim1 + added->dim1;
Oid* oids = (Oid*)palloc(count * sizeof(Oid));
if (nodes->dim1 > 0) {
rc = memcpy_s(oids, count * sizeof(Oid), nodes->values, nodes->dim1 * sizeof(Oid));
securec_check(rc, "\0", "\0");
}
if (added->dim1 > 0 && nodes->dim1 >= 0) {
rc = memcpy_s(&oids[nodes->dim1], added->dim1 * sizeof(Oid), added->values, added->dim1 * sizeof(Oid));
securec_check(rc, "\0", "\0");
}
oids = SortRelationDistributionNodes(oids, count);
result = buildoidvector(oids, count);
pfree(oids);
return result;
}
* oidvector_remove - Remove one oidvector from another.
*
* Caller need to free the return oidvector.
*/
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Warray-bounds"
static oidvector* oidvector_remove(oidvector* nodes, oidvector* removed)
{
oidvector* result = NULL;
int oidoffset = 0;
Oid* oids = (Oid*)palloc(nodes->dim1 * sizeof(Oid));
for (int i = 0; i < nodes->dim1; i++) {
for (int j = 0; j < removed->dim1; j++) {
if (nodes->values[i] == removed->values[j]) {
nodes->values[i] = InvalidOid;
break;
}
}
if (InvalidOid != nodes->values[i])
oids[oidoffset++] = nodes->values[i];
}
result = buildoidvector(oids, oidoffset);
pfree(oids);
return result;
}
#pragma GCC diagnostic pop
static ExecNodes* create_exec_nodes(oidvector* gmember)
{
ExecNodes* exec_nodes = makeNode(ExecNodes);
exec_nodes->nodeList = GetNodeGroupNodeList(gmember->values, gmember->dim1);
return exec_nodes;
}
static void delete_exec_nodes(ExecNodes* exec_nodes)
{
if (exec_nodes != NULL) {
list_free(exec_nodes->nodeList);
pfree(exec_nodes);
}
}
static void exec_in_datanode(const char* group_name, ExecNodes* exec_nodes, bool create)
{
int rc;
char create_stmt[CHAR_BUF_SIZE];
if (create) {
rc = snprintf_s(create_stmt,
CHAR_BUF_SIZE,
CHAR_BUF_SIZE - 1,
"CREATE NODE GROUP \"%s\" WITH (localhost) VCGROUP;",
group_name);
} else {
rc = snprintf_s(create_stmt, CHAR_BUF_SIZE, CHAR_BUF_SIZE - 1, "DROP NODE GROUP \"%s\";", group_name);
}
securec_check_ss(rc, "\0", "\0");
ExecUtilityStmtOnNodes(create_stmt, exec_nodes, false, false, EXEC_ON_DATANODES, false);
}
static void drop_logic_cluster_in_datanode(const char* group_name, oidvector* gmember)
{
ExecNodes* exec_nodes = create_exec_nodes(gmember);
exec_in_datanode(group_name, exec_nodes, false);
delete_exec_nodes(exec_nodes);
}
static void alter_role_in_datanode(const char* rolname, const char* group_name, ExecNodes* exec_nodes)
{
int rc;
char alter_stmt[CHAR_BUF_SIZE];
rc = snprintf_s(
alter_stmt, CHAR_BUF_SIZE, CHAR_BUF_SIZE - 1, "ALTER ROLE \"%s\" NODE GROUP \"%s\";", rolname, group_name);
securec_check_ss(rc, "\0", "\0");
ExecUtilityStmtOnNodes(alter_stmt, exec_nodes, false, false, EXEC_ON_DATANODES, false);
}
* PgxcCreateVCGroup():
*
* Create logic cluster group
*/
static void PgxcCreateVCGroup(const char* group_name, char group_kind, oidvector* gmember, text* bucket_str)
{
Relation rel;
HeapTuple tuple;
bool nulls[Natts_pgxc_group];
Datum values[Natts_pgxc_group];
errno_t rc;
rel = heap_open(PgxcGroupRelationId, RowExclusiveLock);
rc = memset_s(values, sizeof(values), 0, sizeof(values));
securec_check(rc, "\0", "\0");
rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
securec_check(rc, "\0", "\0");
values[Anum_pgxc_group_name - 1] = DirectFunctionCall1(namein, CStringGetDatum(group_name));
values[Anum_pgxc_group_in_redistribution - 1] = CharGetDatum('n');
values[Anum_pgxc_group_kind - 1] = CharGetDatum(group_kind);
values[Anum_pgxc_group_is_installation - 1] = BoolGetDatum(false);
values[Anum_pgxc_group_members - 1] = PointerGetDatum(gmember);
if (bucket_str != NULL) {
values[Anum_pgxc_group_buckets - 1] = PointerGetDatum(bucket_str);
} else {
nulls[Anum_pgxc_group_buckets - 1] = true;
}
nulls[Anum_pgxc_group_group_acl - 1] = true;
tuple = heap_form_tuple(rel->rd_att, values, nulls);
(void)simple_heap_insert(rel, tuple);
CatalogUpdateIndexes(rel, tuple);
heap_freetuple(tuple);
heap_close(rel, RowExclusiveLock);
CommandCounterIncrement();
}
* PgxcChangeGroupName():
*
* Change node group name.
*/
static void PgxcChangeGroupName(Oid group_oid, const char* group_name)
{
HeapTuple tup;
HeapTuple newtuple;
Relation rel;
bool nulls[Natts_pgxc_group];
Datum values[Natts_pgxc_group];
bool replaces[Natts_pgxc_group];
errno_t rc;
rel = heap_open(PgxcGroupRelationId, RowExclusiveLock);
tup = SearchSysCache(PGXCGROUPOID, ObjectIdGetDatum(group_oid), 0, 0, 0);
if (!HeapTupleIsValid(tup))
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("PGXC Group %u: group not defined", group_oid)));
rc = memset_s(values, sizeof(values), 0, sizeof(values));
securec_check(rc, "\0", "\0");
rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
securec_check(rc, "\0", "\0");
rc = memset_s(replaces, sizeof(replaces), false, sizeof(replaces));
securec_check(rc, "\0", "\0");
replaces[Anum_pgxc_group_name - 1] = true;
values[Anum_pgxc_group_name - 1] = DirectFunctionCall1(namein, CStringGetDatum(group_name));
newtuple = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, replaces);
simple_heap_update(rel, &newtuple->t_self, newtuple);
CatalogUpdateIndexes(rel, newtuple);
heap_freetuple(newtuple);
ReleaseSysCache(tup);
heap_close(rel, RowExclusiveLock);
CommandCounterIncrement();
}
* PgxcChangeGroupMember():
*
* Change node group node members.
*/
static void PgxcChangeGroupMember(Oid group_oid, oidvector* gmember)
{
HeapTuple tup;
HeapTuple newtuple;
Relation rel;
bool nulls[Natts_pgxc_group];
Datum values[Natts_pgxc_group];
bool replaces[Natts_pgxc_group];
errno_t rc;
rel = heap_open(PgxcGroupRelationId, RowExclusiveLock);
tup = SearchSysCache(PGXCGROUPOID, ObjectIdGetDatum(group_oid), 0, 0, 0);
if (!HeapTupleIsValid(tup))
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("PGXC Group %d: group not defined", group_oid)));
rc = memset_s(values, sizeof(values), 0, sizeof(values));
securec_check(rc, "\0", "\0");
rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
securec_check(rc, "\0", "\0");
rc = memset_s(replaces, sizeof(replaces), false, sizeof(replaces));
securec_check(rc, "\0", "\0");
replaces[Anum_pgxc_group_members - 1] = true;
nulls[Anum_pgxc_group_members - 1] = false;
values[Anum_pgxc_group_members - 1] = PointerGetDatum(gmember);
if (get_group_kind(tup) == PGXC_GROUPKIND_INSTALLATION) {
int i;
uint2* bucket_ptr = NULL;
char* bucket_str = NULL;
bucket_ptr = (uint2*)palloc0(BUCKETDATALEN * sizeof(uint2));
for (i = 0; i < BUCKETDATALEN; i++) {
bucket_ptr[i] = i % gmember->dim1;
}
bucket_str = get_bucket_string(bucket_ptr);
replaces[Anum_pgxc_group_buckets - 1] = true;
values[Anum_pgxc_group_buckets - 1] = DirectFunctionCall1(textin, CStringGetDatum(bucket_str));
pfree(bucket_ptr);
pfree(bucket_str);
}
newtuple = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, replaces);
simple_heap_update(rel, &newtuple->t_self, newtuple);
CatalogUpdateIndexes(rel, newtuple);
heap_freetuple(newtuple);
ReleaseSysCache(tup);
heap_close(rel, RowExclusiveLock);
CommandCounterIncrement();
}
* PgxcChangeRedistribution():
*
* Change in_redistribution filed of pgxc_group to 'y'.
*/
void PgxcChangeRedistribution(Oid group_oid, char in_redistribution)
{
HeapTuple tup;
HeapTuple newtuple;
Relation rel;
bool nulls[Natts_pgxc_group];
Datum values[Natts_pgxc_group];
bool replaces[Natts_pgxc_group];
errno_t rc;
rel = heap_open(PgxcGroupRelationId, RowExclusiveLock);
tup = SearchSysCache(PGXCGROUPOID, ObjectIdGetDatum(group_oid), 0, 0, 0);
if (!HeapTupleIsValid(tup))
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("PGXC Group %u: group not defined", group_oid)));
rc = memset_s(values, sizeof(values), 0, sizeof(values));
securec_check(rc, "\0", "\0");
rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
securec_check(rc, "\0", "\0");
rc = memset_s(replaces, sizeof(replaces), false, sizeof(replaces));
securec_check(rc, "\0", "\0");
replaces[Anum_pgxc_group_in_redistribution - 1] = true;
nulls[Anum_pgxc_group_in_redistribution - 1] = false;
values[Anum_pgxc_group_in_redistribution - 1] = CharGetDatum(in_redistribution);
newtuple = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, replaces);
simple_heap_update(rel, &newtuple->t_self, newtuple);
CatalogUpdateIndexes(rel, newtuple);
heap_freetuple(newtuple);
ReleaseSysCache(tup);
heap_close(rel, RowExclusiveLock);
CommandCounterIncrement();
}
* get_groupid_from_tuple():
*
* Get node group oid from heap tuple.
*/
static Oid get_groupid_from_tuple(HeapTuple tup)
{
bool isNull = true;
Datum ng_datum;
ng_datum = SysCacheGetAttr(AUTHOID, tup, Anum_pg_authid_rolnodegroup, &isNull);
return isNull ? InvalidOid : DatumGetObjectId(ng_datum);
}
* PgxcGroupGetFirstRoleId():
*
* Scan pg_authid and get the first roleid which rolnodegroup is group_oid.
*/
static Oid PgxcGetFirstRoleId(Oid group_oid)
{
HeapTuple tuple;
Relation auth_rel;
TableScanDesc scan;
Oid roleid;
Form_pg_authid auth;
roleid = InvalidOid;
auth_rel = heap_open(AuthIdRelationId, AccessShareLock);
scan = tableam_scan_begin(auth_rel, SnapshotNow, 0, NULL);
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
while (tuple) {
auth = (Form_pg_authid)GETSTRUCT(tuple);
if (group_oid == get_groupid_from_tuple(tuple)) {
roleid = HeapTupleGetOid(tuple);
break;
}
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
}
tableam_scan_end(scan);
heap_close(auth_rel, AccessShareLock);
return roleid;
}
* PgxcGetGroupRoleList
*
* Get role list that attached to same logic cluster.
*/
static List* PgxcGetGroupRoleList()
{
HeapTuple tuple;
Relation auth_rel;
TableScanDesc scan;
Oid roleid;
Form_pg_authid auth;
List* roles_list = NULL;
auth_rel = heap_open(AuthIdRelationId, AccessShareLock);
scan = tableam_scan_begin(auth_rel, SnapshotNow, 0, NULL);
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
while (tuple) {
auth = (Form_pg_authid)GETSTRUCT(tuple);
if (InvalidOid != get_groupid_from_tuple(tuple)) {
roleid = HeapTupleGetOid(tuple);
roles_list = list_append_unique_oid(roles_list, roleid);
}
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
}
tableam_scan_end(scan);
heap_close(auth_rel, AccessShareLock);
return roles_list;
}
* PgxcGetRelationRoleList
*
* Get role list that attached to same logic cluster and create relations in the logic cluster.
* We get the role list by querying shdepend system table. We don't query pgxc_class
* because we can't visit pgxc_class tables in other databases.
*/
static List* PgxcGetRelationRoleList()
{
HeapTuple tuple;
Relation rel;
TableScanDesc scan;
Form_pg_shdepend shdepend;
Form_pg_authid auth;
Oid roleid;
Oid rpoid;
Datum respoolDatum;
bool isNull = false;
List* roles_list = NULL;
rel = heap_open(SharedDependRelationId, AccessShareLock);
scan = tableam_scan_begin(rel, SnapshotNow, 0, NULL);
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
while (tuple) {
shdepend = (Form_pg_shdepend)GETSTRUCT(tuple);
if ((shdepend->classid != RelationRelationId && shdepend->classid != NamespaceRelationId &&
shdepend->classid != ProcedureRelationId) ||
#ifdef ENABLE_MOT
(shdepend->deptype != SHARED_DEPENDENCY_OWNER &&
shdepend->deptype != SHARED_DEPENDENCY_ACL &&
shdepend->deptype != SHARED_DEPENDENCY_MOT_TABLE)) {
#else
(shdepend->deptype != SHARED_DEPENDENCY_OWNER && shdepend->deptype != SHARED_DEPENDENCY_ACL)) {
#endif
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
continue;
}
roles_list = list_append_unique_oid(roles_list, shdepend->refobjid);
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
}
tableam_scan_end(scan);
heap_close(rel, AccessShareLock);
rel = heap_open(AuthIdRelationId, AccessShareLock);
scan = tableam_scan_begin(rel, SnapshotNow, 0, NULL);
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
while (tuple) {
rpoid = InvalidOid;
isNull = true;
auth = (Form_pg_authid)GETSTRUCT(tuple);
respoolDatum = heap_getattr(tuple, Anum_pg_authid_rolrespool, RelationGetDescr(rel), &isNull);
if (!isNull) {
rpoid = get_resource_pool_oid(DatumGetPointer(respoolDatum));
}
if (OidIsValid(rpoid) && rpoid != DEFAULT_POOL_OID) {
roleid = HeapTupleGetOid(tuple);
roles_list = list_append_unique_oid(roles_list, roleid);
}
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
}
tableam_scan_end(scan);
heap_close(rel, AccessShareLock);
return roles_list;
}
* PgxcChangeUserGroupOid():
*
* Change common users groupOid or installation group oid.
* if group_name is NULL, the users rolnodegroup is changed to empty.
* For admin users(superuser,systemadmin,securityadmin), don't change.
* The function is only executed in CN.
*/
static void PgxcChangeUserGroupOid(List* role_list, const char* group_name, ExecNodes* exec_nodes)
{
HeapTuple rtup;
HeapTuple newtuple;
Relation auth_rel;
Oid roleid;
ListCell* cell = NULL;
Form_pg_authid auth;
Oid nodegroupid;
bool nulls[Natts_pg_authid];
bool replaces[Natts_pg_authid];
Datum values[Natts_pg_authid];
errno_t rc;
Oid group_oid = InvalidOid;
if (role_list == NULL)
return;
if (group_name != NULL)
group_oid = get_pgxc_groupoid(group_name);
auth_rel = heap_open(AuthIdRelationId, RowExclusiveLock);
foreach (cell, role_list) {
roleid = lfirst_oid(cell);
rtup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(roleid));
if (!HeapTupleIsValid(rtup)) {
continue;
}
auth = (Form_pg_authid)GETSTRUCT(rtup);
if (auth->rolsuper || auth->rolsystemadmin || auth->rolcreaterole) {
ReleaseSysCache(rtup);
continue;
}
nodegroupid = get_groupid_from_tuple(rtup);
if (group_oid == nodegroupid) {
ReleaseSysCache(rtup);
continue;
}
if (exec_nodes != NULL) {
if (group_name == NULL) {
alter_role_in_datanode(NameStr(auth->rolname), CNG_OPTION_INSTALLATION, exec_nodes);
} else {
alter_role_in_datanode(NameStr(auth->rolname), group_name, exec_nodes);
}
}
if (OidIsValid(nodegroupid)) {
grantNodeGroupToRole(nodegroupid, roleid, ACL_ALL_RIGHTS_NODEGROUP, false);
}
rc = memset_s(values, sizeof(values), 0, sizeof(values));
securec_check(rc, "\0", "\0");
rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
securec_check(rc, "\0", "\0");
rc = memset_s(replaces, sizeof(replaces), false, sizeof(replaces));
securec_check(rc, "\0", "\0");
replaces[Anum_pg_authid_rolnodegroup - 1] = true;
values[Anum_pg_authid_rolnodegroup - 1] = ObjectIdGetDatum(group_oid);
nulls[Anum_pg_authid_rolnodegroup - 1] = (group_oid == InvalidOid);
if (!OidIsValid(group_oid)) {
replaces[Anum_pg_authid_rolkind - 1] = true;
values[Anum_pg_authid_rolkind - 1] = CharGetDatum(ROLKIND_NORMAL);
}
newtuple = heap_modify_tuple(rtup, RelationGetDescr(auth_rel), values, nulls, replaces);
simple_heap_update(auth_rel, &newtuple->t_self, newtuple);
CatalogUpdateIndexes(auth_rel, newtuple);
ReleaseSysCache(rtup);
heap_freetuple(newtuple);
CommandCounterIncrement();
if (OidIsValid(group_oid)) {
grantNodeGroupToRole(group_oid, roleid, ACL_ALL_RIGHTS_NODEGROUP, true);
}
}
heap_close(auth_rel, RowExclusiveLock);
}
* PgxcGetRedisNodes():
*
* Get nodes needed to released to elastic_group when delete the node group that is redistributed.
* The function is used for deleting node group. We need to check the source redistributed group
* and the destination redistributed group.
* For example:
* source group nodes: [1 3 5 8 9]
* destination group nodes: [1 3 5 8 9 10 11]
* if redist_kind is source group, we return empty oids;
* if redist_kind is destination group, we return [10 11];
* another example:
* source group nodes: [1 3 4 6 7]
* destination group nodes: [1 2 6 8 9 10 11]
* if redist_kind is source group, we return [3 4 7];
* if redist_kind is destination group, we return [2 8 9 10 11];
*/
oidvector* PgxcGetRedisNodes(Relation rel, char redist_kind)
{
HeapTuple tuple;
TableScanDesc scan;
char in_redistribution;
int self_index = -1;
oidvector* gmember = NULL;
oidvector* node_oids[2] = {NULL, NULL};
oidvector* result = NULL;
node_oids[0] = NULL;
node_oids[1] = NULL;
scan = tableam_scan_begin(rel, SnapshotNow, 0, NULL);
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
while (tuple) {
bool need_free = false;
gmember = get_group_member_ref(tuple, &need_free);
in_redistribution = ((Form_pgxc_group)GETSTRUCT(tuple))->in_redistribution;
if (in_redistribution == PGXC_REDISTRIBUTION_DST_GROUP && node_oids[0] == NULL) {
node_oids[0] = buildoidvector(gmember->values, gmember->dim1);
if (in_redistribution == redist_kind) {
self_index = 0;
}
} else if (in_redistribution == PGXC_REDISTRIBUTION_SRC_GROUP && node_oids[1] == NULL) {
node_oids[1] = buildoidvector(gmember->values, gmember->dim1);
if (in_redistribution == redist_kind) {
self_index = 1;
}
}
if (need_free)
pfree_ext(gmember);
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
}
tableam_scan_end(scan);
if (self_index == 1) {
oidvector* tmp = node_oids[1];
node_oids[1] = node_oids[0];
node_oids[0] = tmp;
}
if (self_index < 0) {
result = NULL;
} else if (node_oids[1] == NULL) {
return node_oids[0];
} else {
int i, j, k;
int loop_num;
Oid* oid_array = NULL;
oid_array = (Oid*)palloc(node_oids[0]->dim1 * sizeof(Oid));
loop_num = Min(node_oids[0]->dim1, node_oids[1]->dim1);
for (i = 0, j = 0, k = 0; i < loop_num;) {
if (node_oids[1]->values[j] == node_oids[0]->values[i]) {
i++;
j++;
} else if (cmp_node(node_oids[1]->values[j], node_oids[0]->values[i]) > 0) {
oid_array[k++] = node_oids[0]->values[i];
i++;
} else {
j++;
}
}
for (; i < node_oids[0]->dim1; i++) {
oid_array[k++] = node_oids[0]->values[i];
}
if (k > 0) {
result = buildoidvector(oid_array, k);
}
pfree(oid_array);
}
if (node_oids[0] != NULL)
pfree(node_oids[0]);
if (node_oids[1] != NULL)
pfree(node_oids[1]);
return result;
}
* PgxcUpdateRedistSrcGroup():
*
* Set in_redistribution for redistributed source node group.
* We shoule set in_redistribution to 'n'
* We should call the function in final phase of expanding logic cluster.
*/
void PgxcUpdateRedistSrcGroup(Relation rel, oidvector* gmember, text* bucket_str, char* group_name)
{
HeapTuple tuple;
TableScanDesc scan;
bool nulls[Natts_pgxc_group];
Datum values[Natts_pgxc_group];
bool replaces[Natts_pgxc_group];
HeapTuple newtuple;
char in_redistribution;
errno_t rc;
scan = tableam_scan_begin(rel, SnapshotNow, 0, NULL);
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
while (tuple) {
in_redistribution = ((Form_pgxc_group)GETSTRUCT(tuple))->in_redistribution;
if (in_redistribution == PGXC_REDISTRIBUTION_SRC_GROUP) {
break;
}
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
}
if (tuple == NULL) {
tableam_scan_end(scan);
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("Can not find redistributed source group with in_redistribution 'y'.")));
}
rc = memset_s(values, sizeof(values), 0, sizeof(values));
securec_check(rc, "\0", "\0");
rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
securec_check(rc, "\0", "\0");
rc = memset_s(replaces, sizeof(replaces), false, sizeof(replaces));
securec_check(rc, "\0", "\0");
replaces[Anum_pgxc_group_in_redistribution - 1] = true;
values[Anum_pgxc_group_in_redistribution - 1] = CharGetDatum(PGXC_NON_REDISTRIBUTION_GROUP);
replaces[Anum_pgxc_group_members - 1] = true;
values[Anum_pgxc_group_members - 1] = PointerGetDatum(gmember);
if (bucket_str != NULL) {
replaces[Anum_pgxc_group_buckets - 1] = true;
values[Anum_pgxc_group_buckets - 1] = PointerGetDatum(bucket_str);
}
newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), values, nulls, replaces);
simple_heap_update(rel, &newtuple->t_self, newtuple);
CatalogUpdateIndexes(rel, newtuple);
heap_freetuple(newtuple);
tableam_scan_end(scan);
}
* PgxcGroupAddNode():
*
* add node group node members.
*/
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Warray-bounds"
void PgxcGroupAddNode(Oid group_oid, Oid nodeid)
{
HeapTuple tup;
oidvector* gmember_ref = NULL;
oidvector* gmember = NULL;
oidvector new_node;
bool need_free = false;
tup = SearchSysCache(PGXCGROUPOID, ObjectIdGetDatum(group_oid), 0, 0, 0);
if (!HeapTupleIsValid(tup))
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("PGXC Group %u: group not defined", group_oid)));
gmember_ref = get_group_member_ref(tup, &need_free);
if (gmember_ref->dim1 == 0) {
gmember = buildoidvector(&nodeid, 1);
} else {
new_node.dim1 = 1;
new_node.ndim = 1;
new_node.values[0] = nodeid;
gmember = oidvector_add(gmember_ref, &new_node);
}
PgxcChangeGroupMember(group_oid, gmember);
ReleaseSysCache(tup);
if (need_free)
pfree_ext(gmember_ref);
pfree_ext(gmember);
}
* PgxcGroupRemoveNode():
*
* Remove node members from node group .
*/
void PgxcGroupRemoveNode(Oid group_oid, Oid nodeid)
{
HeapTuple tup;
oidvector* gmember_ref = NULL;
oidvector* gmember = NULL;
oidvector deleted_node;
bool need_free = false;
tup = SearchSysCache(PGXCGROUPOID, ObjectIdGetDatum(group_oid), 0, 0, 0);
if (!HeapTupleIsValid(tup))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("PGXC Group %s: group not defined", get_pgxc_groupname(group_oid))));
gmember_ref = get_group_member_ref(tup, &need_free);
for (int i = 0; i < gmember_ref->dim1; i++) {
if (nodeid == gmember_ref->values[i]) {
deleted_node.dim1 = 1;
deleted_node.ndim = 1;
deleted_node.values[0] = nodeid;
gmember = oidvector_remove(gmember_ref, &deleted_node);
PgxcChangeGroupMember(group_oid, gmember);
pfree_ext(gmember);
break;
}
}
if (need_free)
pfree_ext(gmember_ref);
ReleaseSysCache(tup);
}
#pragma GCC diagnostic pop
* IsNodeInLogicCluster
*
* Check if node in logic cluster
*/
bool IsNodeInLogicCluster(Oid* oid_array, int count, Oid excluded)
{
Relation relation;
HeapTuple tuple;
TableScanDesc scan;
bool find = false;
Oid group_oid;
oidvector* gmember = NULL;
List* oid_list = NULL;
int i;
relation = heap_open(PgxcGroupRelationId, AccessShareLock);
scan = tableam_scan_begin(relation, SnapshotNow, 0, NULL);
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
while (tuple) {
group_oid = HeapTupleGetOid(tuple);
if (group_oid != excluded && PGXC_GROUPKIND_LCGROUP == get_group_kind(tuple)) {
bool need_free = false;
gmember = get_group_member_ref(tuple, &need_free);
for (i = 0; i < gmember->dim1; i++) {
oid_list = lappend_oid(oid_list, gmember->values[i]);
}
if (need_free)
pfree_ext(gmember);
}
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
}
tableam_scan_end(scan);
heap_close(relation, AccessShareLock);
find = false;
for (i = 0; i < count; i++) {
ListCell* cell = NULL;
foreach (cell, oid_list) {
if (lfirst_oid(cell) == oid_array[i]) {
find = true;
break;
}
}
if (find)
break;
}
list_free(oid_list);
return find;
}
static Acl* PgxcGroupGetAcl(Oid group_oid, Relation rel)
{
Acl* acl = NULL;
bool isNull = false;
HeapTuple tuple = SearchSysCache1(PGXCGROUPOID, ObjectIdGetDatum(group_oid));
if (!HeapTupleIsValid(tuple))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("cache lookup failed for node group with oid %u", group_oid)));
Datum aclDatum = heap_getattr(tuple, Anum_pgxc_group_group_acl, RelationGetDescr(rel), &isNull);
if (!isNull) {
acl = DatumGetAclPCopy(aclDatum);
}
ReleaseSysCache(tuple);
return acl;
}
void PgxcCheckOid(Oid noid, char* node_name)
{
if (!OidIsValid(noid)) {
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("PGXC Node %s: object not defined", node_name)));
}
if (get_pgxc_nodetype(noid) != PGXC_NODE_DATANODE) {
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), errmsg("PGXC node %s: only Datanodes can be group members", node_name)));
}
}
* PgxcGroupCreate
*
* Create a PGXC node group
*/
void PgxcGroupCreate(CreateGroupStmt* stmt)
{
const char* group_name = stmt->group_name;
List* nodes = stmt->nodes;
oidvector* nodes_array = NULL;
Oid* inTypes = NULL;
Relation rel;
HeapTuple tup;
bool nulls[Natts_pgxc_group];
Datum values[Natts_pgxc_group];
int member_count = list_length(stmt->nodes);
ListCell* lc = NULL;
int i = 0;
uint2* bucket_ptr = NULL;
char* bucket_str = NULL;
TableScanDesc scan;
HeapTuple tuple;
char group_kind = 'i';
bool is_installation = false;
oidvector* elastic_nodes = NULL;
oidvector* src_group_nodes = NULL;
ExecNodes* exec_nodes = NULL;
bool dummy_nodegroup = false;
Oid src_group_oid = InvalidOid;
Acl* group_acl = NULL;
int len = BUCKETDATALEN * sizeof(uint2);
errno_t rc = 0;
List* tmp_list = NIL;
if (!have_createdb_privilege())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be system admin or createdb role to create cluster node groups")));
if (group_name &&
(!ng_is_valid_group_name(group_name) || pg_strcasecmp(VNG_OPTION_ELASTIC_GROUP, group_name) == 0)) {
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), errmsg("NodeGroup name %s can not be preserved group name", group_name)));
}
if (OidIsValid(get_pgxc_groupoid(group_name)))
ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("PGXC Group %s: group already defined", group_name)));
if (stmt->vcgroup && u_sess->attr.attr_storage.enable_data_replicate &&
g_instance.attr.attr_storage.replication_type == RT_WITH_MULTI_STANDBY)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("Don't support logic cluster in multi_standby mode.")));
if (isRestoreMode && list_length(nodes) == 1) {
char* str = strVal(lfirst(list_head(nodes)));
if (strcmp(str, "localhost") == 0) {
dummy_nodegroup = true;
}
}
if (dummy_nodegroup || (IS_PGXC_DATANODE && stmt->vcgroup && !isRestoreMode)) {
rel = heap_open(PgxcGroupRelationId, RowExclusiveLock);
rc = memset_s(values, sizeof(values), 0, sizeof(values));
securec_check(rc, "\0", "\0");
rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
securec_check(rc, "\0", "\0");
values[Anum_pgxc_group_name - 1] = DirectFunctionCall1(namein, CStringGetDatum(group_name));
nulls[Anum_pgxc_group_buckets - 1] = true;
nulls[Anum_pgxc_group_members - 1] = true;
nulls[Anum_pgxc_group_group_acl - 1] = true;
values[Anum_pgxc_group_in_redistribution - 1] = CharGetDatum(PGXC_NON_REDISTRIBUTION_GROUP);
values[Anum_pgxc_group_is_installation - 1] = BoolGetDatum(false);
values[Anum_pgxc_group_kind - 1] = CharGetDatum(PGXC_GROUPKIND_LCGROUP);
tup = heap_form_tuple(rel->rd_att, values, nulls);
(void)simple_heap_insert(rel, tup);
CatalogUpdateIndexes(rel, tup);
heap_freetuple(tup);
heap_close(rel, RowExclusiveLock);
CommandCounterIncrement();
CreateNodeGroupInfoInHTAB(group_name);
return;
}
inTypes = (Oid*)palloc(member_count * sizeof(Oid));
foreach (lc, nodes) {
char* node_name = strVal(lfirst(lc));
Oid noid = get_pgxc_nodeoid(node_name);
PgxcCheckOid(noid, node_name);
if (list_member_oid(tmp_list, noid)) {
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("PGXC node %s is already specified: duplicate Datanodes is not allowed in node-list",
node_name)));
} else
tmp_list = lappend_oid(tmp_list, noid);
inTypes[i] = noid;
i++;
}
inTypes = SortRelationDistributionNodes(inTypes, member_count);
nodes_array = buildoidvector(inTypes, member_count);
for (i = 0; i < Natts_pgxc_group; i++) {
nulls[i] = false;
values[i] = (Datum)0;
}
bucket_ptr = (uint2*)palloc0(len);
rel = heap_open(PgxcGroupRelationId, RowExclusiveLock);
i = 0;
scan = tableam_scan_begin(rel, SnapshotNow, 0, NULL);
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
* No nodegroup found, it is the first node group creation, so generate bucketmap
* in default mode
*/
if (tuple == NULL) {
if (!isRestoreMode) {
* In case of isRestoreMode mode, we don't specified first created node group as
* installation group as it aiming to restore the original node group status and
* is_installation is update later (in pg_dumps).
*/
is_installation = true;
if (stmt->vcgroup) {
tableam_scan_end(scan);
ereport(ERROR,
(errcode(ERRCODE_SYSTEM_ERROR),
errmsg("Can not create logic cluster group, create installation group first.")));
}
}
generateConsistentHashBucketmap(stmt, nodes_array, rel, tuple, bucket_ptr, BUCKETMAP_MODE_DEFAULT);
} else {
group_kind = get_group_kind(tuple);
* There is at least one nodegroup, so we may in cluster capactiy expansion
* and multi-nodegroup case
*/
if (!u_sess->attr.attr_sql.enable_cluster_resize) {
generateConsistentHashBucketmap(stmt, nodes_array, rel, tuple, bucket_ptr, BUCKETMAP_MODE_DEFAULT);
if (group_kind == 'i') {
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
if (tuple) {
group_kind = get_group_kind(tuple);
}
}
} else {
Datum datum;
bool isNull = false;
* In cluster resizing case, we need find previous node group and
* generate bucketmap from that
*/
while (tuple) {
if (stmt->src_group_name) {
const char* name = NameStr(((Form_pgxc_group)GETSTRUCT(tuple))->group_name);
if (strcmp(stmt->src_group_name, name) == 0)
break;
} else {
datum = heap_getattr(tuple, Anum_pgxc_group_is_installation, RelationGetDescr(rel), &isNull);
if (DatumGetBool(datum)) {
* Break as we already find installation node group so generate bucketmap
* for new installation group base on it
*/
break;
}
}
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
if (tuple && group_kind == 'i') {
group_kind = get_group_kind(tuple);
}
}
if (!tuple) {
tableam_scan_end(scan);
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("installation node group or source node group not found")));
}
generateConsistentHashBucketmap(stmt, nodes_array, rel, tuple, bucket_ptr, BUCKETMAP_MODE_REMAP);
}
if ((group_kind == 'n' && stmt->vcgroup) || ((group_kind == 'v' || group_kind == 'e') && !stmt->vcgroup)) {
tableam_scan_end(scan);
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Do not support logic cluster in coexistence with common node group!")));
}
}
if (stmt->src_group_name) {
if (!u_sess->attr.attr_common.xc_maintenance_mode) {
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("CREATE NODE GROUP ... DISTRIBUTE FROM can only be executed in maintenance mode.")));
}
if (group_name == NULL) {
ereport(ERROR, (errcode(ERRCODE_UNEXPECTED_NULL_VALUE), errmsg("group_name can not be NULL ")));
}
if (strcmp(stmt->src_group_name, group_name) == 0) {
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("The node group name can not be same as redistribution node group name!")));
}
src_group_oid = get_pgxc_groupoid(stmt->src_group_name, false);
src_group_nodes = get_group_member(src_group_oid);
if (src_group_nodes == NULL) {
ereport(ERROR,
(errcode(ERRCODE_INVALID_OPERATION),
errmsg("The node group %s has not any nodes.", stmt->src_group_name)));
}
PgxcChangeRedistribution(src_group_oid, PGXC_REDISTRIBUTION_SRC_GROUP);
}
if (stmt->vcgroup) {
oidvector* gmember = NULL;
Oid group_oid = InvalidOid;
bool need_free = false;
char search_group_kind =
(group_kind == PGXC_GROUPKIND_INSTALLATION) ? PGXC_GROUPKIND_INSTALLATION : PGXC_GROUPKING_ELASTIC;
* installation group or elastic group. If group_kind is 'i', we
* need to query installation group; Or we need to query elastic group.
*/
tableam_scan_rescan(scan, 0);
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
while (tuple) {
group_kind = get_group_kind(tuple);
if (group_kind == search_group_kind) {
group_oid = HeapTupleGetOid(tuple);
gmember = get_group_member_ref(tuple, &need_free);
break;
}
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
}
if (IsNodeInLogicCluster(nodes_array->values, nodes_array->dim1, src_group_oid)) {
tableam_scan_end(scan);
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES), errmsg("Some nodes have been allocated to logic cluster!")));
}
if (search_group_kind == PGXC_GROUPKIND_INSTALLATION && !CanPgxcGroupRemove(group_oid, true)) {
tableam_scan_end(scan);
ereport(ERROR,
(errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
errmsg("PGXC Group %s: some tables is distributed in installation group,"
"can not create logic cluster.",
group_name)));
}
* a logic cluster used for redistribution.
*/
if (src_group_nodes != NULL) {
oidvector* add_nodes = NULL;
if (oidvector_eq(nodes_array, src_group_nodes)) {
tableam_scan_end(scan);
ereport(
ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("Don't need to expand the node group!")));
}
add_nodes = exclude_nodes(nodes_array, src_group_nodes);
if (add_nodes == NULL) {
elastic_nodes = NULL;
if (gmember != NULL)
elastic_nodes = buildoidvector(gmember->values, gmember->dim1);
} else {
if (gmember != NULL)
elastic_nodes = exclude_nodes(gmember, add_nodes);
pfree(add_nodes);
}
} else {
elastic_nodes = exclude_nodes(gmember, nodes_array);
}
* so we need to create elastic group first.
* If search_group_kind is 'e' means elastic group exists already,
* so we need to remove the nodes from elastic group.
*/
if (search_group_kind == PGXC_GROUPKIND_INSTALLATION) {
PgxcCreateVCGroup(VNG_OPTION_ELASTIC_GROUP, 'e', elastic_nodes, NULL);
CreateNodeGroupInfoInHTAB(VNG_OPTION_ELASTIC_GROUP);
} else {
PgxcChangeGroupMember(group_oid, elastic_nodes);
}
if (in_logic_cluster() && IS_PGXC_COORDINATOR && !IsConnFromCoord()) {
exec_nodes = create_exec_nodes(nodes_array);
}
if (need_free)
pfree_ext(gmember);
}
tableam_scan_end(scan);
rc = memset_s(values, sizeof(values), 0, sizeof(values));
securec_check(rc, "\0", "\0");
rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
securec_check(rc, "\0", "\0");
bucket_str = get_bucket_string(bucket_ptr);
values[Anum_pgxc_group_name - 1] = DirectFunctionCall1(namein, CStringGetDatum(group_name));
values[Anum_pgxc_group_buckets - 1] = DirectFunctionCall1(textin, CStringGetDatum(bucket_str));
values[Anum_pgxc_group_members - 1] = PointerGetDatum(nodes_array);
values[Anum_pgxc_group_in_redistribution - 1] = (src_group_nodes == NULL)
? CharGetDatum(PGXC_NON_REDISTRIBUTION_GROUP)
: CharGetDatum(PGXC_REDISTRIBUTION_DST_GROUP);
values[Anum_pgxc_group_is_installation - 1] = BoolGetDatum(is_installation);
if (is_installation == false) {
if (OidIsValid(src_group_oid)) {
group_acl = PgxcGroupGetAcl(src_group_oid, rel);
}
nulls[Anum_pgxc_group_group_acl - 1] = (group_acl == NULL);
values[Anum_pgxc_group_group_acl - 1] = PointerGetDatum(group_acl);
nulls[Anum_pgxc_group_kind - 1] = false;
values[Anum_pgxc_group_kind - 1] =
stmt->vcgroup ? CharGetDatum(PGXC_GROUPKIND_LCGROUP) : CharGetDatum(PGXC_GROUPKIND_NODEGROUP);
} else {
nulls[Anum_pgxc_group_group_acl - 1] = false;
values[Anum_pgxc_group_group_acl - 1] = PointerGetDatum(getAclNodeGroup());
nulls[Anum_pgxc_group_kind - 1] = false;
values[Anum_pgxc_group_kind - 1] = CharGetDatum(PGXC_GROUPKIND_INSTALLATION);
}
tup = heap_form_tuple(rel->rd_att, values, nulls);
(void)simple_heap_insert(rel, tup);
CatalogUpdateIndexes(rel, tup);
heap_freetuple(tup);
heap_close(rel, RowExclusiveLock);
pfree(bucket_ptr);
pfree(bucket_str);
pfree(inTypes);
if (group_acl != NULL)
pfree(group_acl);
if (elastic_nodes != NULL) {
pfree(elastic_nodes);
}
if (src_group_nodes != NULL) {
pfree(src_group_nodes);
}
if (stmt->vcgroup)
CreateNodeGroupInfoInHTAB(group_name);
if (exec_nodes != NULL) {
exec_in_datanode(group_name, exec_nodes, true);
delete_exec_nodes(exec_nodes);
}
return;
}
* PgxcNodeGroupsSetDefault():
*
* Alter a PGXC node group, set node group to default
*/
static void PgxcGroupSetDefault(const char* group_name)
{
Relation relation;
bool nulls[Natts_pgxc_group];
Datum values[Natts_pgxc_group];
bool replaces[Natts_pgxc_group];
HeapTuple tup;
HeapTuple newtuple;
TableScanDesc scan;
errno_t rc;
Oid group_oid = get_pgxc_groupoid(group_name);
if (!OidIsValid(group_oid))
ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("PGXC Group %s: group not defined", group_name)));
tup = SearchSysCache(PGXCGROUPOID, ObjectIdGetDatum(group_oid), 0, 0, 0);
if (!HeapTupleIsValid(tup))
ereport(ERROR, (errcode(ERRCODE_INVALID_OPERATION), errmsg("PGXC Group %s: group not defined", group_name)));
rc = memset_s(values, sizeof(values), 0, sizeof(values));
securec_check(rc, "\0", "\0");
rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
securec_check(rc, "\0", "\0");
rc = memset_s(replaces, sizeof(replaces), false, sizeof(replaces));
securec_check(rc, "\0", "\0");
replaces[Anum_pgxc_group_in_redistribution - 1] = true;
values[Anum_pgxc_group_in_redistribution - 1] = CharGetDatum('n');
relation = heap_open(PgxcGroupRelationId, RowExclusiveLock);
newtuple = heap_modify_tuple(tup, RelationGetDescr(relation), values, nulls, replaces);
simple_heap_update(relation, &newtuple->t_self, newtuple);
CatalogUpdateIndexes(relation, newtuple);
heap_freetuple(newtuple);
ReleaseSysCache(tup);
scan = tableam_scan_begin(relation, SnapshotNow, 0, NULL);
while ((tup = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection))) {
if (HeapTupleGetOid(tup) != group_oid) {
rc = memset_s(values, sizeof(values), 0, sizeof(values));
securec_check(rc, "\0", "\0");
rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
securec_check(rc, "\0", "\0");
rc = memset_s(replaces, sizeof(replaces), false, sizeof(replaces));
securec_check(rc, "\0", "\0");
replaces[Anum_pgxc_group_in_redistribution - 1] = true;
values[Anum_pgxc_group_in_redistribution - 1] = CharGetDatum('y');
newtuple = heap_modify_tuple(tup, RelationGetDescr(relation), values, nulls, replaces);
simple_heap_update(relation, &newtuple->t_self, newtuple);
CatalogUpdateIndexes(relation, newtuple);
}
}
tableam_scan_end(scan);
heap_close(relation, RowExclusiveLock);
}
* PgxcGroupConvertVCGroup():
*
* Convert installation node group to logic cluster lcname.
*/
static void PgxcGroupConvertVCGroup(const char* group_name, const char* lcname)
{
Oid group_oid;
Relation rel;
HeapTuple tuple;
TableScanDesc scan;
char group_kind;
oidvector* gmember = NULL;
oidvector* gmember_ref = NULL;
text* groupbucket = NULL;
bool isNull = false;
List* role_list = NULL;
oidvector* elasitc_group = NULL;
ExecNodes* exec_nodes = NULL;
bool need_free = false;
rel = heap_open(PgxcGroupRelationId, AccessShareLock);
scan = tableam_scan_begin(rel, SnapshotNow, 0, NULL);
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
if (tuple == NULL) {
ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("PGXC Group %s: group not defined", group_name)));
}
group_kind = get_group_kind(tuple);
if (group_kind != 'i') {
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Do not allow to convert installation group to logic cluster "
"when other node group exists.")));
}
if (pg_strcasecmp(group_name, NameStr(((Form_pgxc_group)GETSTRUCT(tuple))->group_name)) != 0) {
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("PGXC Group %s is not installation group.", group_name)));
}
group_oid = HeapTupleGetOid(tuple);
gmember_ref = get_group_member_ref(tuple, &need_free);
if (gmember_ref->dim1 > 0) {
gmember = buildoidvector(gmember_ref->values, gmember_ref->dim1);
} else {
ereport(ERROR, (errcode(ERRCODE_SYSTEM_ERROR), errmsg("The installation group has no members.")));
}
groupbucket = (text*)heap_getattr(tuple, Anum_pgxc_group_buckets, RelationGetDescr(rel), &isNull);
if (isNull) {
groupbucket = NULL;
}
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
if (tuple) {
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Do not allow to convert installation group to logic cluster"
"when other node group exists.")));
}
tableam_scan_end(scan);
heap_close(rel, AccessShareLock);
role_list = PgxcGetRelationRoleList();
if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) {
exec_nodes = create_exec_nodes(gmember);
}
elasitc_group = buildoidvector(NULL, 0);
PgxcCreateVCGroup(VNG_OPTION_ELASTIC_GROUP, 'e', elasitc_group, NULL);
pfree_ext(elasitc_group);
CreateNodeGroupInfoInHTAB(VNG_OPTION_ELASTIC_GROUP);
PgxcCreateVCGroup(lcname, 'v', gmember, groupbucket);
if (exec_nodes != NULL) {
exec_in_datanode(lcname, exec_nodes, true);
}
if (role_list != NULL) {
PgxcChangeUserGroupOid(role_list, lcname, exec_nodes);
list_free(role_list);
}
if (exec_nodes != NULL) {
delete_exec_nodes(exec_nodes);
}
CreateNodeGroupInfoInHTAB(lcname);
pfree_ext(gmember);
if (need_free)
pfree_ext(gmember_ref);
}
* PgxcGroupSetVCGroup():
*
* Alter installation node group, move all pgxc tables to logic cluster.
*/
static void PgxcGroupSetVCGroup(const char* group_name, const char* install_name)
{
Oid group_oid;
Relation rel;
HeapTuple tuple;
TableScanDesc scan;
char group_kind;
oidvector* gmember = NULL;
oidvector* gmember_ref = NULL;
text* groupbucket = NULL;
bool isNull = false;
List* role_list = NULL;
oidvector* elasitc_group = NULL;
ExecNodes* exec_nodes = NULL;
bool need_free = false;
rel = heap_open(PgxcGroupRelationId, AccessShareLock);
scan = tableam_scan_begin(rel, SnapshotNow, 0, NULL);
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
if (tuple == NULL) {
ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("PGXC Group %s: group not defined", group_name)));
}
group_kind = get_group_kind(tuple);
if (group_kind != 'i') {
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Do not allow to convert installation group to logic cluster "
"when other node group exists.")));
}
if (pg_strcasecmp(group_name, NameStr(((Form_pgxc_group)GETSTRUCT(tuple))->group_name)) != 0) {
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("PGXC Group %s is not installation group.", group_name)));
}
group_oid = HeapTupleGetOid(tuple);
gmember_ref = get_group_member_ref(tuple, &need_free);
if (gmember_ref->dim1 > 0) {
gmember = buildoidvector(gmember_ref->values, gmember_ref->dim1);
} else {
ereport(ERROR, (errcode(ERRCODE_SYSTEM_ERROR), errmsg("The installation group has no members.")));
}
groupbucket = (text*)heap_getattr(tuple, Anum_pgxc_group_buckets, RelationGetDescr(rel), &isNull);
if (isNull) {
groupbucket = NULL;
}
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
if (tuple) {
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Do not allow to convert installation group to logic cluster"
"when other node group exists.")));
}
tableam_scan_end(scan);
heap_close(rel, AccessShareLock);
role_list = PgxcGetRelationRoleList();
if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) {
exec_nodes = create_exec_nodes(gmember);
}
elasitc_group = buildoidvector(NULL, 0);
PgxcCreateVCGroup(VNG_OPTION_ELASTIC_GROUP, 'e', elasitc_group, NULL);
pfree_ext(elasitc_group);
CreateNodeGroupInfoInHTAB(VNG_OPTION_ELASTIC_GROUP);
PgxcChangeGroupName(group_oid, install_name);
PgxcCreateVCGroup(group_name, 'v', gmember, groupbucket);
if (exec_nodes != NULL) {
exec_in_datanode(group_name, exec_nodes, true);
}
if (role_list != NULL) {
PgxcChangeUserGroupOid(role_list, group_name, exec_nodes);
list_free(role_list);
}
if (exec_nodes != NULL) {
delete_exec_nodes(exec_nodes);
}
CreateNodeGroupInfoInHTAB(group_name);
pfree_ext(gmember);
if (need_free)
pfree_ext(gmember_ref);
}
* PgxcGroupSetNotVCGroup():
*
* Alter installation node group, convert all logic clusters to common node group.
*/
static void PgxcGroupSetNotVCGroup(const char* group_name)
{
HeapTuple tuple;
HeapTuple newtuple;
Relation rel;
TableScanDesc scan;
char group_kind;
bool nulls[Natts_pgxc_group];
Datum values[Natts_pgxc_group];
bool replaces[Natts_pgxc_group];
errno_t rc;
rc = memset_s(values, sizeof(values), 0, sizeof(values));
securec_check(rc, "\0", "\0");
rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
securec_check(rc, "\0", "\0");
rc = memset_s(replaces, sizeof(replaces), false, sizeof(replaces));
securec_check(rc, "\0", "\0");
rel = heap_open(PgxcGroupRelationId, RowExclusiveLock);
scan = tableam_scan_begin(rel, SnapshotNow, 0, NULL);
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
while (tuple) {
group_kind = get_group_kind(tuple);
if (group_kind == 'i') {
if (pg_strcasecmp(group_name, NameStr(((Form_pgxc_group)GETSTRUCT(tuple))->group_name)) != 0) {
heap_close(rel, RowExclusiveLock);
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("PGXC Group %s is not installation group.", group_name)));
}
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
continue;
}
if (group_kind == 'n') {
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
continue;
}
if (group_kind == 'e') {
Oid elastic_group_oid = get_pgxc_groupoid(VNG_OPTION_ELASTIC_GROUP, false);
deleteSharedDependencyRecordsFor(PgxcGroupRelationId, elastic_group_oid, 0);
simple_heap_delete(rel, &tuple->t_self);
RemoveNodeGroupInfoInHTAB(VNG_OPTION_ELASTIC_GROUP);
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
continue;
}
RemoveNodeGroupInfoInHTAB(group_name);
if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) {
bool need_free = false;
oidvector* gmember = NULL;
const char* gname = NameStr(((Form_pgxc_group)GETSTRUCT(tuple))->group_name);
gmember = get_group_member_ref(tuple, &need_free);
drop_logic_cluster_in_datanode(gname, gmember);
if (need_free)
pfree_ext(gmember);
}
replaces[Anum_pgxc_group_kind - 1] = true;
values[Anum_pgxc_group_kind - 1] = CharGetDatum('n');
newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), values, nulls, replaces);
simple_heap_update(rel, &newtuple->t_self, newtuple);
CatalogUpdateIndexes(rel, newtuple);
heap_freetuple(newtuple);
tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
}
tableam_scan_end(scan);
heap_close(rel, RowExclusiveLock);
CommandCounterIncrement();
List* role_list = PgxcGetGroupRoleList();
if (role_list != NULL) {
ExecNodes* exec_nodes = makeNode(ExecNodes);
exec_nodes->nodeList = GetAllDataNodes();
PgxcChangeUserGroupOid(role_list, NULL, exec_nodes);
list_free(role_list);
delete_exec_nodes(exec_nodes);
}
}
* PgxcGroupRename():
*
* Rename node group group_name to new_name.
*/
static void PgxcGroupRename(const char* group_name, const char* new_name)
{
Oid group_oid;
if (!ng_is_valid_group_name(new_name) || pg_strcasecmp(VNG_OPTION_ELASTIC_GROUP, new_name) == 0) {
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), errmsg("NodeGroup name %s can not be preserved group name", group_name)));
}
group_oid = GetSysCacheOid1(PGXCGROUPNAME, PointerGetDatum(new_name));
if (OidIsValid(group_oid))
ereport(
ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("PGXC Group Name %s has already been used!", new_name)));
group_oid = get_pgxc_groupoid(group_name);
if (!OidIsValid(group_oid))
ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("PGXC Group %s: group not defined", group_name)));
PgxcChangeGroupName(group_oid, new_name);
if (get_pgxc_groupkind(group_oid) != PGXC_GROUPKIND_LCGROUP)
return;
RemoveNodeGroupInfoInHTAB(group_name);
(void)CreateNodeGroupInfoInHTAB(new_name);
if (IS_PGXC_COORDINATOR && !IsConnFromCoord() && in_logic_cluster()) {
int rc;
int nmembers = 0;
Oid* members = NULL;
oidvector* gmember = NULL;
ExecNodes* exec_nodes = NULL;
char alter_stmt[CHAR_BUF_SIZE];
rc = snprintf_s(alter_stmt,
CHAR_BUF_SIZE,
CHAR_BUF_SIZE - 1,
"ALTER NODE GROUP \"%s\" RENAME TO \"%s\";",
group_name,
new_name);
securec_check_ss(rc, "\0", "\0");
nmembers = get_pgxc_groupmembers(group_oid, &members);
Assert(members != NULL);
gmember = buildoidvector(members, nmembers);
pfree(members);
exec_nodes = create_exec_nodes(gmember);
ExecUtilityStmtOnNodes(alter_stmt, exec_nodes, false, false, EXEC_ON_DATANODES, false);
delete_exec_nodes(exec_nodes);
pfree(gmember);
}
}
* PgxcChangeTableGroupName():
*
* Change the pgroup field of pgxc_class.
*/
static void PgxcChangeTableGroupName(const char* group_name, const char* new_name)
{
HeapTuple tup;
HeapTuple newtuple;
Relation rel;
bool nulls[Natts_pgxc_class];
Datum values[Natts_pgxc_class];
bool replaces[Natts_pgxc_class];
char* pgroup = NULL;
TableScanDesc scan;
Datum datum;
bool isNull = false;
errno_t rc;
if (pg_strcasecmp(group_name, new_name) == 0) {
return;
}
if (!ng_is_valid_group_name(new_name) || pg_strcasecmp(VNG_OPTION_ELASTIC_GROUP, new_name) == 0) {
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), errmsg("NodeGroup name %s can not be preserved group name", group_name)));
}
rc = memset_s(values, sizeof(values), 0, sizeof(values));
securec_check(rc, "\0", "\0");
rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
securec_check(rc, "\0", "\0");
rc = memset_s(replaces, sizeof(replaces), false, sizeof(replaces));
securec_check(rc, "\0", "\0");
nulls[Anum_pgxc_class_option - 1] = true;
replaces[Anum_pgxc_class_pgroup - 1] = true;
values[Anum_pgxc_class_pgroup - 1] = DirectFunctionCall1(namein, CStringGetDatum(new_name));
rel = heap_open(PgxcClassRelationId, RowExclusiveLock);
scan = tableam_scan_begin(rel, SnapshotNow, 0, NULL);
tup = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
while (tup) {
datum = SysCacheGetAttr(PGXCCLASSRELID, tup, Anum_pgxc_class_pgroup, &isNull);
if (isNull) {
tup = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
continue;
}
pgroup = DatumGetCString(datum);
if (pg_strcasecmp(pgroup, group_name) != 0) {
tup = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
continue;
}
newtuple = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, replaces);
simple_heap_update(rel, &tup->t_self, newtuple);
CatalogUpdateIndexes(rel, newtuple);
heap_freetuple(newtuple);
tup = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection);
}
tableam_scan_end(scan);
heap_close(rel, RowExclusiveLock);
CommandCounterIncrement();
}
* PgxcCopyBucketsFromNew():
*
* Copy Buckets From New Node Group
*/
static void PgxcCopyBucketsFromNew(const char* group_name, const char* src_group_name)
{
HeapTuple tup;
HeapTuple srctuple;
Relation relation;
bool nulls[Natts_pgxc_group];
Datum values[Natts_pgxc_group];
bool replaces[Natts_pgxc_group];
errno_t rc;
Oid group_oid = get_pgxc_groupoid(group_name);
Oid src_group_oid = get_pgxc_groupoid(src_group_name);
bool isNull = true;
bool need_free = false;
oidvector* gmember = NULL;
if (IS_PGXC_DATANODE)
return;
relation = heap_open(PgxcGroupRelationId, RowExclusiveLock);
srctuple = SearchSysCache(PGXCGROUPOID, ObjectIdGetDatum(src_group_oid), 0, 0, 0);
if (!HeapTupleIsValid(srctuple))
ereport(
ERROR, (errcode(ERRCODE_INVALID_OPERATION), errmsg("PGXC Group %s: group not defined", src_group_name)));
gmember = get_group_member_ref(srctuple, &need_free);
text* groupbuckests = (text*)heap_getattr(srctuple, Anum_pgxc_group_buckets, RelationGetDescr(relation), &isNull);
if (isNull) {
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("PGXC Group %s' buckets can not be null", src_group_name)));
}
rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
securec_check(rc, "\0", "\0");
rc = memset_s(values, sizeof(values), 0, sizeof(values));
securec_check(rc, "\0", "\0");
rc = memset_s(replaces, sizeof(replaces), false, sizeof(replaces));
securec_check(rc, "\0", "\0");
replaces[Anum_pgxc_group_members - 1] = true;
values[Anum_pgxc_group_members - 1] = PointerGetDatum(gmember);
replaces[Anum_pgxc_group_buckets - 1] = true;
values[Anum_pgxc_group_buckets - 1] = PointerGetDatum(groupbuckests);
tup = SearchSysCache(PGXCGROUPOID, ObjectIdGetDatum(group_oid), 0, 0, 0);
if (!HeapTupleIsValid(tup))
ereport(ERROR, (errcode(ERRCODE_INVALID_OPERATION), errmsg("PGXC Group %s: group not defined", group_name)));
HeapTuple newtuple = heap_modify_tuple(tup, RelationGetDescr(relation), values, nulls, replaces);
simple_heap_update(relation, &newtuple->t_self, newtuple);
CatalogUpdateIndexes(relation, newtuple);
heap_freetuple(newtuple);
ReleaseSysCache(tup);
ReleaseSysCache(srctuple);
heap_close(relation, NoLock);
}
* PgxcGroupModifyRedisAttr
*
* modify group attr in pgxc_group
* but now just modify in_redistribution and is_installation attr
*/
static void PgxcGroupModifyRedisAttr(const char* group_name, Form_pgxc_group groupattr)
{
Relation relation;
HeapTuple tuple;
HeapTuple newtuple;
bool nulls[Natts_pgxc_group];
Datum values[Natts_pgxc_group];
bool replaces[Natts_pgxc_group];
errno_t rc;
Oid group_oid = get_pgxc_groupoid(group_name);
if (!OidIsValid(group_oid)) {
if (IS_PGXC_DATANODE)
return;
ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("PGXC Group %s: group not defined", group_name)));
}
relation = heap_open(PgxcGroupRelationId, RowExclusiveLock);
tuple = SearchSysCache(PGXCGROUPOID, ObjectIdGetDatum(group_oid), 0, 0, 0);
if (!HeapTupleIsValid(tuple))
ereport(ERROR, (errcode(ERRCODE_INVALID_OPERATION), errmsg("PGXC Group %s: group not defined", group_name)));
rc = memset_s(values, sizeof(values), 0, sizeof(values));
securec_check(rc, "\0", "\0");
rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
securec_check(rc, "\0", "\0");
rc = memset_s(replaces, sizeof(replaces), false, sizeof(replaces));
securec_check(rc, "\0", "\0");
replaces[Anum_pgxc_group_in_redistribution - 1] = true;
values[Anum_pgxc_group_in_redistribution - 1] = CharGetDatum(groupattr->in_redistribution);
replaces[Anum_pgxc_group_is_installation - 1] = true;
values[Anum_pgxc_group_is_installation - 1] = BoolGetDatum(groupattr->is_installation);
newtuple = heap_modify_tuple(tuple, RelationGetDescr(relation), values, nulls, replaces);
simple_heap_update(relation, &newtuple->t_self, newtuple);
CatalogUpdateIndexes(relation, newtuple);
ReleaseSysCache(tuple);
heap_freetuple(newtuple);
heap_close(relation, RowExclusiveLock);
}
#ifdef ENABLE_MULTIPLE_NODES
static void CreateRemoteSeqs(const char* queryString, List* uuids, ExecNodes* excluded_nodes)
{
char* msg = NULL;
char* uuidinfo = nodeToString(uuids);
AssembleHybridMessage(&msg, queryString, uuidinfo);
List* all_nodes = GetAllDataNodes();
List* exec_nodes = list_difference_int(all_nodes, excluded_nodes->nodeList);
RemoteQuery* step = makeNode(RemoteQuery);
step->combine_type = COMBINE_TYPE_SAME;
step->sql_statement = (char*)msg;
step->exec_type = EXEC_ON_DATANODES;
step->is_temp = false;
step->exec_nodes = makeNode(ExecNodes);
step->exec_nodes->nodeList = exec_nodes;
list_free(all_nodes);
ExecRemoteUtility(step);
list_free(step->exec_nodes->nodeList);
pfree_ext(step->exec_nodes);
pfree_ext(step);
pfree_ext(msg);
pfree_ext(uuidinfo);
}
static void DropRemoteSeqs(char* queryString, ExecNodes* excluded_nodes)
{
List* all_nodes = GetAllDataNodes();
List* exec_nodes = list_difference_int(all_nodes, excluded_nodes->nodeList);
RemoteQuery* step = makeNode(RemoteQuery);
step->combine_type = COMBINE_TYPE_SAME;
step->sql_statement = (char*)queryString;
step->exec_type = EXEC_ON_DATANODES;
step->is_temp = false;
step->exec_nodes = makeNode(ExecNodes);
step->exec_nodes->nodeList = exec_nodes;
list_free(all_nodes);
ExecRemoteUtility(step);
list_free(step->exec_nodes->nodeList);
pfree_ext(step->exec_nodes);
pfree_ext(step);
}
* PgxcGroupSetSeqNodes():
*
* sync sequences in nodegroup to all datanodes or only current nodegroup.
* If group_name is installation nodegroup, sync all sequences in all other nodegroups.
* If not, sync all sequences in this nodegroup.
* If allnodes is true, sync sequences to all datanodes;
* If allnodes is false, delete sequences in nodes that don't belong to this nodegroup.
*/
static void PgxcGroupSetSeqNodes(const char* group_name, bool allnodes)
{
Relation depRel;
TableScanDesc scan;
HeapTuple tup;
const char* seqName = NULL;
const char* nspName = NULL;
const char* relName = NULL;
const char* schName = NULL;
Oid nspoid;
Oid relowner;
char query[512];
errno_t rc = EOK;
StringInfoData str;
List* uuids = NIL;
Oid* members = NULL;
ExecNodes* exec_nodes = NULL;
int nmembers;
Oid group_oid;
int seq_count = 0;
str.data = NULL;
char* installationGroup = PgxcGroupGetInstallationGroup();
if (installationGroup != NULL && strcmp(installationGroup, group_name) == 0) {
char* name = NULL;
Relation pgxc_group_rel = heap_open(PgxcGroupRelationId, AccessShareLock);
scan = tableam_scan_begin(pgxc_group_rel, SnapshotNow, 0, NULL);
while ((tup = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection)) != NULL) {
name = NameStr(((Form_pgxc_group)GETSTRUCT(tup))->group_name);
if (strcmp(name, group_name) == 0)
continue;
PgxcGroupSetSeqNodes(name, allnodes);
}
tableam_scan_end(scan);
heap_close(pgxc_group_rel, AccessShareLock);
return;
}
if (strcmp(group_name, VNG_OPTION_ELASTIC_GROUP) == 0)
return;
group_oid = get_pgxc_groupoid(group_name);
if (!OidIsValid(group_oid)) {
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("PGXC Group %s: group not defined", group_name)));
}
nmembers = get_pgxc_groupmembers(group_oid, &members);
if (nmembers == u_sess->pgxc_cxt.NumDataNodes) {
pfree_ext(members);
return;
}
exec_nodes = makeNode(ExecNodes);
exec_nodes->nodeList = GetNodeGroupNodeList(members, nmembers);
pfree_ext(members);
depRel = heap_open(DependRelationId, AccessShareLock);
scan = tableam_scan_begin(depRel, SnapshotNow, 0, NULL);
while (HeapTupleIsValid(tup = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection))) {
Form_pg_depend deprec = (Form_pg_depend)GETSTRUCT(tup);
if (deprec->classid == RelationRelationId && deprec->refclassid == RelationRelationId &&
deprec->objsubid == 0 && deprec->refobjsubid != 0 && deprec->deptype == DEPENDENCY_AUTO &&
get_rel_relkind(deprec->objid) == RELKIND_SEQUENCE) {
if (get_pgxc_class_groupoid(deprec->refobjid) != group_oid)
continue;
Relation relseq = relation_open(deprec->objid, AccessShareLock);
HeapTuple tp = SearchSysCache1(RELOID, ObjectIdGetDatum(deprec->objid));
if (!HeapTupleIsValid(tp)) {
ereport(ERROR,
(errcode(ERRCODE_CACHE_LOOKUP_FAILED),
errmsg("cache lookup failed for relation %u", deprec->objid)));
}
Form_pg_class reltup = (Form_pg_class)GETSTRUCT(tp);
relName = NameStr(reltup->relname);
nspoid = reltup->relnamespace;
relowner = reltup->relowner;
schName = get_namespace_name(nspoid);
seqName = quote_identifier(relName);
nspName = quote_identifier(schName);
if (str.data == NULL) {
initStringInfo(&str);
}
if (allnodes) {
int64 uuid;
int64 start;
int64 increment;
int64 maxvalue;
int64 minvalue;
int64 cachevalue;
bool cycle = false;
get_sequence_params(relseq, &uuid, &start, &increment, &maxvalue, &minvalue, &cachevalue, &cycle);
Const* n = makeConst(INT8OID, -1, InvalidOid, sizeof(int64), Int64GetDatum(uuid), false, true);
uuids = lappend(uuids, n);
rc = snprintf_s(query,
sizeof(query),
sizeof(query) - 1,
"CREATE SEQUENCE %s.%s START WITH %ld INCREMENT BY %ld "
"MAXVALUE %ld MINVALUE %ld CACHE %ld %s;",
nspName,
seqName,
start,
increment,
maxvalue,
minvalue,
cachevalue,
cycle ? "CACHE" : " ");
securec_check_ss(rc, "\0", "\0");
appendStringInfoString(&str, query);
if (relowner != GetUserId()) {
char* ownerName = GetUserNameFromId(relowner);
rc = snprintf_s(query,
sizeof(query),
sizeof(query) - 1,
"ALTER SEQUENCE %s.%s OWNER TO %s;",
nspName,
seqName,
ownerName);
securec_check_ss(rc, "\0", "\0");
appendStringInfoString(&str, query);
pfree_ext(ownerName);
}
} else {
rc = snprintf_s(query,
sizeof(query),
sizeof(query) - 1,
"DROP SEQUENCE IF EXISTS %s.%s CASCADE;",
nspName,
seqName);
securec_check_ss(rc, "\0", "\0");
appendStringInfoString(&str, query);
}
ReleaseSysCache(tp);
relation_close(relseq, AccessShareLock);
if (seqName != relName)
pfree_ext(seqName);
if (nspName != schName)
pfree_ext(nspName);
pfree_ext(schName);
seq_count++;
if (seq_count >= 65536) {
seq_count = 0;
if (allnodes) {
CreateRemoteSeqs(str.data, uuids, exec_nodes);
list_free_deep(uuids);
uuids = NIL;
} else {
DropRemoteSeqs(str.data, exec_nodes);
}
pfree_ext(str.data);
}
}
}
tableam_scan_end(scan);
heap_close(depRel, AccessShareLock);
if (str.data != NULL) {
if (allnodes) {
CreateRemoteSeqs(str.data, uuids, exec_nodes);
list_free_deep(uuids);
uuids = NIL;
} else {
DropRemoteSeqs(str.data, exec_nodes);
}
pfree_ext(str.data);
}
delete_exec_nodes(exec_nodes);
}
#endif
* PgxcGroupResize()
*
* modify attr of pgxc_group for cluster resize
*/
void PgxcGroupResize(const char* src_group_name, const char* dest_group_name)
{
if (in_logic_cluster()) {
ereport(
ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("Resize is not supported in logical cluster mode")));
}
Oid group_oid = get_pgxc_groupoid(dest_group_name);
if (!OidIsValid(group_oid)) {
ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("PGXC Group %s: group not defined", dest_group_name)));
}
AclResult aclresult = pg_nodegroup_aclcheck(group_oid, GetUserId(), ACL_ALTER);
if (aclresult != ACLCHECK_OK && !have_createdb_privilege()) {
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must have sysadmin or createdb or alter privilege to resize cluster node groups")));
}
Form_pgxc_group groupattr = (Form_pgxc_group)palloc(sizeof(*groupattr));
groupattr->in_redistribution = PGXC_REDISTRIBUTION_SRC_GROUP;
groupattr->is_installation = true;
PgxcGroupModifyRedisAttr(src_group_name, groupattr);
groupattr->in_redistribution = PGXC_REDISTRIBUTION_DST_GROUP;
groupattr->is_installation = false;
PgxcGroupModifyRedisAttr(dest_group_name, groupattr);
pfree(groupattr);
}
* PgxcGroup_Resizing()
*
* judge if in physical cluster resize
*/
bool PgxcGroup_Resizing()
{
Relation relation;
HeapTuple tuple;
TableScanDesc scan;
char in_redistribution;
bool isfind = false;
if (in_logic_cluster())
return false;
relation = heap_open(PgxcGroupRelationId, RowExclusiveLock);
scan = tableam_scan_begin(relation, SnapshotNow, 0, NULL);
while ((tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection)) != NULL) {
in_redistribution = ((Form_pgxc_group)GETSTRUCT(tuple))->in_redistribution;
if (in_redistribution == PGXC_REDISTRIBUTION_DST_GROUP) {
isfind = true;
break;
}
}
tableam_scan_end(scan);
heap_close(relation, RowExclusiveLock);
return isfind;
}
* PgxcGroupResizeComplete()
*
* modify destgroup's attr in pgxc_group before we delete srcgroup
*/
void PgxcGroupResizeComplete()
{
Relation relation;
HeapTuple tuple;
HeapTuple newtuple;
TableScanDesc scan;
char in_redistribution;
bool nulls[Natts_pgxc_group];
Datum values[Natts_pgxc_group];
bool replaces[Natts_pgxc_group];
errno_t rc;
relation = heap_open(PgxcGroupRelationId, RowExclusiveLock);
scan = tableam_scan_begin(relation, SnapshotNow, 0, NULL);
while ((tuple = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection)) != NULL) {
in_redistribution = ((Form_pgxc_group)GETSTRUCT(tuple))->in_redistribution;
if (in_redistribution == PGXC_REDISTRIBUTION_DST_GROUP) {
break;
}
}
if (!HeapTupleIsValid(tuple))
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("No destgroup in resize process")));
rc = memset_s(values, sizeof(values), 0, sizeof(values));
securec_check(rc, "\0", "\0");
rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
securec_check(rc, "\0", "\0");
rc = memset_s(replaces, sizeof(replaces), false, sizeof(replaces));
securec_check(rc, "\0", "\0");
replaces[Anum_pgxc_group_in_redistribution - 1] = true;
values[Anum_pgxc_group_in_redistribution - 1] = CharGetDatum(PGXC_NON_REDISTRIBUTION_GROUP);
replaces[Anum_pgxc_group_kind - 1] = true;
values[Anum_pgxc_group_kind - 1] = CharGetDatum(PGXC_GROUPKIND_INSTALLATION);
replaces[Anum_pgxc_group_is_installation - 1] = true;
values[Anum_pgxc_group_is_installation - 1] = BoolGetDatum(true);
newtuple = heap_modify_tuple(tuple, RelationGetDescr(relation), values, nulls, replaces);
simple_heap_update(relation, &newtuple->t_self, newtuple);
CatalogUpdateIndexes(relation, newtuple);
tableam_scan_end(scan);
heap_freetuple(newtuple);
heap_close(relation, RowExclusiveLock);
}
* PgxcGroupAddNodes():
*
* add some nodes to nodegroup.
*/
static void PgxcGroupAddNodes(const char* group_name, List* nodes)
{
Oid group_oid;
HeapTuple tup;
oidvector* gmember = NULL;
ListCell* cell = NULL;
char group_kind;
int count;
int node_count;
Oid* oids = NULL;
errno_t rc;
bool need_free = false;
if (!ng_is_valid_group_name(group_name)) {
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("NodeGroup name %s is invalid.", group_name)));
}
group_oid = get_pgxc_groupoid(group_name);
tup = SearchSysCache(PGXCGROUPOID, ObjectIdGetDatum(group_oid), 0, 0, 0);
if (!HeapTupleIsValid(tup))
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("PGXC Group %d: group not defined", group_oid)));
group_kind = get_group_kind(tup);
if (group_kind == PGXC_GROUPKIND_INSTALLATION) {
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("PGXC GROUP %s: do not support add nodes to installation node group.", group_name)));
} else if (group_kind != PGXC_GROUPKING_ELASTIC) {
elog(WARNING,
"Change nodelist of NodeGroup directly maybe result in data inconsistency,"
"confirm this is what you want.");
}
gmember = get_group_member_ref(tup, &need_free);
Assert(gmember != NULL);
count = gmember->dim1 + nodes->length;
node_count = gmember->dim1;
oids = (Oid*)palloc(count * sizeof(Oid));
if (gmember->dim1 > 0) {
rc = memcpy_s(oids, count * sizeof(Oid), gmember->values, gmember->dim1 * sizeof(Oid));
securec_check(rc, "\0", "\0");
}
ReleaseSysCache(tup);
if (need_free)
pfree_ext(gmember);
foreach (cell, nodes) {
int i;
char* node_name = strVal(lfirst(cell));
Oid noid = get_pgxc_nodeoid(node_name);
PgxcCheckOid(noid, node_name);
for (i = 0; i < node_count; i++) {
if (noid == oids[i]) {
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("PGXC node %s: already exist in node group %s or duplicate in nodelist.",
node_name,
group_name)));
}
}
oids[node_count++] = noid;
}
oids = SortRelationDistributionNodes(oids, node_count);
gmember = buildoidvector(oids, node_count);
pfree_ext(oids);
PgxcChangeGroupMember(group_oid, gmember);
pfree_ext(gmember);
}
* PgxcGroupDeleteNodes():
*
* delete some nodes to nodegroup.
*/
static void PgxcGroupDeleteNodes(const char* group_name, List* nodes)
{
Oid group_oid;
HeapTuple tup;
oidvector* gmember = NULL;
ListCell* cell = NULL;
char group_kind;
int node_count;
Oid* oids = NULL;
bool need_free = false;
errno_t rc;
if (!ng_is_valid_group_name(group_name)) {
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("NodeGroup name %s is invalid.", group_name)));
}
group_oid = get_pgxc_groupoid(group_name);
tup = SearchSysCache(PGXCGROUPOID, ObjectIdGetDatum(group_oid), 0, 0, 0);
if (!HeapTupleIsValid(tup))
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("PGXC Group %d: group not defined", group_oid)));
group_kind = get_group_kind(tup);
if (group_kind == PGXC_GROUPKIND_INSTALLATION) {
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("PGXC GROUP %s: do not support delete nodes from installation node group.", group_name)));
} else if (group_kind != PGXC_GROUPKING_ELASTIC) {
elog(WARNING,
"Change nodelist of NodeGroup directly maybe result in data inconsistency,"
"confirm this is what you want.");
}
gmember = get_group_member_ref(tup, &need_free);
if (gmember->dim1 > 0) {
oids = (Oid*)palloc(gmember->dim1 * sizeof(Oid));
rc = memcpy_s(oids, gmember->dim1 * sizeof(Oid), gmember->values, gmember->dim1 * sizeof(Oid));
securec_check(rc, "\0", "\0");
}
node_count = gmember->dim1;
ReleaseSysCache(tup);
if (need_free)
pfree_ext(gmember);
foreach (cell, nodes) {
int i;
bool find_node = false;
char* node_name = strVal(lfirst(cell));
Oid noid = get_pgxc_nodeoid(node_name);
PgxcCheckOid(noid, node_name);
for (i = 0; i < node_count; i++) {
Assert(oids != NULL);
if (noid == oids[i]) {
oids[i] = InvalidOid;
find_node = true;
continue;
}
if (find_node) {
oids[i - 1] = oids[i];
}
}
if (!find_node)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("PGXC node %s: does not exist in node group %s or duplicate in nodelist.",
node_name,
group_name)));
node_count--;
}
if (node_count == 0 && group_kind != PGXC_GROUPKING_ELASTIC) {
ereport(
ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("PGXC group %s: can not delete all nodes.", group_name)));
}
gmember = buildoidvector(oids, node_count);
if (oids != NULL)
pfree_ext(oids);
PgxcChangeGroupMember(group_oid, gmember);
pfree_ext(gmember);
}
#ifdef ENABLE_MULTIPLE_NODES
* PgxcNodeGroupsAlter():
*
* Alter a PGXC node group
*/
void PgxcGroupAlter(AlterGroupStmt* stmt)
{
Oid group_oid = get_pgxc_groupoid(stmt->group_name);
if (!OidIsValid(group_oid)) {
ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("PGXC Group %s: group not defined", stmt->group_name)));
}
AclResult aclresult = pg_nodegroup_aclcheck(group_oid, GetUserId(), ACL_ALTER);
if (aclresult != ACLCHECK_OK && !have_createdb_privilege()) {
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must have sysadmin or createdb or alter privilege to alter cluster node groups")));
}
if (!u_sess->attr.attr_common.xc_maintenance_mode && stmt->alter_type != AG_SET_DEFAULT) {
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Alter node group can only be executed in maintenance mode.")));
}
switch (stmt->alter_type) {
case AG_SET_DEFAULT:
PgxcGroupSetDefault(stmt->group_name);
break;
case AG_SET_VCGROUP:
PgxcGroupSetVCGroup(stmt->group_name, stmt->install_name);
break;
case AG_SET_NOT_VCGROUP:
PgxcGroupSetNotVCGroup(stmt->group_name);
break;
case AG_SET_RENAME:
PgxcGroupRename(stmt->group_name, stmt->install_name);
break;
case AG_SET_TABLE_GROUP:
PgxcChangeTableGroupName(stmt->group_name, stmt->install_name);
break;
case AG_SET_BUCKETS:
PgxcCopyBucketsFromNew(stmt->group_name, stmt->install_name);
break;
case AG_ADD_NODES:
PgxcGroupAddNodes(stmt->group_name, stmt->nodes);
break;
case AG_DELETE_NODES:
PgxcGroupDeleteNodes(stmt->group_name, stmt->nodes);
break;
case AG_RESIZE_GROUP:
PgxcGroupResize(stmt->group_name, stmt->install_name);
break;
case AG_CONVERT_VCGROUP:
PgxcGroupConvertVCGroup(stmt->group_name, stmt->install_name);
break;
case AG_SET_SEQ_ALLNODES:
PgxcGroupSetSeqNodes(stmt->group_name, true);
break;
case AG_SET_SEQ_SELFNODES:
PgxcGroupSetSeqNodes(stmt->group_name, false);
break;
default:
break;
}
}
#endif
* PgxcNodeGroupsRemove():
*
* Remove a PGXC node group
*/
void PgxcGroupRemove(DropGroupStmt* stmt)
{
Relation relation;
HeapTuple tup;
const char* group_name = stmt->group_name;
Oid group_oid = get_pgxc_groupoid(group_name);
char group_kind;
char in_redistribution;
oidvector* node_oids = NULL;
ExecNodes* exec_nodes = NULL;
bool ignore_pmk = false;
if (!OidIsValid(group_oid)) {
if (IS_PGXC_DATANODE)
return;
ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("PGXC Group %s: group not defined", group_name)));
}
AclResult aclresult = pg_nodegroup_aclcheck(group_oid, GetUserId(), ACL_DROP);
if (aclresult != ACLCHECK_OK && !have_createdb_privilege()) {
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must have sysadmin or createdb or drop privilege to remove cluster node groups")));
}
if (group_name && !ng_is_valid_group_name(group_name)) {
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), errmsg("NodeGroup name %s can not be preserved group name", group_name)));
}
if (IS_PGXC_DATANODE) {
relation = heap_open(PgxcGroupRelationId, RowExclusiveLock);
tup = SearchSysCache(PGXCGROUPOID, ObjectIdGetDatum(group_oid), 0, 0, 0);
if (!HeapTupleIsValid(tup))
ereport(
ERROR, (errcode(ERRCODE_INVALID_OPERATION), errmsg("PGXC Group %s: group not defined", group_name)));
simple_heap_delete(relation, &tup->t_self);
ReleaseSysCache(tup);
deleteSharedDependencyRecordsFor(PgxcGroupRelationId, group_oid, 0);
heap_close(relation, NoLock);
CommandCounterIncrement();
RemoveNodeGroupInfoInHTAB(group_name);
return;
}
char* group = PgxcGroupGetInRedistributionGroup();
if (PgxcGroup_Resizing() && group && (0 == strcmp(group_name, group))) {
PgxcGroupResizeComplete();
}
ignore_pmk = in_logic_cluster();
if (!CanPgxcGroupRemove(group_oid, ignore_pmk)) {
ereport(ERROR,
(errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
errmsg("cannot drop '%s' because other objects depend on it", group_name)));
}
relation = heap_open(PgxcGroupRelationId, RowExclusiveLock);
tup = SearchSysCache(PGXCGROUPOID, ObjectIdGetDatum(group_oid), 0, 0, 0);
if (!HeapTupleIsValid(tup))
ereport(ERROR, (errcode(ERRCODE_INVALID_OPERATION), errmsg("PGXC Group %s: group not defined", group_name)));
in_redistribution = ((Form_pgxc_group)GETSTRUCT(tup))->in_redistribution;
if (stmt->src_group_name != NULL) {
if (!in_logic_cluster()) {
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Do not support distribute from clause for non-logic cluster group!")));
}
if (!u_sess->attr.attr_common.xc_maintenance_mode) {
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("DROP NODE GROUP ... DISTRIBUTE FROM can only be executed in maintenance mode.")));
}
if (in_redistribution != PGXC_REDISTRIBUTION_DST_GROUP) {
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("PGXC Group %s: group is not redistributed "
"distination group, can not be dropped",
group_name)));
}
}
* But if we are redistributing the logic group or the whole cluster,
* we can drop installation group.
*/
group_kind = get_group_kind(tup);
if (group_kind == PGXC_GROUPKIND_INSTALLATION && in_redistribution == PGXC_NON_REDISTRIBUTION_GROUP) {
ereport(ERROR,
(errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
errmsg("PGXC Group %s: group is installation group, can not be dropped", group_name)));
} else if (group_kind == PGXC_GROUPKING_ELASTIC) {
if (exist_logic_cluster(relation)) {
ereport(ERROR,
(errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
errmsg(
"PGXC Group %s: cannot drop elastic group because other logic clusters exist.", group_name)));
}
} else if (group_kind == PGXC_GROUPKIND_LCGROUP) {
bool isNull = true;
oidvector* gmember = NULL;
Oid roleid;
bool need_free = false;
roleid = PgxcGetFirstRoleId(group_oid);
if (roleid != InvalidOid) {
ereport(ERROR,
(errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
errmsg("cannot drop '%s' because at least one role %u depend on it", group_name, roleid)));
}
gmember = get_group_member_ref(tup, &need_free);
if (in_redistribution == PGXC_REDISTRIBUTION_DST_GROUP || in_redistribution == PGXC_REDISTRIBUTION_SRC_GROUP) {
if (stmt->src_group_name != NULL) {
if (stmt->to_elastic_group)
node_oids = PgxcGetRedisNodes(relation, PGXC_REDISTRIBUTION_SRC_GROUP);
if (gmember->dim1 > 0) {
text* groupbucket =
(text*)heap_getattr(tup, Anum_pgxc_group_buckets, RelationGetDescr(relation), &isNull);
if (isNull)
groupbucket = NULL;
PgxcUpdateRedistSrcGroup(relation, gmember, groupbucket, stmt->src_group_name);
}
} else if (stmt->to_elastic_group) {
node_oids = PgxcGetRedisNodes(relation, in_redistribution);
}
} else {
if (gmember->dim1 > 0 && stmt->to_elastic_group) {
node_oids = buildoidvector(gmember->values, gmember->dim1);
}
}
if (IS_PGXC_COORDINATOR && !IsConnFromCoord() && in_logic_cluster()) {
exec_nodes = create_exec_nodes(gmember);
}
if (need_free)
pfree_ext(gmember);
}
simple_heap_delete(relation, &tup->t_self);
ReleaseSysCache(tup);
* Removethe cache entry from BucketMapCache
*
* Note: As cache entry removal is not rollback-able, we still do cache remove here,
* because becache the bucketmap-cache mechanism use a lazy-setup behavior to make
* its persistance, so even in case of cache-remove but rollback transaction, the later
* cache-read can bring the cache content back.
*
* So, it is safe to do cache remove here anyway
*/
BucketMapCacheRemoveEntry(group_oid);
deleteSharedDependencyRecordsFor(PgxcGroupRelationId, group_oid, 0);
if (group_kind == PGXC_GROUPKIND_LCGROUP && node_oids != NULL) {
bool nulls[Natts_pgxc_group];
Datum values[Natts_pgxc_group];
bool replaces[Natts_pgxc_group];
HeapTuple newtuple;
oidvector* gmember = NULL;
oidvector* elastic_nodes = NULL;
errno_t rc;
bool need_free = false;
tup = SearchSysCache1(PGXCGROUPNAME, CStringGetDatum(VNG_OPTION_ELASTIC_GROUP));
if (!HeapTupleIsValid(tup))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("PGXC Group %s: group not defined", VNG_OPTION_ELASTIC_GROUP)));
gmember = get_group_member_ref(tup, &need_free);
elastic_nodes = node_oids;
if (gmember->dim1 > 0) {
elastic_nodes = oidvector_add(gmember, node_oids);
pfree(node_oids);
}
rc = memset_s(values, sizeof(values), 0, sizeof(values));
securec_check(rc, "\0", "\0");
rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
securec_check(rc, "\0", "\0");
rc = memset_s(replaces, sizeof(replaces), false, sizeof(replaces));
securec_check(rc, "\0", "\0");
replaces[Anum_pgxc_group_members - 1] = true;
values[Anum_pgxc_group_members - 1] = PointerGetDatum(elastic_nodes);
newtuple = heap_modify_tuple(tup, RelationGetDescr(relation), values, nulls, replaces);
simple_heap_update(relation, &newtuple->t_self, newtuple);
CatalogUpdateIndexes(relation, newtuple);
heap_freetuple(newtuple);
ReleaseSysCache(tup);
pfree(elastic_nodes);
if (need_free)
pfree_ext(gmember);
}
* relation close with NoLock to let nodegroup-removal process hold the lock longer (until
* xact commit/abort) to prevent other concurrent TO-GROUP create table request(with ShareLock)
* run into concurrent update error.
*/
heap_close(relation, NoLock);
if (stmt->src_group_name == NULL) {
RemoveNodeGroupInfoInHTAB(group_name);
}
if (exec_nodes != NULL) {
exec_in_datanode(group_name, exec_nodes, false);
delete_exec_nodes(exec_nodes);
}
}
* PgxcGroupGetFirstLogicCluster()
*
* Return first logic cluster.caller need to free the return value
*/
char* PgxcGroupGetFirstLogicCluster()
{
char* group_name = NULL;
Relation pgxc_group_rel = NULL;
SysScanDesc scan;
ScanKeyData skey[1];
HeapTuple tup = NULL;
char group_kind;
pgxc_group_rel = heap_open(PgxcGroupRelationId, AccessShareLock);
if (!pgxc_group_rel) {
ereport(ERROR, (errcode(ERRCODE_INVALID_OPERATION), errmsg("can not open pgxc_group")));
}
ScanKeyInit(&skey[0], ObjectIdAttributeNumber, BTGreaterStrategyNumber, F_OIDGT, ObjectIdGetDatum(0));
scan = systable_beginscan(pgxc_group_rel, PgxcGroupOidIndexId, true, NULL, 1, skey);
while ((tup = systable_getnext(scan)) != NULL) {
group_kind = get_group_kind(tup);
if (group_kind == 'v') {
group_name = pstrdup(NameStr(((Form_pgxc_group)GETSTRUCT(tup))->group_name));
break;
}
}
systable_endscan(scan);
heap_close(pgxc_group_rel, AccessShareLock);
return group_name;
}
* IsLogicClusterRedistributed()
*
* Judge whether logic cluster with name group_name is redistributed.
*/
bool IsLogicClusterRedistributed(const char* group_name)
{
if (in_logic_cluster()) {
const char* redist_group_name = PgxcGroupGetInRedistributionGroup();
if (redist_group_name && strcmp(group_name, redist_group_name) == 0)
return true;
}
return false;
}
* PgxcGroupGetCurrentLogicCluster()
*
* Get current logic cluster name, caller need to free the string memory.
*/
char* PgxcGroupGetCurrentLogicCluster()
{
char* group_name = NULL;
if (!in_logic_cluster()) {
if (strcmp(u_sess->attr.attr_sql.default_storage_nodegroup, INSTALLATION_MODE) == 0) {
group_name = PgxcGroupGetInstallationGroup();
if (group_name != NULL)
group_name = pstrdup(group_name);
} else {
group_name = pstrdup(u_sess->attr.attr_sql.default_storage_nodegroup);
}
} else {
group_name = (char*)get_current_lcgroup_name();
if (group_name != NULL)
group_name = pstrdup(group_name);
* If group_name is null, the user is superuser or not attached to nodegroup.
* For superuser, we allow create tables in default storage nodegroup;
*/
if (group_name == NULL && superuser()) {
if (strcmp(u_sess->attr.attr_sql.default_storage_nodegroup, INSTALLATION_MODE) == 0) {
group_name = PgxcGroupGetFirstLogicCluster();
} else {
Oid group_oid = get_pgxc_groupoid(u_sess->attr.attr_sql.default_storage_nodegroup);
if (!OidIsValid(group_oid))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("default_storage_nodegroup %s not defined.",
u_sess->attr.attr_sql.default_storage_nodegroup)));
char group_kind = get_pgxc_groupkind(group_oid);
if (group_kind == PGXC_GROUPKIND_INSTALLATION) {
group_name = PgxcGroupGetFirstLogicCluster();
} else {
group_name = pstrdup(u_sess->attr.attr_sql.default_storage_nodegroup);
}
}
}
if (group_name != NULL && IsLogicClusterRedistributed(group_name)) {
char* exec_group = PgxcGroupGetStmtExecGroupInRedis();
if (exec_group != NULL) {
pfree(group_name);
group_name = exec_group;
}
}
}
return group_name;
}
* PgxcGroupGetRedistDestGroupOid()
*
*/
Oid PgxcGroupGetRedistDestGroupOid()
{
Relation pgxc_group_rel = NULL;
TableScanDesc scan;
HeapTuple tup = NULL;
Datum datum;
bool isNull = false;
Oid group_oid = InvalidOid;
if (!isRestoreMode && IS_PGXC_DATANODE) {
return InvalidOid;
}
PgxcOpenGroupRelation(&pgxc_group_rel);
scan = tableam_scan_begin(pgxc_group_rel, SnapshotNow, 0, NULL);
while ((tup = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection)) != NULL) {
datum = heap_getattr(tup, Anum_pgxc_group_in_redistribution, RelationGetDescr(pgxc_group_rel), &isNull);
if (PGXC_REDISTRIBUTION_DST_GROUP == DatumGetChar(datum)) {
group_oid = HeapTupleGetOid(tup);
break;
}
}
tableam_scan_end(scan);
heap_close(pgxc_group_rel, AccessShareLock);
return group_oid;
}
*
* During the redistribution process
* determine create table on new or old cluster
*/
char* PgxcGroupGetStmtExecGroupInRedis()
{
Relation pgxc_group_rel = NULL;
TableScanDesc scan;
HeapTuple tup = NULL;
bool isNull = false;
Datum datum;
char* group_name = NULL;
oidvector* destgmember = NULL;
oidvector* srcgmember = NULL;
char* destgname = NULL;
char* srcgname = NULL;
bool need_free_dst = false;
bool need_free_src = false;
if (!isRestoreMode && IS_PGXC_DATANODE) {
return NULL;
}
PgxcOpenGroupRelation(&pgxc_group_rel);
scan = tableam_scan_begin(pgxc_group_rel, SnapshotNow, 0, NULL);
while ((tup = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection)) != NULL) {
datum = heap_getattr(tup, Anum_pgxc_group_in_redistribution, RelationGetDescr(pgxc_group_rel), &isNull);
if (PGXC_REDISTRIBUTION_DST_GROUP == DatumGetChar(datum)) {
if (destgmember != NULL && strcmp(destgname, NameStr(((Form_pgxc_group)GETSTRUCT(tup))->group_name)) != 0) {
ereport(
ERROR, (errcode(ERRCODE_SYSTEM_ERROR), errmsg("more than one node group in_redistribution='t'")));
}
destgmember = get_group_member_ref(tup, &need_free_dst);
destgname = NameStr(((Form_pgxc_group)GETSTRUCT(tup))->group_name);
} else if (PGXC_REDISTRIBUTION_SRC_GROUP == DatumGetChar(datum)) {
if (srcgmember != NULL && strcmp(srcgname, NameStr(((Form_pgxc_group)GETSTRUCT(tup))->group_name)) != 0) {
ereport(
ERROR, (errcode(ERRCODE_SYSTEM_ERROR), errmsg("more than one node group in_redistribution='y'")));
}
srcgmember = get_group_member_ref(tup, &need_free_src);
srcgname = NameStr(((Form_pgxc_group)GETSTRUCT(tup))->group_name);
}
if (destgmember != NULL && srcgmember != NULL) {
if (oidvector_eq(destgmember, srcgmember))
group_name = pstrdup((const char*)srcgname);
else
group_name = pstrdup((const char*)destgname);
break;
} else if (srcgmember != NULL) {
group_name = pstrdup((const char*)srcgname);
}
}
tableam_scan_end(scan);
heap_close(pgxc_group_rel, AccessShareLock);
if (need_free_dst)
pfree_ext(destgmember);
if (need_free_src)
pfree_ext(srcgmember);
return group_name;
}
* Name: BucketMapCacheAddEntry()
*
* Brief: add a bucketmap cache element into global BucketMapCache
*
* Parameters:
* @in groupoid: bucket cache entry key
* @in groupanme_datum: the datum of pgxc_group.groupname
* @in bucketmap_datum: the datum of pgxc_group.bucketmap
*
* Return: void
*/
static void BucketMapCacheAddEntry(Oid groupoid, Datum groupanme_datum, Datum bucketmap_datum, ItemPointer ctid)
{
MemoryContext oldcontext = MemoryContextSwitchTo(LocalGBucketMapMemCxt());
BucketMapCache* bmc = (BucketMapCache*)palloc0(sizeof(BucketMapCache));
bmc->groupoid = groupoid;
bmc->groupname = (char*)palloc0(NAMEDATALEN);
errno_t rc = strcpy_s(bmc->groupname, NAMEDATALEN, (const char*)DatumGetCString(groupanme_datum));
securec_check(rc, "\0", "\0");
bmc->bucketmap = (uint2*)palloc0(BUCKETDATALEN * sizeof(uint2));
ItemPointerCopy(ctid, &bmc->ctid);
text_to_bktmap((text*)bucketmap_datum, bmc->bucketmap, &bmc->bucketcnt);
elog(DEBUG2, "Add [%s][%u]'s bucketmap to BucketMapCache", bmc->groupname, groupoid);
AppendLocalRelCacheGBucketMapCache((ListCell *)bmc);
MemoryContextSwitchTo(oldcontext);
}
* Name: BucketMapCacheRemoveEntry()
*
* Brief: remove a bucketmap cache element into global BucketMapCache
*
* Parameters:
* @in groupoid: bucket cache entry key that needs to be removed
*
* Return: void
*/
static void BucketMapCacheRemoveEntry(Oid groupoid)
{
if (LocalRelCacheGBucketMapCache() == NIL) {
* We may run into here, when a new node group is drop but no table access
* of its table happened.
*/
elog(LOG, "Remove element from BucketMapCache with oid %u, but BucketMapCache is not setup yet", groupoid);
return;
}
ListCell* cell = NULL;
ListCell* prev = NULL;
ListCell* next = NULL;
bool found = false;
for (cell = list_head(LocalRelCacheGBucketMapCache()); cell; cell = next) {
BucketMapCache* bmc = (BucketMapCache*)lfirst(cell);
next = lnext(cell);
if (bmc->groupoid == groupoid) {
elog(LOG, "Remove [%s][%u]'s bucketmap From BucketMapCache", bmc->groupname, groupoid);
found = true;
pfree_ext(bmc->bucketmap);
pfree_ext(bmc->groupname);
DeteleLocalRelCacheGBucketMapCache(cell, prev);
pfree_ext(bmc);
break;
} else {
prev = cell;
}
}
if (!found) {
elog(LOG, "Try to remove bucketmap cache entry with group oid %u, but it is not found", groupoid);
}
return;
}
* Name: BucketMapCacheUpdate()
*
* Brief: update bucketmap for given groupoid in the global BucketMapCache
*
* Parameters:
* @in bmc: BucketMapCache that needs to be update
*
* Return: void
*/
static void BucketMapCacheUpdate(BucketMapCache* bmc)
{
bool isNull = true;
errno_t rc = 0;
Relation rel = heap_open(PgxcGroupRelationId, AccessShareLock);
HeapTuple htup = SearchSysCache(PGXCGROUPOID, ObjectIdGetDatum(bmc->groupoid), 0, 0, 0);
if (!HeapTupleIsValid(htup)) {
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("Bucketmap is not found with given groupoid %u", bmc->groupoid)));
}
Datum groupname_datum = heap_getattr(htup, Anum_pgxc_group_name, RelationGetDescr(rel), &isNull);
Assert(!isNull);
Datum bucketmap_datum = heap_getattr(htup, Anum_pgxc_group_buckets, RelationGetDescr(rel), &isNull);
Assert(!isNull);
elog(LOG, "Update [%s][%u]'s bucketmap in BucketMapCache", bmc->groupname, bmc->groupoid);
ItemPointerCopy(&(htup->t_self), &bmc->ctid);
rc = strcpy_s(bmc->groupname, NAMEDATALEN, (const char*)DatumGetCString(groupname_datum));
securec_check(rc, "\0", "\0");
text_to_bktmap((text*)bucketmap_datum, bmc->bucketmap, &bmc->bucketcnt);
ReleaseSysCache(htup);
heap_close(rel, AccessShareLock);
}
* Name: ClearInvalidBucketMapCache()
*
* Brief: clear invalid cell in LocalRelCacheGBucketMapCache
*
* Parameters:
* none
*
* Return:
* none
*/
static void ClearInvalidBucketMapCache(void)
{
ListCell* cell = NULL;
ListCell* next = NULL;
for (cell = list_head(LocalRelCacheGBucketMapCache()); cell; cell = next) {
next = lnext(cell);
BucketMapCache* bmc = (BucketMapCache*)lfirst(cell);
HeapTuple tuple = SearchSysCache1(PGXCGROUPOID, ObjectIdGetDatum(bmc->groupoid));
if (!HeapTupleIsValid(tuple)) {
BucketMapCacheRemoveEntry(bmc->groupoid);
continue;
}
ReleaseSysCache(tuple);
}
if ((unsigned int)list_length(LocalRelCacheGBucketMapCache()) >= LocalRelCacheMaxBucketMapSize()) {
EnlargeLocalRelCacheMaxBucketMapSize(2);
}
}
* Name: BucketMapCacheDestroy()
*
* Brief: destory the bucketmap cache
*
* Parameters:
* none
*
* Return:
* none
*/
void BucketMapCacheDestroy(void)
{
* In current BuckeMapCache implementation, destroy is not necessary to implement as
* it persists with its belonging gaussdb session.
*/
Assert(false);
}
* Name: BucketMapCacheGetBucketmap(Oid)
*
* Brief: fetch bucketmap content with given groupoid, and populate the bucketmap cache
* if we do not found in BucketMapCache
*
* Parameters:
* @in groupoid: groupoid of bucketmap that need to fetch
*
* Return:
* bucketmap that is found
*/
uint2* BucketMapCacheGetBucketmap(Oid groupoid, int *bucketlen)
{
Assert(groupoid != InvalidOid);
if (LocalRelCacheGBucketMapCache() == NIL) {
elog(DEBUG2, "Global bucketmap cache is not setup");
}
if ((unsigned int)list_length(LocalRelCacheGBucketMapCache()) >= LocalRelCacheMaxBucketMapSize()) {
ClearInvalidBucketMapCache();
}
while (LocalRelCacheMaxBucketMapSize() / 2 >
(unsigned int)list_length(LocalRelCacheGBucketMapCache()) &&
LocalRelCacheMaxBucketMapSize() / 2 > BUCKET_MAP_SIZE) {
EnlargeLocalRelCacheMaxBucketMapSize(0.5);
}
ListCell* cell = NULL;
uint2* bucketmap = NULL;
foreach (cell, LocalRelCacheGBucketMapCache()) {
BucketMapCache* bmc = (BucketMapCache*)lfirst(cell);
if (bmc->groupoid == groupoid) {
* Note: Here, we still have to check if given groupoid could be found from
* systable and verify it must not be updated, as BucketMapCache is thread
* local and not transactional protected, for example "group1" is setup in
* BucketMapCache but "group1" is dropped or its bucketmap content changed in
* another gaussdb session, in this case we have to remove it from BucketMapCache
* and try to rebuild it.
*
* Here using a light-weight check to verify the ctid of pgxc_group's tuple
*/
HeapTuple tup = SearchSysCache1(PGXCGROUPOID, ObjectIdGetDatum(groupoid));
if (!HeapTupleIsValid(tup)) {
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("cache lookup failed on node group %u", groupoid)));
}
if (unlikely(ItemPointerCompare(&(bmc->ctid), &(tup->t_self)) != 0)) {
BucketMapCacheUpdate(bmc);
}
bucketmap = bmc->bucketmap;
*bucketlen = bmc->bucketcnt;
ReleaseSysCache(tup);
break;
}
}
if (bucketmap == NULL) {
bool isNull = true;
Relation rel = heap_open(PgxcGroupRelationId, AccessShareLock);
HeapTuple htup = SearchSysCache(PGXCGROUPOID, ObjectIdGetDatum(groupoid), 0, 0, 0);
if (!HeapTupleIsValid(htup)) {
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("Bucketmap is not found with given groupoid %u", groupoid)));
}
Datum groupname_datum = heap_getattr(htup, Anum_pgxc_group_name, RelationGetDescr(rel), &isNull);
* Assert to check groupname is not null as group_name in pgxc_group is defined
* as NOT NULL constraint.
*/
Assert(groupoid != InvalidOid && !isNull);
Datum bucketmap_datum = heap_getattr(htup, Anum_pgxc_group_buckets, RelationGetDescr(rel), &isNull);
if (!isNull) {
BucketMapCacheAddEntry(groupoid, groupname_datum, bucketmap_datum, &(htup->t_self));
}
ReleaseSysCache(htup);
heap_close(rel, AccessShareLock);
bucketmap = BucketMapCacheGetBucketmap(groupoid, bucketlen);
Assert(bucketmap != NULL);
}
return bucketmap;
}
* Name: BucketMapCacheGetBucketmap(groupname)
*
* Brief: similar with its oid version but the input is the groupname
*
* Parameters:
* @in groupname: name of group whose bucketmap that need to fetch
*
* Return:
* bucketmap that is found
*/
uint2* BucketMapCacheGetBucketmap(const char* groupname, int *bucketlen)
{
Assert(groupname != NULL);
Oid groupoid = get_pgxc_groupoid(groupname, false );
return BucketMapCacheGetBucketmap(groupoid, bucketlen);
}
* PgxcGroupGetInstallationGroup()
*
* Return installation group to caller
*/
char* PgxcGroupGetInstallationGroup()
{
char* group_name = NULL;
char* installation_node_group = NULL;
Relation pgxc_group_rel = NULL;
TableScanDesc scan;
HeapTuple tup = NULL;
Datum datum;
bool isNull = false;
int group_count = 0;
oidvector* gmember = NULL;
if (!isRestoreMode && IS_PGXC_DATANODE) {
return NULL;
}
* If current installation node group is catched, return it directly instead of
* fetch it from pgxc_group.
*/
if (t_thrd.pgxc_cxt.current_installation_nodegroup != NULL) {
return t_thrd.pgxc_cxt.current_installation_nodegroup;
}
PgxcOpenGroupRelation(&pgxc_group_rel);
scan = tableam_scan_begin(pgxc_group_rel, SnapshotNow, 0, NULL);
while ((tup = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection)) != NULL) {
bool need_free = false;
gmember = get_group_member_ref(tup, &need_free);
group_count++;
{
Datum group_name_datum = heap_getattr(tup, Anum_pgxc_group_name, RelationGetDescr(pgxc_group_rel), &isNull);
group_name = (char*)pstrdup((const char*)DatumGetCString(group_name_datum));
}
TupleDesc tupleDesc = RelationGetDescr(pgxc_group_rel);
if (tupleDesc->natts >= Anum_pgxc_group_is_installation) {
datum = heap_getattr(tup, Anum_pgxc_group_is_installation, tupleDesc, &isNull);
} else {
datum = BoolGetDatum(true);
}
if (DatumGetBool(datum)) {
installation_node_group = group_name;
break;
}
if (need_free)
pfree_ext(gmember);
}
tableam_scan_end(scan);
heap_close(pgxc_group_rel, AccessShareLock);
* If installation node group is not found, we are returning the only nodegroup already
* defined in pgxc_group as installation nodegroup to maximumly *compatible* with existing
* none multi-nodegroup system
*/
if (installation_node_group == NULL && group_count == 1) {
set_current_installation_nodegroup(group_name);
pfree(group_name);
return t_thrd.pgxc_cxt.current_installation_nodegroup;
}
if (installation_node_group != NULL) {
set_current_installation_nodegroup(installation_node_group);
pfree(installation_node_group);
return t_thrd.pgxc_cxt.current_installation_nodegroup;
}
return NULL;
}
* Brief: generate consistant-hash bucketmap for new installation nodegroup, this case
* happens in cluster expansion(shunk)
*
* Input
* @stmt: NodeGroup creation handler
* @nodes_array: oid array of new node group
* @pgxc_group_rel: relation handler of pgxc_group, opened in PgxcGroupCreate
* @tuple: tuple of pgxc_group of old installation group, its bucketmap is used
* as input for new bucket generation
* @bucket_prt: output paremeter for new generate node group
*/
static void generateConsistentHashBucketmap(CreateGroupStmt* stmt, oidvector* nodes_array, Relation pgxc_group_rel,
HeapTuple tuple, uint2* bucket_ptr, int bucketmap_mode)
{
int member_count = list_length(stmt->nodes);
int i = 0;
int2 *node_count_array = NULL;
int2 *node_count_bucket = NULL;
int2 *new_node_array = NULL;
int skip_bucket, target_bucket, tmp_skip_bucket;
uint2* old_bucket_map = NULL;
int old_bucket_cnt = 0;
int j, new_member_count, old_member_count, b_node, b_nnode;
int len = BUCKETDATALEN * sizeof(uint2);
text* groupbucket = NULL;
bool isNull = false;
bool need_free = false;
oidvector* gmember = NULL;
errno_t rc = 0;
if (stmt->buckets != NIL)
getBucketsFromBucketList(stmt->buckets, member_count, bucket_ptr);
else {
if (bucketmap_mode == BUCKETMAP_MODE_DEFAULT) {
for (i = 0; i < BUCKETDATALEN; i++) {
bucket_ptr[i] = i % member_count;
}
return;
}
gmember = get_group_member_ref(tuple, &need_free);
old_member_count = gmember->dim1;
new_member_count = member_count - old_member_count;
if (new_member_count == 0) {
ereport(ERROR,
(errcode(ERRCODE_INVALID_OPERATION),
errmsg("new node group must contain different number of nodes with before!")));
} else if (new_member_count < 0) {
contractBucketMap(pgxc_group_rel, tuple, nodes_array, bucket_ptr);
} else {
node_count_array = (int2*)palloc(old_member_count * sizeof(int2));
skip_bucket = member_count;
target_bucket = BUCKETDATALEN / member_count;
old_bucket_map = (uint2*)palloc0(len);
groupbucket =
(text*)heap_getattr(tuple, Anum_pgxc_group_buckets, RelationGetDescr(pgxc_group_rel), &isNull);
if (!isNull) {
text_to_bktmap(groupbucket, old_bucket_map, &old_bucket_cnt);
}
new_node_array = (int2*)palloc(new_member_count * sizeof(int2));
rc = memcpy_s(bucket_ptr, len, old_bucket_map, len);
securec_check(rc, "\0", "\0");
for (i = 0; i < new_member_count; i++)
new_node_array[i] = i + old_member_count;
node_count_bucket = (int2*)palloc0(member_count * sizeof(int2));
for (i = 0; i < BUCKETDATALEN; i++) {
Assert(bucket_ptr[i] < member_count);
node_count_bucket[bucket_ptr[i]]++;
}
for (j = 0; j < new_member_count; j++) {
b_nnode = new_node_array[j];
tmp_skip_bucket = skip_bucket;
while (node_count_bucket[b_nnode] < target_bucket) {
for (i = 0; i < old_member_count; i++)
node_count_array[i] = tmp_skip_bucket;
for (i = 0; i < BUCKETDATALEN; i++) {
if (bucket_ptr[i] < old_member_count) {
b_node =
old_bucket_map[i];
if (node_count_array[b_node] == tmp_skip_bucket &&
node_count_bucket[b_node] > target_bucket) {
bucket_ptr[i] = b_nnode;
node_count_array[b_node] = 1;
node_count_bucket[b_node]--;
node_count_bucket[b_nnode]++;
if (node_count_bucket[b_nnode] >= target_bucket)
break;
} else
node_count_array[b_node]++;
}
}
if ((tmp_skip_bucket - 1) < 1)
tmp_skip_bucket = 1;
else
tmp_skip_bucket--;
}
}
tmp_skip_bucket = 0;
for (j = 0; j < member_count; j++) {
b_node = 0;
while (node_count_bucket[j] > target_bucket + 1) {
for (i = b_node; i < BUCKETDATALEN; i++) {
if (bucket_ptr[i] == j) {
b_node = i;
for (b_nnode = tmp_skip_bucket; b_nnode < member_count; b_nnode++) {
if (node_count_bucket[b_nnode] == target_bucket) {
bucket_ptr[i] = b_nnode;
node_count_bucket[b_nnode]++;
tmp_skip_bucket = b_nnode;
break;
}
}
break;
}
}
node_count_bucket[j]--;
}
}
pfree(node_count_bucket);
pfree(node_count_array);
pfree(new_node_array);
pfree(old_bucket_map);
}
}
if (need_free)
pfree_ext(gmember);
}
* CanPgxcGroupRemove
* Check whether the group is empty(no table exists)
*
* -input:
* @group_name: target nodegroup that to check whether we can go ahead to remove
*
* -return:
* @true: empty, we can go to remove nodegroup
* @false: not empty node group contains tables
*/
bool CanPgxcGroupRemove(Oid group_oid, bool ignore_pmk)
{
char cmd[CHAR_BUF_SIZE];
char tbs_cmd[CHAR_BUF_SIZE];
int val = 0;
int rc = 0;
char* database_name = NULL;
Relation pg_database_rel = NULL;
TableScanDesc scan;
HeapTuple tup = NULL;
Datum datum;
bool isNull = false;
bool result = true;
if (ignore_pmk) {
rc = snprintf_s(cmd,
CHAR_BUF_SIZE,
CHAR_BUF_SIZE - 1,
"select count(*) from pgxc_class "
"JOIN pg_class c ON pcrelid = c.oid "
"JOIN pgxc_group ON pgroup = group_name "
"LEFT JOIN pg_namespace n ON n.oid = c.relnamespace "
"where pgxc_group.oid=%u "
"AND n.nspname<>'pmk';",
group_oid);
} else {
rc = snprintf_s(cmd,
CHAR_BUF_SIZE,
CHAR_BUF_SIZE - 1,
"select count(*) from pgxc_class, pgxc_group "
"where pgroup=group_name and pgxc_group.oid=%u;",
group_oid);
}
securec_check_ss(rc, "\0", "\0");
pg_database_rel = heap_open(DatabaseRelationId, AccessShareLock);
scan = tableam_scan_begin(pg_database_rel, SnapshotNow, 0, NULL);
while ((tup = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection)) != NULL) {
datum = heap_getattr(tup, Anum_pg_database_datname, RelationGetDescr(pg_database_rel), &isNull);
Assert(!isNull);
database_name = (char*)pstrdup((const char*)DatumGetCString(datum));
if (strcmp(database_name, "template0") == 0) {
continue;
}
val = GetTableCountByDatabase(database_name, cmd);
if (val > 0) {
result = false;
if (ignore_pmk) {
rc = snprintf_s(tbs_cmd,
CHAR_BUF_SIZE,
CHAR_BUF_SIZE - 1,
"select n.nspname,c.relname from pgxc_class "
"JOIN pg_class c ON pcrelid = c.oid "
"JOIN pgxc_group ON pgroup = group_name "
"LEFT JOIN pg_namespace n ON n.oid = c.relnamespace "
"where pgxc_group.oid=%u "
"AND n.nspname<>'pmk';",
group_oid);
} else {
rc = snprintf_s(tbs_cmd,
CHAR_BUF_SIZE,
CHAR_BUF_SIZE - 1,
"select n.nspname,c.relname from pgxc_class "
"JOIN pg_class c ON pcrelid = c.oid "
"JOIN pgxc_group ON pgroup = group_name "
"LEFT JOIN pg_namespace n ON n.oid = c.relnamespace "
"where pgxc_group.oid=%u;",
group_oid);
}
securec_check_ss(rc, "\0", "\0");
GetTablesByDatabase(database_name, tbs_cmd, strlen(tbs_cmd));
}
}
tableam_scan_end(scan);
heap_close(pg_database_rel, AccessShareLock);
return result;
}
* GetTableCountByDatabase()
*
* Get the count of table in a database
*/
int GetTableCountByDatabase(const char* database_name, char* query)
{
char conninfo[CHAR_BUF_SIZE];
PGconn* pgconn = NULL;
int ret;
PGresult* res = NULL;
* As we will lock pg_database with AccessShareLock twice when connect other databases
* to verify whether the to-be dropped node group is empty, if there is AccessExclusiveLock
* (e.g. vacuum full xxdb) happens between them, server may run into a deadlock scenario,
* so we have to add a timeout here to prevent that happens.
*/
ret = snprintf_s(conninfo,
sizeof(conninfo),
sizeof(conninfo) - 1,
"dbname=%s port=%d connect_timeout=60 options='-c xc_maintenance_mode=on'",
database_name,
g_instance.attr.attr_network.PostPortNumber);
securec_check_ss_c(ret, "\0", "\0");
pgconn = PQconnectdb(conninfo);
if (PQstatus(pgconn) != CONNECTION_OK) {
ereport(ERROR,
(errcode(ERRCODE_SYSTEM_ERROR), errmsg("Connection to database failed: %s", PQerrorMessage(pgconn))));
}
res = PQexec(pgconn, query);
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
ereport(ERROR,
(errcode(ERRCODE_SYSTEM_ERROR), errmsg("execute statement: %s failed: %s", query, PQerrorMessage(pgconn))));
}
if (PQntuples(res) <= 0) {
ereport(ERROR, (errcode(ERRCODE_SYSTEM_ERROR),
errmsg("PQtuples num is invalid : %d", PQntuples(res))));
}
int num = atoi(PQgetvalue(res, 0, 0));
PQclear(res);
PQfinish(pgconn);
return num;
}
* GetTablesByDatabase()
*
* Get the name of tables in a database
*/
static void GetTablesByDatabase(const char* database_name, char* query, size_t size)
{
char conninfo[CHAR_BUF_SIZE];
PGconn* pgconn = NULL;
int ret;
PGresult* res = NULL;
ret = snprintf_s(conninfo,
sizeof(conninfo),
sizeof(conninfo) - 1,
"dbname=%s port=%d connect_timeout=60 options='-c xc_maintenance_mode=on'",
database_name,
g_instance.attr.attr_network.PostPortNumber);
securec_check_ss_c(ret, "\0", "\0");
pgconn = PQconnectdb(conninfo);
if (PQstatus(pgconn) != CONNECTION_OK) {
ereport(ERROR,
(errcode(ERRCODE_SYSTEM_ERROR), errmsg("Connection to database failed: %s", PQerrorMessage(pgconn))));
}
res = PQexec(pgconn, query);
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
ereport(ERROR,
(errcode(ERRCODE_SYSTEM_ERROR), errmsg("execute statement: %s failed: %s", query, PQerrorMessage(pgconn))));
}
if (PQntuples(res) == 0) {
ereport(ERROR,
(errcode(ERRCODE_SYSTEM_ERROR),
errmsg("fail to get tables in database %s for query remain table", database_name)));
} else {
int i_nspname = PQfnumber(res, "nspname");
int i_relname = PQfnumber(res, "relname");
for (int i = 0; i < PQntuples(res); i++) {
char* nspname = PQgetvalue(res, i, i_nspname);
char* relname = PQgetvalue(res, i, i_relname);
ereport(LOG, (errmsg("table:%s.%s remain in database %s", nspname, relname, database_name)));
}
}
PQclear(res);
PQfinish(pgconn);
}
char* PgxcGroupGetInRedistributionGroup()
{
char* group_name = NULL;
Relation pgxc_group_rel = NULL;
TableScanDesc scan;
HeapTuple tup = NULL;
Datum datum;
bool isNull = false;
if (!isRestoreMode && IS_PGXC_DATANODE) {
return NULL;
}
if (t_thrd.pgxc_cxt.current_redistribution_nodegroup) {
return t_thrd.pgxc_cxt.current_redistribution_nodegroup;
}
PgxcOpenGroupRelation(&pgxc_group_rel);
scan = tableam_scan_begin(pgxc_group_rel, SnapshotNow, 0, NULL);
while ((tup = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection)) != NULL) {
datum = heap_getattr(tup, Anum_pgxc_group_in_redistribution, RelationGetDescr(pgxc_group_rel), &isNull);
if ('y' == DatumGetChar(datum)) {
datum = heap_getattr(tup, Anum_pgxc_group_name, RelationGetDescr(pgxc_group_rel), &isNull);
Assert(!isNull);
group_name = (char*)pstrdup((const char*)DatumGetCString(datum));
break;
}
}
tableam_scan_end(scan);
heap_close(pgxc_group_rel, AccessShareLock);
if (group_name != NULL) {
set_current_redistribution_nodegroup(group_name);
pfree(group_name);
return t_thrd.pgxc_cxt.current_redistribution_nodegroup;
}
return NULL;
}
void PgxcOpenGroupRelation(Relation* pgxc_group_rel)
{
*pgxc_group_rel = heap_open(PgxcGroupRelationId, AccessShareLock);
if (!(*pgxc_group_rel)) {
ereport(ERROR, (errcode(ERRCODE_INVALID_OPERATION), errmsg("can not open pgxc_group")));
}
}
List* GetNodeGroupOidCompute(Oid role_id)
{
List* objects = NIL;
Relation pgxc_group_rel = NULL;
TableScanDesc scan;
HeapTuple tup = NULL;
Oid group_oid;
pgxc_group_rel = heap_open(PgxcGroupRelationId, AccessShareLock);
if (!pgxc_group_rel) {
ereport(ERROR, (errcode(ERRCODE_INVALID_OPERATION), errmsg("can not open pgxc_group")));
}
scan = tableam_scan_begin(pgxc_group_rel, SnapshotNow, 0, NULL);
while ((tup = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection)) != NULL) {
group_oid = HeapTupleGetOid(tup);
if (OidIsValid(group_oid)) {
AclResult aclresult = pg_nodegroup_aclcheck(group_oid, role_id, ACL_COMPUTE | ACL_USAGE);
if (aclresult == ACLCHECK_OK) {
objects = lappend_oid(objects, group_oid);
}
}
}
tableam_scan_end(scan);
heap_close(pgxc_group_rel, AccessShareLock);
return objects;
}
uint2* GetBucketMapByGroupName(const char* groupname, int *bucketlen)
{
Relation rel;
HeapTuple htup;
bool isNull = false;
Datum groupbucket;
uint2* bktmap = (uint2*)palloc0(BUCKETDATALEN * sizeof(uint2));
rel = heap_open(PgxcGroupRelationId, AccessShareLock);
htup = SearchSysCache1(PGXCGROUPNAME, CStringGetDatum(groupname));
if (!HeapTupleIsValid(htup))
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("cache lookup failed for pgxc_group %s", groupname)));
groupbucket = heap_getattr(htup, Anum_pgxc_group_buckets, RelationGetDescr(rel), &isNull);
if (!isNull) {
text_to_bktmap((text*)groupbucket, bktmap, bucketlen);
}
ReleaseSysCache(htup);
heap_close(rel, AccessShareLock);
return bktmap;
}
void InitNodeGroupStatus(void)
{
* Skip node group status cache optimization when current backend thread not
* in normal states or in AutoVacuum process or during upgrade
*/
if (!IsNormalProcessingMode() || IsAutoVacuumLauncherProcess() || !OidIsValid(u_sess->proc_cxt.MyDatabaseId) ||
u_sess->attr.attr_common.IsInplaceUpgrade) {
return;
}
if (unlikely(t_thrd.pgxc_cxt.current_installation_nodegroup != NULL)) {
pfree_ext(t_thrd.pgxc_cxt.current_installation_nodegroup);
}
if (unlikely(t_thrd.pgxc_cxt.current_redistribution_nodegroup != NULL)) {
pfree_ext(t_thrd.pgxc_cxt.current_redistribution_nodegroup);
}
(void)PgxcGroupGetInstallationGroup();
(void)PgxcGroupGetInRedistributionGroup();
}
void CleanNodeGroupStatus(void)
{
* Skip node group status cache optimization when current backend thread not
* in normal states
*/
if (!IsNormalProcessingMode()) {
return;
}
if (t_thrd.pgxc_cxt.current_redistribution_nodegroup != NULL) {
pfree_ext(t_thrd.pgxc_cxt.current_redistribution_nodegroup);
}
if (t_thrd.pgxc_cxt.current_installation_nodegroup != NULL) {
pfree_ext(t_thrd.pgxc_cxt.current_installation_nodegroup);
}
}
* If we ues the grammar "create table to node", when we support
* multi-nodegroup, it must be converted to "create table to group",
* but we are not sure to find the group correct through "to node".
*
* In non-logic cluster mode, we use "to node" for create table to installatation
* group or redistribution group in restore mode, so we do this conversion here.
*
* In logic cluster mode, we use "to node" for create table to logic cluster
* node group in restore mode.
*/
char* DeduceGroupByNodeList(Oid* nodeoids, int numnodes)
{
Oid* members = NULL;
int nmembers = 0;
bool check_node = false;
char* installationgroup = PgxcGroupGetInstallationGroup();
char* redistributiongroup = PgxcGroupGetInRedistributionGroup();
char* group_name = NULL;
SortRelationDistributionNodes(nodeoids, numnodes);
if (installationgroup != NULL) {
nmembers = get_pgxc_groupmembers(get_pgxc_groupoid(installationgroup), &members);
SortRelationDistributionNodes(members, nmembers);
if (numnodes == nmembers) {
check_node = true;
for (int i = 0; i < nmembers; i++) {
if (nodeoids[i] != members[i]) {
check_node = false;
break;
}
}
}
if (check_node != false) {
group_name = installationgroup;
}
}
if (check_node == false && redistributiongroup != NULL) {
nmembers = get_pgxc_groupmembers(get_pgxc_groupoid(redistributiongroup), &members);
SortRelationDistributionNodes(members, nmembers);
if (numnodes == nmembers) {
check_node = true;
for (int i = 0; i < nmembers; i++) {
if (nodeoids[i] != members[i]) {
check_node = false;
break;
}
}
}
if (check_node != false) {
group_name = redistributiongroup;
}
}
if (check_node == false) {
Relation pgxc_group_rel = heap_open(PgxcGroupRelationId, AccessShareLock);
oidvector* gmember = NULL;
HeapTuple tup = NULL;
TableScanDesc scan = NULL;
scan = tableam_scan_begin(pgxc_group_rel, SnapshotNow, 0, NULL);
while ((tup = (HeapTuple) tableam_scan_getnexttuple(scan, ForwardScanDirection)) != NULL) {
bool need_free = false;
gmember = get_group_member_ref(tup, &need_free);
* should already be sorted by node_name, so sort is not needed.
*/
if (numnodes == gmember->dim1) {
for (int i = 0; i < numnodes; i++) {
if (nodeoids[i] != *(gmember->values + i)) {
continue;
}
}
check_node = true;
const char* gname = NameStr(((Form_pgxc_group)GETSTRUCT(tup))->group_name);
group_name = (char*)pstrdup(gname);
break;
}
if (need_free != false)
pfree_ext(gmember);
}
tableam_scan_end(scan);
heap_close(pgxc_group_rel, AccessShareLock);
}
if (check_node == false) {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("can not find node group by this node list.")));
}
Assert(group_name != NULL);
return group_name;
}
* PgxcGroupGetLogicClusterList
* get logic clusters from input accordingly.
*
* Parameters:
* @in nodeids: nodes for the logic clusters
*
* Return logic cluster list.
*/
List* PgxcGroupGetLogicClusterList(Bitmapset* nodeids)
{
List* objects = NIL;
Bitmapset* left_nodeids = nodeids;
Oid installation_group = InvalidOid;
Relation pgxc_group_rel = NULL;
SysScanDesc scan;
ScanKeyData skey[1];
HeapTuple tup = NULL;
Oid my_group = InvalidOid;
bool use_my_group = false;
if (bms_is_empty(left_nodeids)) {
ereport(DEBUG2, (errmsg("CalculateQueryMem: empty nodeids")));
bms_free(left_nodeids);
return NIL;
}
my_group = get_pgxc_logic_groupoid(GetCurrentUserId());
if (!OidIsValid(my_group))
return NIL;
installation_group = ng_get_installation_group_oid();
pgxc_group_rel = heap_open(PgxcGroupRelationId, AccessShareLock);
if (!pgxc_group_rel) {
ereport(ERROR, (errcode(ERRCODE_INVALID_OPERATION), errmsg("can not open pgxc_group")));
}
ScanKeyInit(&skey[0], ObjectIdAttributeNumber, BTGreaterStrategyNumber, F_OIDGT, ObjectIdGetDatum(0));
scan = systable_beginscan(pgxc_group_rel, PgxcGroupOidIndexId, true, NULL, 1, skey);
while ((tup = systable_getnext(scan)) != NULL) {
Oid group_oid = HeapTupleGetOid(tup);
if (installation_group != group_oid && OidIsValid(group_oid)) {
Bitmapset* this_nodeids = NULL;
int nmembers;
Oid* nodes = NULL;
oidvector* group_members_raw = NULL;
bool need_free = false;
group_members_raw = get_group_member_ref(tup, &need_free);
nodes = (Oid*)group_members_raw->values;
nmembers = group_members_raw->dim1;
for (int i = 0; i < nmembers; i++) {
int nodeId = PGXCNodeGetNodeId(nodes[i], PGXC_NODE_DATANODE);
if (nodeId < 0)
continue;
this_nodeids = bms_add_member(this_nodeids, nodeId);
}
if (need_free)
pfree_ext(group_members_raw);
Bitmapset* inter = bms_intersect(this_nodeids, left_nodeids);
if (!bms_is_empty(inter)) {
Bitmapset* tmp_nodeids = bms_difference(left_nodeids, this_nodeids);
bms_free(left_nodeids);
left_nodeids = tmp_nodeids;
Distribution* dist = NewDistribution();
dist->group_oid = group_oid;
dist->bms_data_nodeids = this_nodeids;
objects = lappend(objects, dist);
ereport(DEBUG2, (errmsg("CalculateQueryMem: query contains nodegroup %d", group_oid)));
if (group_oid == my_group)
use_my_group = true;
}
bms_free(inter);
if (bms_is_empty(left_nodeids))
break;
}
}
systable_endscan(scan);
heap_close(pgxc_group_rel, AccessShareLock);
if (!use_my_group) {
Distribution* dist = ng_get_group_distribution(my_group);
objects = lappend(objects, dist);
}
Assert(bms_is_empty(left_nodeids));
bms_free(left_nodeids);
return objects;
}
* gs_get_nodegroup_tablecount
* marked unsupported for centralized
*
* Parameters:
* @in node group name
*
* Return total table numbers.
*/
Datum gs_get_nodegroup_tablecount(PG_FUNCTION_ARGS)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
(errmsg("Unsupport feature"),
errdetail("gs_get_nodegroup_tablecount is not supported for centralize deployment"),
errcause("The function is not implemented."),
erraction("Do not use this function with centralize deployment."))));
PG_RETURN_INT32(0);
}