* Copyright (c) Huawei Technologies Co., Ltd. 2020-2020. All rights reserved.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*/
#include "utils/knl_globalsystupcache.h"
#include "postgres.h"
#include "knl/knl_variable.h"
#include "access/genam.h"
#include "access/hash.h"
#include "access/heapam.h"
#include "access/relscan.h"
#include "access/sysattr.h"
#include "access/transam.h"
#include "access/tuptoaster.h"
#include "access/valid.h"
#include "catalog/gs_obsscaninfo.h"
#include "catalog/gs_policy_label.h"
#include "catalog/indexing.h"
#include "catalog/pg_operator.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "catalog/pg_attribute.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_amop.h"
#include "catalog/pg_amproc.h"
#include "catalog/pg_auth_members.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_obsscaninfo.h"
#include "catalog/pg_cast.h"
#include "catalog/pg_collation.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_conversion.h"
#include "catalog/pg_database.h"
#include "catalog/pg_db_role_setting.h"
#include "catalog/pg_default_acl.h"
#include "catalog/pg_depend.h"
#include "catalog/pg_description.h"
#include "catalog/pg_directory.h"
#include "catalog/pg_enum.h"
#include "catalog/pg_foreign_data_wrapper.h"
#include "catalog/pg_foreign_server.h"
#include "catalog/pg_foreign_table.h"
#include "catalog/pg_job.h"
#include "catalog/pg_job_proc.h"
#include "catalog/pg_language.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_object.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_operator.h"
#include "catalog/pg_opfamily.h"
#include "catalog/pg_partition.h"
#include "catalog/pg_partition_fn.h"
#include "catalog/pg_hashbucket.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_proc_fn.h"
#include "catalog/pg_range.h"
#include "catalog/pg_rewrite.h"
#include "catalog/pg_seclabel.h"
#include "catalog/pg_shseclabel.h"
#include "catalog/pg_shdescription.h"
#include "catalog/pg_shdepend.h"
#include "catalog/pg_statistic.h"
#include "catalog/pg_statistic_ext.h"
#include "catalog/pg_synonym.h"
#include "catalog/pg_tablespace.h"
#include "catalog/pg_ts_config.h"
#include "catalog/pg_ts_config_map.h"
#include "catalog/pg_ts_dict.h"
#include "catalog/pg_ts_parser.h"
#include "catalog/pg_ts_template.h"
#include "catalog/pg_type.h"
#include "catalog/pg_user_mapping.h"
#include "catalog/pg_user_status.h"
#include "catalog/pg_extension_data_source.h"
#include "catalog/pg_streaming_stream.h"
#include "catalog/pg_streaming_cont_query.h"
#include "catalog/heap.h"
#include "executor/executor.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "parser/parse_relation.h"
#include "parser/parse_type.h"
#include "pgstat.h"
#ifdef ENABLE_MULTIPLE_NODES
#include "catalog/pgxc_class.h"
#include "catalog/pgxc_node.h"
#include "catalog/pgxc_group.h"
#include "catalog/pg_resource_pool.h"
#include "catalog/pg_workload_group.h"
#include "catalog/pg_app_workloadgroup_mapping.h"
#endif
#ifdef CatCache_STATS
#include "storage/ipc.h"
#endif
#include "storage/lmgr.h"
#include "storage/sinvaladt.h"
#include "utils/acl.h"
#include "utils/atomic.h"
#include "utils/datum.h"
#include "utils/builtins.h"
#include "utils/elog.h"
#include "utils/extended_statistics.h"
#include "utils/fmgroids.h"
#include "utils/fmgrtab.h"
#include "utils/hashutils.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/rel_gs.h"
#include "utils/relcache.h"
#include "utils/resowner.h"
#include "utils/syscache.h"
#include "utils/snapmgr.h"
#include "utils/knl_relcache.h"
#include "utils/knl_catcache.h"
#include "utils/knl_globaltabdefcache.h"
#include "utils/sec_rls_utils.h"
void GlobalCatCTup::Release()
{
Assert(ct_magic == CT_MAGIC);
Assert(refcount > 0);
my_cache->ReleaseGlobalCatCTup(this);
}
void GlobalCatCList::Release()
{
Assert(cl_magic == CL_MAGIC);
Assert(refcount > 0);
my_cache->ReleaseGlobalCatCList(this);
}
void GlobalSysTupCache::ReleaseGlobalCatCTup(GlobalCatCTup *ct)
{
if (unlikely(!ct->canInsertGSC)) {
GlobalCatCTup::Free(ct);
return;
}
(void)pg_atomic_fetch_sub_u64(&ct->refcount, 1);
}
void GlobalSysTupCache::ReleaseGlobalCatCList(GlobalCatCList *cl)
{
if (unlikely(!cl->canInsertGSC)) {
GlobalCatCList::Free(cl);
return;
}
(void)pg_atomic_fetch_sub_u64(&cl->refcount, 1);
}
static uint64 GetClEstimateSize(GlobalCatCList *cl)
{
uint64 cl_size = GetMemoryChunkSpace(cl) +
cl->nkeys * (NAMEDATALEN + CHUNK_ALGIN_PAD);
return cl_size;
}
static uint64 GetCtEstimateSize(GlobalCatCTup *ct)
{
uint64 ct_size = GetMemoryChunkSpace(ct);
return ct_size;
}
void GlobalSysTupCache::AddHeadToCCList(GlobalCatCList *cl)
{
uint64 cl_size = GetClEstimateSize(cl);
m_dbEntry->MemoryEstimateAdd(cl_size);
pg_atomic_fetch_add_u64(m_tup_space, cl_size);
DLAddHead(&cc_lists, &cl->cache_elem);
}
static void free_dead_cl_internal(GlobalCatCList *cl, volatile uint64 *tup_space, GlobalSysDBCacheEntry *entry)
{
uint64 cl_size = GetClEstimateSize(cl);
entry->MemoryEstimateSub(cl_size);
pg_atomic_fetch_sub_u64(tup_space, cl_size);
GlobalCatCList::Free(cl);
}
void GlobalSysTupCache::AddHeadToBucket(Index hash_index, GlobalCatCTup *ct)
{
uint64 ct_size = GetCtEstimateSize(ct);
pg_atomic_fetch_add_u64(m_tup_space, ct_size);
m_dbEntry->MemoryEstimateAdd(ct_size);
pg_atomic_fetch_add_u64(m_tup_count, 1);
DLAddHead(&cc_buckets[hash_index], &ct->cache_elem);
}
static void free_dead_ct_internal(GlobalCatCTup *ct, volatile uint64 *tup_space, GlobalSysDBCacheEntry *entry)
{
uint64 ct_size = GetCtEstimateSize(ct);
pg_atomic_fetch_sub_u64(tup_space, ct_size);
entry->MemoryEstimateSub(ct_size);
GlobalCatCTup::Free(ct);
}
void GlobalSysTupCache::HandleDeadGlobalCatCTup(GlobalCatCTup *ct)
{
pg_atomic_fetch_sub_u64(m_tup_count, 1);
DLRemove(&ct->cache_elem);
ct->dead = true;
if (ct->refcount == 0) {
free_dead_ct_internal(ct, m_tup_space, m_dbEntry);
} else {
m_dead_cts.AddTail(&ct->cache_elem);
}
}
void GlobalSysTupCache::FreeDeadCts()
{
while (m_dead_cts.GetLength() > 0) {
Dlelem *elt = m_dead_cts.RemoveHead();
if (elt == NULL) {
break;
}
GlobalCatCTup *ct = (GlobalCatCTup *)DLE_VAL(elt);
if (ct->refcount != 0) {
m_dead_cts.AddTail(&ct->cache_elem);
break;
} else {
free_dead_ct_internal(ct, m_tup_space, m_dbEntry);
}
}
}
void GlobalSysTupCache::RemoveTailTupleElements(Index hash_index)
{
ResourceOwnerEnlargeGlobalIsExclusive(LOCAL_SYSDB_RESOWNER);
if (!atomic_compare_exchange_u32(&m_is_tup_swappingouts[hash_index], 0, 1)) {
return;
}
ResourceOwnerRememberGlobalIsExclusive(LOCAL_SYSDB_RESOWNER, &m_is_tup_swappingouts[hash_index]);
bool listBelowThreshold = GetBucket(hash_index)->dll_len < MAX_GSC_LIST_LENGTH;
uint64 swapout_count_once = 0;
PthreadRWlockWrlock(LOCAL_SYSDB_RESOWNER, &m_bucket_rw_locks[hash_index]);
uint64 max_swapout_count_once = GetSwapOutNum(listBelowThreshold, GetBucket(hash_index)->dll_len);
for (Dlelem *elt = DLGetTail(GetBucket(hash_index)); elt != NULL;) {
GlobalCatCTup *ct = (GlobalCatCTup *)DLE_VAL(elt);
elt = DLGetPred(elt);
if (ct->refcount != 0) {
* whatever, another thread does swappout again */
DLMoveToFront(&ct->cache_elem);
break;
}
HandleDeadGlobalCatCTup(ct);
swapout_count_once++;
if (swapout_count_once == max_swapout_count_once) {
break;
}
}
PthreadRWlockUnlock(LOCAL_SYSDB_RESOWNER, &m_bucket_rw_locks[hash_index]);
Assert(m_is_tup_swappingouts[hash_index] == 1);
atomic_compare_exchange_u32(&m_is_tup_swappingouts[hash_index], 1, 0);
ResourceOwnerForgetGlobalIsExclusive(LOCAL_SYSDB_RESOWNER, &m_is_tup_swappingouts[hash_index]);
}
void GlobalCatCList::Free(GlobalCatCList *cl)
{
Assert(cl->refcount == 0 || !cl->canInsertGSC);
for (int i = 0; i < cl->n_members; i++) {
cl->members[i]->Release();
}
CatCacheFreeKeys(cl->my_cache->GetCCTupleDesc(), cl->nkeys, cl->my_cache->GetCCKeyno(), cl->keys);
pfree_ext(cl);
}
void GlobalSysTupCache::HandleDeadGlobalCatCList(GlobalCatCList *cl)
{
DLRemove(&cl->cache_elem);
if (cl->refcount == 0) {
free_dead_cl_internal(cl, m_tup_space, m_dbEntry);
} else {
m_dead_cls.AddTail(&cl->cache_elem);
}
}
void GlobalSysTupCache::FreeDeadCls()
{
while (m_dead_cls.GetLength() > 0) {
Dlelem *elt = m_dead_cls.RemoveHead();
if (elt == NULL) {
break;
}
GlobalCatCList *cl = (GlobalCatCList *)DLE_VAL(elt);
if (cl->refcount != 0) {
m_dead_cls.AddTail(&cl->cache_elem);
break;
} else {
free_dead_cl_internal(cl, m_tup_space, m_dbEntry);
}
}
FreeDeadCts();
}
void GlobalSysTupCache::RemoveTailListElements()
{
ResourceOwnerEnlargeGlobalIsExclusive(LOCAL_SYSDB_RESOWNER);
if (!atomic_compare_exchange_u32(&m_is_list_swappingout, 0, 1)) {
return;
}
ResourceOwnerRememberGlobalIsExclusive(LOCAL_SYSDB_RESOWNER, &m_is_list_swappingout);
bool listBelowThreshold = cc_lists.dll_len < MAX_GSC_LIST_LENGTH;
uint64 swapout_count = 0;
PthreadRWlockWrlock(LOCAL_SYSDB_RESOWNER, m_list_rw_lock);
uint64 max_swapout_count_once = GetSwapOutNum(listBelowThreshold, cc_lists.dll_len);
for (Dlelem *elt = DLGetTail(&cc_lists); elt != NULL;) {
GlobalCatCList *cl = (GlobalCatCList *)DLE_VAL(elt);
elt = DLGetPred(elt);
if (cl->refcount != 0) {
DLMoveToFront(&cl->cache_elem);
* whatever, another thread does swappout again */
break;
}
HandleDeadGlobalCatCList(cl);
swapout_count++;
if (swapout_count > max_swapout_count_once) {
break;
}
}
PthreadRWlockUnlock(LOCAL_SYSDB_RESOWNER, m_list_rw_lock);
Assert(m_is_list_swappingout == 1);
atomic_compare_exchange_u32(&m_is_list_swappingout, 1, 0);
ResourceOwnerForgetGlobalIsExclusive(LOCAL_SYSDB_RESOWNER, &m_is_list_swappingout);
}
void GlobalSysTupCache::RemoveAllTailElements()
{
if (!m_isInited) {
return;
}
RemoveTailListElements();
#ifndef ENABLE_LITE_MODE
if (g_instance.global_sysdbcache.MemoryUnderControl()) {
return;
}
#endif
for (int hash_index = 0; hash_index < cc_nbuckets; hash_index++) {
RemoveTailTupleElements(hash_index);
#ifndef ENABLE_LITE_MODE
if (g_instance.global_sysdbcache.MemoryUnderControl()) {
break;
}
#endif
}
}
GlobalSysTupCache::GlobalSysTupCache(Oid dbOid, int cache_id, bool isShared, GlobalSysDBCacheEntry *entry)
{
m_isInited = false;
m_searches = &entry->m_dbstat->tup_searches;
m_hits = &entry->m_dbstat->tup_hits;
m_newloads = &entry->m_dbstat->tup_newloads;
m_dbEntry = entry;
m_dbOid = dbOid;
m_cache_id = cache_id;
m_bucket_rw_locks = NULL;
m_list_rw_lock = NULL;
m_concurrent_lock = NULL;
cc_id = -1;
cc_nbuckets = -1;
m_relinfo.cc_tupdesc = NULL;
m_relinfo.cc_relname = NULL;
m_relinfo.cc_tupdesc = NULL;
m_relinfo.cc_reloid = -1;
m_relinfo.cc_indexoid = -1;
m_relinfo.cc_nkeys = -1;
m_relinfo.cc_relisshared = isShared;
Assert((isShared && m_dbOid == InvalidOid) || (!isShared && m_dbOid != InvalidOid));
for (int i = 0; i < CATCACHE_MAXKEYS; i++) {
m_relinfo.cc_keyno[i] = -1;
errno_t rc = memset_s(&cc_skey[i], sizeof(ScanKeyData), 0, sizeof(ScanKeyData));
securec_check(rc, "", "");
m_relinfo.cc_hashfunc[i] = NULL;
m_relinfo.cc_fastequal[i] = NULL;
}
cc_buckets = NULL;
m_is_tup_swappingouts = NULL;
m_is_list_swappingout = 0;
enable_rls = false;
}
void GlobalSysTupCache::SetStatInfoPtr(volatile uint64 *tup_count, volatile uint64 *tup_space)
{
m_tup_count = tup_count;
m_tup_space = tup_space;
}
GlobalCatCTup *GlobalSysTupCache::FindSearchKeyTupleFromCache(InsertCatTupInfo *tup_info, int *location)
{
uint32 hash_value = tup_info->hash_value;
Index hash_index = tup_info->hash_index;
Datum *arguments = tup_info->arguments;
int index = 0;
for (Dlelem *elt = DLGetHead(GetBucket(hash_index)); elt != NULL; elt = DLGetSucc(elt)) {
index++;
GlobalCatCTup *ct = (GlobalCatCTup *)DLE_VAL(elt);
if (ct->hash_value != hash_value) {
continue;
}
if (!CatalogCacheCompareTuple(m_relinfo.cc_fastequal, m_relinfo.cc_nkeys, ct->keys, arguments)) {
continue;
}
if (unlikely(u_sess->attr.attr_common.IsInplaceUpgrade) && ct->tuple.t_self.ip_posid == 0) {
Assert(ct->tuple.t_tableOid == InvalidOid);
continue;
}
pg_atomic_fetch_add_u64(&ct->refcount, 1);
*location = index;
return ct;
}
return NULL;
}
GlobalCatCTup *GlobalSysTupCache::FindScanKeyTupleFromCache(InsertCatTupInfo *tup_info)
{
uint32 hash_value = tup_info->hash_value;
Index hash_index = tup_info->hash_index;
Datum *arguments = tup_info->arguments;
for (Dlelem *elt = DLGetHead(GetBucket(hash_index)); elt != NULL; elt = DLGetSucc(elt)) {
GlobalCatCTup *ct = (GlobalCatCTup *)DLE_VAL(elt);
if (ct->hash_value != hash_value) {
continue;
}
if (!CatalogCacheCompareTuple(m_relinfo.cc_fastequal, m_relinfo.cc_nkeys, ct->keys, arguments)) {
continue;
}
if (unlikely(u_sess->attr.attr_common.IsInplaceUpgrade) && ct->tuple.t_self.ip_posid == 0) {
Assert(ct->tuple.t_tableOid == InvalidOid);
continue;
}
pg_atomic_fetch_add_u64(&ct->refcount, 1);
return ct;
}
return NULL;
}
GlobalCatCTup *GlobalSysTupCache::FindHashTupleFromCache(InsertCatTupInfo *tup_info)
{
uint32 hash_value = tup_info->hash_value;
Index hash_index = tup_info->hash_index;
for (Dlelem *elt = DLGetHead(GetBucket(hash_index)); elt != NULL; elt = DLGetSucc(elt)) {
GlobalCatCTup *ct = (GlobalCatCTup *)DLE_VAL(elt);
if (ct->hash_value != hash_value) {
continue;
}
pg_atomic_fetch_add_u64(&ct->refcount, 1);
return ct;
}
return NULL;
}
GlobalCatCTup *GlobalSysTupCache::FindPgAttrTupleFromCache(InsertCatTupInfo *tup_info)
{
Index hash_index = tup_info->hash_index;
int16 attnum = tup_info->attnum;
CatalogRelationBuildParam *catalogDesc = tup_info->catalogDesc;
for (Dlelem *elt = DLGetHead(GetBucket(hash_index)); elt != NULL; elt = DLGetSucc(elt)) {
GlobalCatCTup *ct = (GlobalCatCTup *)DLE_VAL(elt);
bool attnumIsNull = false;
int curAttnum = DatumGetInt16(SysCacheGetAttr(cc_id, &ct->tuple, Anum_pg_attribute_attnum, &attnumIsNull));
if ((attnum < catalogDesc->natts && curAttnum != attnum) ||
(attnum >= catalogDesc->natts && curAttnum != -(attnum - catalogDesc->natts + 1))) {
continue;
}
pg_atomic_fetch_add_u64(&ct->refcount, 1);
return ct;
}
return NULL;
}
GlobalCatCTup *GlobalSysTupCache::FindSameTupleFromCache(InsertCatTupInfo *tup_info)
{
uint32 hash_value = tup_info->hash_value;
Index hash_index = tup_info->hash_index;
HeapTuple ntp = tup_info->ntp;
for (Dlelem *elt = DLGetHead(GetBucket(hash_index)); elt != NULL; elt = DLGetSucc(elt)) {
GlobalCatCTup *ct = (GlobalCatCTup *)DLE_VAL(elt);
if (ct->hash_value != hash_value) {
continue;
}
if (IsProcCache(m_relinfo.cc_reloid) && IsSystemObjOid(HeapTupleGetOid(&(ct->tuple))) &&
likely(u_sess->attr.attr_common.IsInplaceUpgrade == false)) {
continue;
}
if (IsAttributeCache(m_relinfo.cc_reloid)) {
bool attIsNull = false;
Oid attrelid =
DatumGetObjectId(SysCacheGetAttr(cc_id, &(ct->tuple), Anum_pg_attribute_attrelid, &attIsNull));
if (IsSystemObjOid(attrelid) && IsValidCatalogParam(GetCatalogParam(attrelid))) {
continue;
}
}
if (!ItemPointerEqualsNoCheck(&(ct->tuple.t_self), &(ntp->t_self))) {
continue;
}
pg_atomic_fetch_add_u64(&ct->refcount, 1);
return ct;
}
return NULL;
}
* insert a new tuple into globalcatcache, if there is already one, do nothing
*/
GlobalCatCTup *GlobalSysTupCache::InsertHeapTupleIntoGlobalCatCache(InsertCatTupInfo *tup_info)
{
MemoryContext oldcxt = MemoryContextSwitchTo(m_dbEntry->GetRandomMemCxt());
GlobalCatCTup *new_ct = (GlobalCatCTup *)palloc(sizeof(GlobalCatCTup) + MAXIMUM_ALIGNOF + tup_info->ntp->t_len);
MemoryContextSwitchTo(oldcxt);
new_ct->ct_magic = CT_MAGIC;
new_ct->my_cache = this;
new_ct->dead = false;
new_ct->hash_value = tup_info->hash_value;
DLInitElem(&new_ct->cache_elem, (void *)new_ct);
CopyTupleIntoGlobalCatCTup(new_ct, tup_info->ntp);
new_ct->refcount = 1;
new_ct->canInsertGSC = true;
pthread_rwlock_t *bucket_lock = &m_bucket_rw_locks[tup_info->hash_index];
PthreadRWlockWrlock(LOCAL_SYSDB_RESOWNER, bucket_lock);
GlobalCatCTup *ct = FindTupleFromCache(tup_info);
if (unlikely(ct != NULL)) {
PthreadRWlockUnlock(LOCAL_SYSDB_RESOWNER, bucket_lock);
GlobalCatCTup::Free(new_ct);
return ct;
}
AddHeadToBucket(tup_info->hash_index, new_ct);
PthreadRWlockUnlock(LOCAL_SYSDB_RESOWNER, bucket_lock);
return new_ct;
}
static inline MemoryContext GetLSCMemcxt(int cache_id)
{
return t_thrd.lsc_cxt.lsc->systabcache.GetLocalTupCacheMemoryCxt(cache_id);
}
* insert committed tuple into globalcatcache, uncommitted tuple into localcatcache
*/
GlobalCatCTup *GlobalSysTupCache::InsertHeapTupleIntoLocalCatCache(InsertCatTupInfo *tup_info)
{
MemoryContext oldcxt =
MemoryContextSwitchTo(GetLSCMemcxt(m_cache_id));
GlobalCatCTup *new_ct = (GlobalCatCTup *)palloc(sizeof(GlobalCatCTup) + MAXIMUM_ALIGNOF + tup_info->ntp->t_len);
MemoryContextSwitchTo(oldcxt);
new_ct->ct_magic = CT_MAGIC;
new_ct->my_cache = this;
new_ct->dead = false;
new_ct->hash_value = tup_info->hash_value;
DLInitElem(&new_ct->cache_elem, (void *)new_ct);
CopyTupleIntoGlobalCatCTup(new_ct, tup_info->ntp);
new_ct->refcount = 1;
new_ct->canInsertGSC = false;
return new_ct;
}
* insert tuple into globalcatcache memory, others into localcatcache memory
*/
GlobalCatCTup *GlobalSysTupCache::InsertHeapTupleIntoCatCacheInSingle(InsertCatTupInfo *tup_info)
{
HeapTuple ntp = tup_info->ntp;
if (HeapTupleHasExternal(ntp)) {
tup_info->ntp = toast_flatten_tuple(ntp, m_relinfo.cc_tupdesc);
Assert(tup_info->hash_value ==
CatalogCacheComputeTupleHashValue(cc_id, m_relinfo.cc_keyno, m_relinfo.cc_tupdesc, m_relinfo.cc_hashfunc,
m_relinfo.cc_reloid, m_relinfo.cc_nkeys, tup_info->ntp));
}
GlobalCatCTup *ct;
if (tup_info->canInsertGSC) {
ct = InsertHeapTupleIntoGlobalCatCache(tup_info);
} else {
ct = InsertHeapTupleIntoLocalCatCache(tup_info);
}
if (tup_info->ntp != ntp) {
heap_freetuple_ext(tup_info->ntp);
tup_info->ntp = ntp;
}
return ct;
}
* insert tuple into globalcatcache memory, others into localcatcache memory
* should call only by searchsyscachelist
* we do this check because when accept inval msg, we clear cc_list, even they are valid
*/
GlobalCatCTup *GlobalSysTupCache::InsertHeapTupleIntoCatCacheInList(InsertCatTupInfo *tup_info)
{
HeapTuple ntp = tup_info->ntp;
if (HeapTupleHasExternal(ntp)) {
tup_info->ntp = toast_flatten_tuple(ntp, m_relinfo.cc_tupdesc);
Assert(tup_info->hash_value ==
CatalogCacheComputeTupleHashValue(cc_id, m_relinfo.cc_keyno, m_relinfo.cc_tupdesc, m_relinfo.cc_hashfunc,
m_relinfo.cc_reloid, m_relinfo.cc_nkeys, tup_info->ntp));
}
GlobalCatCTup *ct = NULL;
if (tup_info->canInsertGSC) {
pthread_rwlock_t *bucket_lock = &m_bucket_rw_locks[tup_info->hash_index];
PthreadRWlockRdlock(LOCAL_SYSDB_RESOWNER, bucket_lock);
ct = FindTupleFromCache(tup_info);
PthreadRWlockUnlock(LOCAL_SYSDB_RESOWNER, bucket_lock);
if (unlikely(ct == NULL)) {
ct = InsertHeapTupleIntoGlobalCatCache(tup_info);
}
} else {
ct = InsertHeapTupleIntoLocalCatCache(tup_info);
}
if (tup_info->ntp != ntp) {
heap_freetuple_ext(tup_info->ntp);
tup_info->ntp = ntp;
}
return ct;
}
GlobalCatCTup *GlobalSysTupCache::SearchMissFromProcAndAttribute(InsertCatTupInfo *tup_info)
{
Datum *arguments = tup_info->arguments;
if (IsProcCache(m_relinfo.cc_reloid) && likely(u_sess->attr.attr_common.IsInplaceUpgrade == false)) {
CACHE2_elog(DEBUG2, "SearchGlobalCatCacheMiss(%d): function not found in pg_proc", cc_id);
HeapTuple ntp = SearchBuiltinProcCacheMiss(cc_id, m_relinfo.cc_nkeys, arguments);
if (HeapTupleIsValid(ntp)) {
CACHE2_elog(DEBUG2, "SearchGlobalCatCacheMiss(%d): match a built-in function", cc_id);
tup_info->ntp = ntp;
Assert(ntp->t_tableOid == InvalidOid);
tup_info->canInsertGSC = true;
GlobalCatCTup *ct = InsertHeapTupleIntoCatCacheInSingle(tup_info);
heap_freetuple(ntp);
return ct;
}
}
if (IsAttributeCache(m_relinfo.cc_reloid) && IsSystemObjOid(DatumGetObjectId(arguments[0]))) {
CACHE2_elog(DEBUG2, "SearchGlobalCatCacheMiss: cat tuple not in cat cache %d", cc_id);
HeapTuple ntp = SearchPgAttributeCacheMiss(cc_id, m_relinfo.cc_tupdesc, m_relinfo.cc_nkeys, arguments);
if (HeapTupleIsValid(ntp)) {
tup_info->ntp = ntp;
tup_info->canInsertGSC = true;
GlobalCatCTup *ct = InsertHeapTupleIntoCatCacheInSingle(tup_info);
heap_freetuple(ntp);
return ct;
}
}
return NULL;
}
void AcquireGSCTableReadLock(bool *has_concurrent_lock, pthread_rwlock_t *concurrent_lock)
{
int cur_index = t_thrd.lsc_cxt.lsc->rdlock_info.count;
if (unlikely(cur_index == MAX_GSC_READLOCK_COUNT) ||
PthreadRWlockTryRdlock(LOCAL_SYSDB_RESOWNER, concurrent_lock) != 0) {
*has_concurrent_lock = false;
return;
}
*has_concurrent_lock = true;
t_thrd.lsc_cxt.lsc->rdlock_info.concurrent_lock[cur_index] = concurrent_lock;
t_thrd.lsc_cxt.lsc->rdlock_info.has_concurrent_lock[cur_index] = has_concurrent_lock;
t_thrd.lsc_cxt.lsc->rdlock_info.count++;
}
void ReleaseGSCTableReadLock(bool *has_concurrent_lock, pthread_rwlock_t *concurrent_lock)
{
Assert(*has_concurrent_lock);
*has_concurrent_lock = false;
PthreadRWlockUnlock(LOCAL_SYSDB_RESOWNER, concurrent_lock);
Assert(t_thrd.lsc_cxt.lsc->rdlock_info.concurrent_lock[t_thrd.lsc_cxt.lsc->rdlock_info.count - 1] ==
concurrent_lock);
Assert(t_thrd.lsc_cxt.lsc->rdlock_info.has_concurrent_lock[t_thrd.lsc_cxt.lsc->rdlock_info.count - 1] ==
has_concurrent_lock);
t_thrd.lsc_cxt.lsc->rdlock_info.count--;
}
GlobalCatCTup *GlobalSysTupCache::SearchTupleFromFile(uint32 hash_value, Datum *arguments, bool is_exclusive)
{
InsertCatTupInfo tup_info;
tup_info.find_type = SCAN_TUPLE_SKIP;
tup_info.arguments = arguments;
tup_info.hash_value = hash_value;
tup_info.hash_index = HASH_INDEX(hash_value, (uint32)cc_nbuckets);
tup_info.has_concurrent_lock = !is_exclusive;
tup_info.is_exclusive = is_exclusive;
Assert(is_exclusive);
GlobalCatCTup *ct = SearchTupleMiss(&tup_info);
if (ct != NULL) {
pg_atomic_fetch_add_u64(m_newloads, 1);
}
return ct;
}
GlobalCatCTup *GlobalSysTupCache::SearchTupleMiss(InsertCatTupInfo *tup_info)
{
GlobalCatCTup *ct = SearchMissFromProcAndAttribute(tup_info);
if (ct != NULL) {
return ct;
}
Relation relation = heap_open(m_relinfo.cc_reloid, AccessShareLock);
ereport(DEBUG1, (errmsg("cache->cc_reloid - %d", m_relinfo.cc_reloid)));
* Ok, need to make a lookup in the relation, copy the scankey and fill
* out any per-call fields.
*/
ScanKeyData cur_skey[CATCACHE_MAXKEYS];
errno_t rc = memcpy_s(cur_skey, sizeof(ScanKeyData) * CATCACHE_MAXKEYS, cc_skey,
sizeof(ScanKeyData) * m_relinfo.cc_nkeys);
securec_check(rc, "", "");
Datum *arguments = tup_info->arguments;
cur_skey[0].sk_argument = arguments[0];
cur_skey[1].sk_argument = arguments[1];
cur_skey[2].sk_argument = arguments[2];
cur_skey[3].sk_argument = arguments[3];
HeapTuple ntp;
SysScanDesc scandesc = NULL;
if (u_sess->hook_cxt.pluginSearchCatHook != NULL) {
ntp = ((searchCatFunc)(u_sess->hook_cxt.pluginSearchCatHook))(relation,
m_relinfo.cc_indexoid, cc_id, m_relinfo.cc_nkeys, cur_skey, &scandesc);
} else {
scandesc =
systable_beginscan(relation, m_relinfo.cc_indexoid, IndexScanOK(cc_id), NULL,
m_relinfo.cc_nkeys, cur_skey);
}
if (tup_info->has_concurrent_lock) {
AcquireGSCTableReadLock(&tup_info->has_concurrent_lock, m_concurrent_lock);
}
if (u_sess->hook_cxt.pluginSearchCatHook != NULL) {
if (HeapTupleIsValid(ntp)) {
tup_info->ntp = ntp;
if (!tup_info->has_concurrent_lock) {
tup_info->canInsertGSC = false;
} else {
tup_info->canInsertGSC = CanTupleInsertGSC(ntp);
if (!tup_info->canInsertGSC) {
ReleaseGSCTableReadLock(&tup_info->has_concurrent_lock, m_concurrent_lock);
}
}
ct = InsertHeapTupleIntoCatCacheInSingle(tup_info);
}
} else {
while (HeapTupleIsValid(ntp = systable_getnext(scandesc))) {
tup_info->ntp = ntp;
if (!tup_info->has_concurrent_lock) {
tup_info->canInsertGSC = false;
} else {
tup_info->canInsertGSC = CanTupleInsertGSC(ntp);
if (!tup_info->canInsertGSC) {
ReleaseGSCTableReadLock(&tup_info->has_concurrent_lock, m_concurrent_lock);
}
}
ct = InsertHeapTupleIntoCatCacheInSingle(tup_info);
break;
}
}
if (tup_info->has_concurrent_lock) {
ReleaseGSCTableReadLock(&tup_info->has_concurrent_lock, m_concurrent_lock);
}
systable_endscan(scandesc);
heap_close(relation, AccessShareLock);
#ifdef USE_SPQ
if ((cc_id == ATTNUM && DatumGetInt16(arguments[1]) == RootSelfItemPointerAttributeNumber) ||
(cc_id == ATTNAME && (strcmp(DatumGetCString(arguments[1]), "_root_ctid") == 0))) {
Form_pg_attribute tmp_attribute;
int attno = DatumGetInt16(arguments[1]);
Assert(attno > FirstLowInvalidHeapAttributeNumber && attno < 0);
tmp_attribute = SystemAttributeDefinition(attno, false, false, false);
relation = heap_open(m_relinfo.cc_reloid, AccessShareLock);
ntp = heaptuple_from_pg_attribute(relation, tmp_attribute);
heap_close(relation, AccessShareLock);
tup_info->ntp = ntp;
if (!tup_info->has_concurrent_lock) {
tup_info->canInsertGSC = false;
} else {
tup_info->canInsertGSC = CanTupleInsertGSC(ntp);
if (!tup_info->canInsertGSC) {
ReleaseGSCTableReadLock(&tup_info->has_concurrent_lock, m_concurrent_lock);
}
}
ct = InsertHeapTupleIntoCatCacheInSingle(tup_info);
if (tup_info->has_concurrent_lock) {
ReleaseGSCTableReadLock(&tup_info->has_concurrent_lock, m_concurrent_lock);
}
}
#endif
* global catcache match disk , not need negative tuple
*/
return ct;
}
* SearchTupleInternal
*
* This call searches a system cache for a tuple, opening the relation
* if necessary (on the first access to a particular cache).
*
* The result is NULL if not found, or a pointer to a HeapTuple in
* the cache. The caller must not modify the tuple, and must call
* Release() when done with it.
*
* The search key values should be expressed as Datums of the key columns'
* datatype(s). (Pass zeroes for any unused parameters.) As a special
* exception, the passed-in key for a NAME column can be just a C string;
* the caller need not go to the trouble of converting it to a fully
* null-padded NAME.
*/
* Work-horse for SearchGlobalCatCache/SearchGlobalCatCacheN.
*/
GlobalCatCTup *GlobalSysTupCache::SearchTupleInternal(uint32 hash_value, Datum *arguments)
{
FreeDeadCts();
pg_atomic_fetch_add_u64(m_searches, 1);
* scan the hash bucket until we find a match or exhaust our tuples
*/
Index hash_index = HASH_INDEX(hash_value, (uint32)cc_nbuckets);
pthread_rwlock_t *bucket_lock = &m_bucket_rw_locks[hash_index];
InsertCatTupInfo tup_info;
tup_info.find_type = SEARCH_TUPLE_SKIP;
tup_info.arguments = arguments;
tup_info.hash_value = hash_value;
tup_info.hash_index = hash_index;
int location = INVALID_LOCATION;
PthreadRWlockRdlock(LOCAL_SYSDB_RESOWNER, bucket_lock);
GlobalCatCTup *ct = FindSearchKeyTupleFromCache(&tup_info, &location);
PthreadRWlockUnlock(LOCAL_SYSDB_RESOWNER, bucket_lock);
if (ct != NULL) {
pg_atomic_fetch_add_u64(m_hits, 1);
TopnLruMoveToFront(&ct->cache_elem, GetBucket(hash_index), bucket_lock, location);
return ct;
}
tup_info.find_type = SCAN_TUPLE_SKIP;
bool canInsertGSC = !g_instance.global_sysdbcache.StopInsertGSC();
if (unlikely(GetBucket(hash_index)->dll_len >= MAX_GSC_LIST_LENGTH)) {
RemoveTailTupleElements(hash_index);
canInsertGSC = canInsertGSC && GetBucket(hash_index)->dll_len < MAX_GSC_LIST_LENGTH;
}
tup_info.is_exclusive = !canInsertGSC;
tup_info.has_concurrent_lock = canInsertGSC;
ct = SearchTupleMiss(&tup_info);
if (ct != NULL) {
pg_atomic_fetch_add_u64(m_newloads, 1);
}
return ct;
}
GlobalCatCList *GlobalSysTupCache::FindListInternal(uint32 hash_value, int nkeys, Datum *arguments, int *location)
{
int index = 0;
for (Dlelem *elt = DLGetHead(&cc_lists); elt; elt = DLGetSucc(elt)) {
if (index > MAX_GSC_LIST_LENGTH) {
break;
}
index++;
GlobalCatCList *cl = (GlobalCatCList *)DLE_VAL(elt);
if (likely(cl->hash_value != hash_value)) {
continue;
}
if (cl->nkeys != nkeys) {
continue;
}
if (!CatalogCacheCompareTuple(m_relinfo.cc_fastequal, nkeys, cl->keys, arguments)) {
continue;
}
pg_atomic_fetch_add_u64(&cl->refcount, 1);
CACHE2_elog(DEBUG2, "SearchGlobalCatCacheList(%s): found list", m_relinfo.cc_relname);
*location = index;
return cl;
}
return NULL;
}
* SearchGlobalCatCacheList
*
* Generate a list of all tuples matching a partial key (that is,
* a key specifying just the first K of the cache's N key columns).
*
* The caller must not modify the list object or the pointed-to tuples,
* and must call Release() when done with the list.
*/
GlobalCatCList *GlobalSysTupCache::SearchListInternal(uint32 hash_value, int nkeys, Datum *arguments)
{
FreeDeadCls();
Assert(nkeys > 0 && nkeys < m_relinfo.cc_nkeys);
int location = INVALID_LOCATION;
PthreadRWlockRdlock(LOCAL_SYSDB_RESOWNER, m_list_rw_lock);
GlobalCatCList *cl = FindListInternal(hash_value, nkeys, arguments, &location);
PthreadRWlockUnlock(LOCAL_SYSDB_RESOWNER, m_list_rw_lock);
if (cl != NULL) {
ResourceOwnerRememberGlobalCatCList(LOCAL_SYSDB_RESOWNER, cl);
TopnLruMoveToFront(&cl->cache_elem, &cc_lists, m_list_rw_lock, location);
return cl;
}
bool canInsertGSC = !g_instance.global_sysdbcache.StopInsertGSC();
if (cc_lists.dll_len >= MAX_GSC_LIST_LENGTH) {
RemoveTailListElements();
canInsertGSC = canInsertGSC && cc_lists.dll_len < MAX_GSC_LIST_LENGTH;
}
cl = SearchListFromFile(hash_value, nkeys, arguments, !canInsertGSC);
return cl;
}
GlobalCatCList *GlobalSysTupCache::CreateCatCacheList(InsertCatListInfo *list_info)
{
MemoryContext old;
if (list_info->canInsertGSC) {
old = MemoryContextSwitchTo(m_dbEntry->GetRandomMemCxt());
} else {
old = MemoryContextSwitchTo(GetLSCMemcxt(m_cache_id));
}
GlobalCatCList *new_cl = (GlobalCatCList *)palloc(offsetof(GlobalCatCList, members) +
(list_length(list_info->ctlist) + 1) * sizeof(GlobalCatCTup *));
ResourceOwnerRememberGlobalCatCList(LOCAL_SYSDB_RESOWNER, new_cl);
new_cl->refcount = 1;
new_cl->canInsertGSC = false;
new_cl->nkeys = list_info->nkeys;
new_cl->my_cache = this;
new_cl->cl_magic = CL_MAGIC;
new_cl->hash_value = list_info->hash_value;
DLInitElem(&new_cl->cache_elem, new_cl);
new_cl->ordered = list_info->ordered;
new_cl->n_members = list_length(list_info->ctlist);
int index = 0;
ListCell *lc;
foreach (lc, list_info->ctlist) {
new_cl->members[index] = (GlobalCatCTup *)lfirst(lc);
index++;
}
* Avoid double-free elemnts of list_info->ctlist by Resowner and Try-Catch(),
* to be safely we free info_list->ctlist here to avoid double-free happen, normally
* Resowner will do this
*/
list_free_ext(list_info->ctlist);
Assert(index == new_cl->n_members);
errno_t rc = memset_s(new_cl->keys, list_info->nkeys * sizeof(Datum), 0, list_info->nkeys * sizeof(Datum));
securec_check(rc, "", "");
CatCacheCopyKeys(m_relinfo.cc_tupdesc, list_info->nkeys, m_relinfo.cc_keyno, list_info->arguments, new_cl->keys);
MemoryContextSwitchTo(old);
new_cl->canInsertGSC = list_info->canInsertGSC;
return new_cl;
}
GlobalCatCList *GlobalSysTupCache::InsertListIntoCatCacheList(InsertCatListInfo *list_info, GlobalCatCList *cl)
{
if (!cl->canInsertGSC) {
return cl;
}
int location = INVALID_LOCATION;
PthreadRWlockWrlock(LOCAL_SYSDB_RESOWNER, m_list_rw_lock);
GlobalCatCList *exist_cl = FindListInternal(list_info->hash_value, list_info->nkeys, list_info->arguments,
&location);
if (exist_cl != NULL) {
PthreadRWlockUnlock(LOCAL_SYSDB_RESOWNER, m_list_rw_lock);
ResourceOwnerForgetGlobalCatCList(LOCAL_SYSDB_RESOWNER, cl);
ResourceOwnerRememberGlobalCatCList(LOCAL_SYSDB_RESOWNER, exist_cl);
cl->refcount = 0;
GlobalCatCList::Free(cl);
cl = exist_cl;
} else {
AddHeadToCCList(cl);
PthreadRWlockUnlock(LOCAL_SYSDB_RESOWNER, m_list_rw_lock);
}
return cl;
}
GlobalCatCList *GlobalSysTupCache::SearchListMiss(InsertCatListInfo *list_info)
{
Relation relation;
ScanKeyData cur_skey[CATCACHE_MAXKEYS];
SysScanDesc scandesc;
errno_t rc;
* Ok, need to make a lookup in the relation, copy the scankey and
* fill out any per-call fields.
*/
rc = memcpy_s(cur_skey, sizeof(ScanKeyData) * CATCACHE_MAXKEYS, cc_skey,
sizeof(ScanKeyData) * m_relinfo.cc_nkeys);
securec_check(rc, "", "");
cur_skey[0].sk_argument = list_info->arguments[0];
cur_skey[1].sk_argument = list_info->arguments[1];
cur_skey[2].sk_argument = list_info->arguments[2];
cur_skey[3].sk_argument = list_info->arguments[3];
relation = heap_open(m_relinfo.cc_reloid, AccessShareLock);
scandesc = systable_beginscan(relation, m_relinfo.cc_indexoid, IndexScanOK(cc_id),
NULL, list_info->nkeys, cur_skey);
if (list_info->has_concurrent_lock) {
AcquireGSCTableReadLock(&list_info->has_concurrent_lock, m_concurrent_lock);
}
list_info->ordered = (scandesc->irel != NULL);
HeapTuple ntp;
InsertCatTupInfo tup_info;
tup_info.find_type = SCAN_LIST_SKIP;
tup_info.arguments = NULL;
while (HeapTupleIsValid(ntp = systable_getnext(scandesc))) {
if (IsProcCache(m_relinfo.cc_reloid) && IsSystemObjOid(HeapTupleGetOid(ntp)) &&
likely(u_sess->attr.attr_common.IsInplaceUpgrade == false)) {
continue;
}
if (IsAttributeCache(m_relinfo.cc_reloid)) {
bool attIsNull = false;
Oid attrelid = DatumGetObjectId(SysCacheGetAttr(cc_id, ntp, Anum_pg_attribute_attrelid, &attIsNull));
if (IsSystemObjOid(attrelid) && IsValidCatalogParam(GetCatalogParam(attrelid))) {
continue;
}
}
* See if there's an entry for this tuple already.
*/
InitInsertCatTupInfo(&tup_info, ntp, list_info->arguments);
tup_info.canInsertGSC = list_info->has_concurrent_lock && CanTupleInsertGSC(ntp);
GlobalCatCTup *ct = InsertHeapTupleIntoCatCacheInList(&tup_info);
list_info->canInsertGSC = list_info->canInsertGSC && ct->canInsertGSC;
* Careful here: enlarge resource owner catref array, add entry to ctlist, then bump its refcount.
* This way leaves state correct if enlarge or lappend runs out of memory_context_list
* We use resource owner to track referenced cachetups for safety reasons. Because ctlist is now
* built from different sources, i.e. built-in catalogs and physical relation tuples. If failure in later
* sources is not caught, resource owner will clean up ref count for cachetups got from previous sources.
*/
list_info->ctlist = lappend(list_info->ctlist, ct);
}
systable_endscan(scandesc);
heap_close(relation, AccessShareLock);
if (list_info->has_concurrent_lock && (!list_info->canInsertGSC || list_length(list_info->ctlist) == 0)) {
ReleaseGSCTableReadLock(&list_info->has_concurrent_lock, m_concurrent_lock);
list_info->canInsertGSC = false;
}
GlobalCatCList *cl = CreateCatCacheList(list_info);
return cl;
}
GlobalCatCList *GlobalSysTupCache::SearchListFromFile(uint32 hash_value, int nkeys, Datum *arguments,
bool is_exclusive)
{
InsertCatListInfo list_info;
list_info.ordered = false;
list_info.ctlist = NIL;
list_info.nkeys = nkeys;
list_info.arguments = arguments;
list_info.hash_value = hash_value;
list_info.is_exclusive = is_exclusive;
list_info.has_concurrent_lock = !is_exclusive;
list_info.canInsertGSC = !is_exclusive;
* List was not found in cache, so we have to build it by reading the
* relation. For each matching tuple found in the relation, use an
* existing cache entry if possible, else build a new one.
*
* We have to bump the member refcounts temporarily to ensure they won't
* get dropped from the cache while loading other members. We use a PG_TRY
* block to ensure we can undo those refcounts if we get an error before
* we finish constructing the CatCList.
*/
* which has the same name with the one we want to find, lappend it
* into the ctlist
*/
if (IsProcCache(m_relinfo.cc_reloid) && CacheIsProcNameArgNsp(cc_id) &&
likely(u_sess->attr.attr_common.IsInplaceUpgrade == false)) {
SearchBuiltinProcCacheList(&list_info);
}
if (IsAttributeCache(m_relinfo.cc_reloid) && IsSystemObjOid(DatumGetObjectId(arguments[0]))) {
SearchPgAttributeCacheList(&list_info);
}
GlobalCatCList *cl = NULL;
PG_TRY();
{
cl = SearchListMiss(&list_info);
}
PG_CATCH();
{
ReleaseAllGSCRdConcurrentLock();
ReleaseTempList(list_info.ctlist);
PG_RE_THROW();
}
PG_END_TRY();
cl = InsertListIntoCatCacheList(&list_info, cl);
if (list_info.has_concurrent_lock) {
ReleaseGSCTableReadLock(&list_info.has_concurrent_lock, m_concurrent_lock);
}
return cl;
}
void GlobalSysTupCache::ReleaseTempList(const List *ctlist)
{
ListCell *ctlist_item = NULL;
GlobalCatCTup *ct = NULL;
foreach (ctlist_item, ctlist) {
ct = (GlobalCatCTup *)lfirst(ctlist_item);
ct->Release();
}
}
void GlobalSysTupCache::InitInsertCatTupInfo(InsertCatTupInfo *tup_info, HeapTuple ntp, Datum *arguments)
{
uint32 hash_value = CatalogCacheComputeTupleHashValue(cc_id, m_relinfo.cc_keyno, m_relinfo.cc_tupdesc,
m_relinfo.cc_hashfunc, m_relinfo.cc_reloid, m_relinfo.cc_nkeys, ntp);
Index hash_index = HASH_INDEX(hash_value, (uint32)(cc_nbuckets));
tup_info->arguments = arguments;
tup_info->hash_value = hash_value;
tup_info->hash_index = hash_index;
tup_info->ntp = ntp;
}
void GlobalSysTupCache::SearchPgAttributeCacheList(InsertCatListInfo *list_info)
{
const FormData_pg_attribute *catlogAttrs = NULL;
Assert(list_info->nkeys == 1);
Oid relOid = ObjectIdGetDatum(list_info->arguments[0]);
CatalogRelationBuildParam catalogDesc = GetCatalogParam(relOid);
catlogAttrs = catalogDesc.attrs;
if (catalogDesc.oid == InvalidOid) {
return;
}
PG_TRY();
{
bool hasBucketAttr = false;
for (int16 attnum = 0; attnum < catalogDesc.natts + GetSysAttLength(hasBucketAttr); attnum++) {
Form_pg_attribute attr;
FormData_pg_attribute tempAttr;
if (attnum < catalogDesc.natts) {
tempAttr = catlogAttrs[attnum];
attr = &tempAttr;
} else {
int16 index = attnum - catalogDesc.natts;
if (!catalogDesc.hasoids && index == 1) {
continue;
}
attr = SystemAttributeDefinition(-(index + 1), catalogDesc.hasoids, false, false);
attr->attrelid = relOid;
}
HeapTuple ntp = GetPgAttributeAttrTuple(m_relinfo.cc_tupdesc, attr);
InsertCatTupInfo tup_info;
tup_info.find_type = PGATTR_LIST_SKIP;
InitInsertCatTupInfo(&tup_info, ntp, list_info->arguments);
tup_info.is_exclusive = list_info->is_exclusive;
tup_info.canInsertGSC = true;
tup_info.attnum = attnum;
tup_info.catalogDesc = &catalogDesc;
GlobalCatCTup *ct = InsertHeapTupleIntoCatCacheInList(&tup_info);
heap_freetuple(ntp);
* Careful here: enlarge resource owner catref array, add entry to ctlist, then bump its refcount.
* This way leaves state correct if enlarge or lappend runs out of memory_context_list
* We use resource owner to track referenced cachetups for safety reasons. Because ctlist is now
* built from different sources, i.e. built-in catalogs and physical relation tuples. If failure in later
* sources is not caught, resource owner will clean up ref count for cachetups got from previous sources.
*/
list_info->ctlist = lappend(list_info->ctlist, ct);
}
}
PG_CATCH();
{
ReleaseAllGSCRdConcurrentLock();
ReleaseTempList(list_info->ctlist);
PG_RE_THROW();
}
PG_END_TRY();
}
void GlobalSysTupCache::SearchBuiltinProcCacheList(InsertCatListInfo *list_info)
{
int i;
GlobalCatCTup *ct = NULL;
const FuncGroup *gfuncs = NULL;
Assert(list_info->nkeys == 1);
char *funcname = NameStr(*(DatumGetName(list_info->arguments[0])));
gfuncs = SearchBuiltinFuncByName(funcname);
if (gfuncs == NULL) {
CACHE3_elog(DEBUG2, "%s: the function \"%s\" does not in built-in list", __FUNCTION__, funcname);
return;
}
PG_TRY();
{
for (i = 0; i < gfuncs->fnums; i++) {
const Builtin_func *bfunc = &gfuncs->funcs[i];
HeapTuple ntp = CreateHeapTuple4BuiltinFunc(bfunc, NULL);
InsertCatTupInfo tup_info;
tup_info.find_type = PROC_LIST_SKIP;
InitInsertCatTupInfo(&tup_info, ntp, list_info->arguments);
tup_info.is_exclusive = list_info->is_exclusive;
tup_info.canInsertGSC = true;
ct = InsertHeapTupleIntoCatCacheInList(&tup_info);
heap_freetuple(ntp);
list_info->ctlist = lappend(list_info->ctlist, ct);
}
}
PG_CATCH();
{
ReleaseAllGSCRdConcurrentLock();
ReleaseTempList(list_info->ctlist);
PG_RE_THROW();
}
PG_END_TRY();
}
* CatalogCacheCreateEntry
* Create a new GlobalCatCTup entry, copying the given HeapTuple and other
* supplied data into it. The new entry initially has refcount 0.
* this func makes sure the tuple has a shared memory context
* this func never fail
*/
void GlobalSysTupCache::CopyTupleIntoGlobalCatCTup(GlobalCatCTup *ct, HeapTuple dtp)
{
Assert(!HeapTupleHasExternal(dtp));
ct->tuple = *dtp;
Assert(dtp->tupTableType == HEAP_TUPLE);
ct->tuple.tupTableType = HEAP_TUPLE;
ct->tuple.t_data = (HeapTupleHeader)MAXALIGN(((char *)ct) + sizeof(GlobalCatCTup));
errno_t rc = memcpy_s((char *)ct->tuple.t_data, dtp->t_len, (const char *)dtp->t_data, dtp->t_len);
securec_check(rc, "", "");
for (int i = 0; i < m_relinfo.cc_nkeys; i++) {
Datum atp;
bool isnull = false;
atp = heap_getattr(&ct->tuple, m_relinfo.cc_keyno[i], m_relinfo.cc_tupdesc, &isnull);
#ifndef ENABLE_MULTIPLE_NODES
Assert(!isnull || ((m_relinfo.cc_reloid == ProcedureRelationId) && atp == 0));
#else
Assert(!isnull);
#endif
ct->keys[i] = atp;
}
}
void GlobalSysTupCache::InitHashTable()
{
MemoryContext old = MemoryContextSwitchTo(m_dbEntry->GetRandomMemCxt());
size_t sz = (cc_nbuckets + 1) * sizeof(Dllist) + PG_CACHE_LINE_SIZE;
if (cc_buckets == NULL) {
void *origin_ptr = palloc0(sz);
cc_buckets = (Dllist *)CACHELINEALIGN(origin_ptr);
m_dbEntry->MemoryEstimateAdd(GetMemoryChunkSpace(origin_ptr));
}
if (m_bucket_rw_locks == NULL) {
m_bucket_rw_locks = (pthread_rwlock_t *)palloc0((cc_nbuckets + 1) * sizeof(pthread_rwlock_t));
m_dbEntry->MemoryEstimateAdd(GetMemoryChunkSpace(m_bucket_rw_locks));
for (int i = 0; i <= cc_nbuckets; i++) {
PthreadRwLockInit(&m_bucket_rw_locks[i], NULL);
}
}
if (m_is_tup_swappingouts == NULL) {
m_is_tup_swappingouts = (volatile uint32 *)palloc0(sizeof(volatile uint32) * (cc_nbuckets + 1));
m_dbEntry->MemoryEstimateAdd(GetMemoryChunkSpace((void *)m_is_tup_swappingouts));
}
m_is_list_swappingout = 0;
(void)MemoryContextSwitchTo(old);
}
void GlobalSysTupCache::InitCacheInfo(Oid reloid, Oid indexoid, int nkeys, const int *key, int nbuckets)
{
int i;
* nbuckets is the number of hash buckets to use in this GlobalCatCache.
* Currently we just use a hard-wired estimate of an appropriate size for
* each cache; maybe later make them dynamically resizable?
*
* nbuckets must be a power of two. We check this via Assert rather than
* a full runtime check because the values will be coming from constant
* tables.
*
* If you're confused by the power-of-two check, see comments in
* bitmapset.c for an explanation.
*/
Assert(nbuckets > 0 && (nbuckets & -nbuckets) == nbuckets);
* first switch to the cache context so our allocations do not vanish at
* the end of a transaction
*/
cc_nbuckets = ResizeHashBucket(nbuckets, g_instance.global_sysdbcache.dynamic_hash_bucket_strategy);
* Allocate a new cache structure, aligning to a cacheline boundary
*
* Note: we assume zeroing initializes the Dllist headers correctly
*/
InitHashTable();
MemoryContext old = MemoryContextSwitchTo(m_dbEntry->GetRandomMemCxt());
DLInitList(&cc_lists);
* initialize the cache's relation information for the relation
* corresponding to this cache, and initialize some of the new cache's
* other internal fields. But don't open the relation yet.
*/
cc_id = m_cache_id;
m_relinfo.cc_relname = NULL;
m_relinfo.cc_reloid = reloid;
m_relinfo.cc_indexoid = indexoid;
m_relinfo.cc_tupdesc = (TupleDesc)NULL;
m_relinfo.cc_nkeys = nkeys;
for (i = 0; i < nkeys; ++i) {
m_relinfo.cc_keyno[i] = key[i];
}
if (m_list_rw_lock == NULL) {
m_list_rw_lock = (pthread_rwlock_t *)palloc0(sizeof(pthread_rwlock_t));
m_dbEntry->MemoryEstimateAdd(GetMemoryChunkSpace(m_list_rw_lock));
PthreadRwLockInit(m_list_rw_lock, NULL);
}
if (m_concurrent_lock == NULL) {
m_concurrent_lock = (pthread_rwlock_t *)palloc0(sizeof(pthread_rwlock_t));
m_dbEntry->MemoryEstimateAdd(GetMemoryChunkSpace(m_concurrent_lock));
PthreadRwLockInit(m_concurrent_lock, NULL);
}
(void)MemoryContextSwitchTo(old);
}
uint64 GetTupdescSize(TupleDesc tupdesc)
{
uint64 main_size = GetMemoryChunkSpace(tupdesc);
uint64 constr_size = 0;
uint64 clusterkey_size = 0;
if (tupdesc->constr != NULL) {
TupleConstr *constr = tupdesc->constr;
constr_size += GetMemoryChunkSpace(constr);
if (constr->num_defval > 0) {
constr_size += GetMemoryChunkSpace(constr->defval);
for (int i = 0; i < constr->num_defval; i++) {
constr_size += GetMemoryChunkSpace(constr->defval[i].adbin);
}
constr_size += GetMemoryChunkSpace(constr->generatedCols);
}
if (constr->num_check) {
constr_size += GetMemoryChunkSpace(constr->check);
for (int i = 0; i < constr->num_check; i++) {
if (constr->check[i].ccname) {
constr_size += GetMemoryChunkSpace(constr->check[i].ccname);
}
if (constr->check[i].ccbin) {
constr_size += GetMemoryChunkSpace(constr->check[i].ccbin);
}
}
}
if (constr->clusterKeyNum != 0) {
clusterkey_size += GetMemoryChunkSpace(constr->clusterKeys);
}
}
uint64 defval_size = 0;
if (tupdesc->initdefvals != NULL) {
defval_size += GetMemoryChunkSpace(tupdesc->initdefvals);
TupInitDefVal *pInitDefVal = tupdesc->initdefvals;
for (int i = 0; i < tupdesc->natts; i++) {
if (!pInitDefVal[i].isNull) {
defval_size += GetMemoryChunkSpace(pInitDefVal[i].datum);
}
}
}
uint64 tupdesc_size = main_size + clusterkey_size + constr_size + defval_size;
return tupdesc_size;
}
void GlobalSysTupCache::InitRelationInfo()
{
Relation relation;
int i;
* During inplace or online upgrade, the to-be-fabricated catalogs are still missing,
* for which we can not throw an ERROR.
*/
LockRelationOid(m_relinfo.cc_reloid, AccessShareLock);
relation = RelationIdGetRelation(m_relinfo.cc_reloid);
if (!RelationIsValid(relation)) {
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not open relation with OID %u", m_relinfo.cc_reloid)));
}
pgstat_initstats(relation);
* switch to the cache context so our allocations do not vanish at the end
* of a transaction
*/
MemoryContext old = MemoryContextSwitchTo(m_dbEntry->GetRandomMemCxt());
* copy the relcache's tuple descriptor to permanent cache storage
*/
if (m_relinfo.cc_tupdesc == NULL) {
m_relinfo.cc_tupdesc = CopyTupleDesc(RelationGetDescr(relation));
m_dbEntry->MemoryEstimateAdd(GetTupdescSize(m_relinfo.cc_tupdesc));
}
* save the relation's name and relisshared flag, too (cc_relname is used
* only for debugging purposes)
*/
if (m_relinfo.cc_relname == NULL) {
m_relinfo.cc_relname = pstrdup(RelationGetRelationName(relation));
m_dbEntry->MemoryEstimateAdd(GetMemoryChunkSpace((void *)m_relinfo.cc_relname));
}
Assert(m_relinfo.cc_relisshared == RelationGetForm(relation)->relisshared);
* return to the caller's memory context and close the rel
*/
MemoryContextSwitchTo(old);
enable_rls = RelationEnableRowSecurity(relation);
heap_close(relation, AccessShareLock);
* initialize cache's key information
*/
for (i = 0; i < m_relinfo.cc_nkeys; ++i) {
Oid keytype;
RegProcedure eqfunc;
if (m_relinfo.cc_keyno[i] > 0)
keytype = m_relinfo.cc_tupdesc->attrs[m_relinfo.cc_keyno[i] - 1].atttypid;
else {
if (m_relinfo.cc_keyno[i] != ObjectIdAttributeNumber)
ereport(FATAL, (errmsg("only sys attr supported in caches is OID")));
keytype = OIDOID;
}
if (u_sess->hook_cxt.pluginCCHashEqFuncs == NULL ||
!((pluginCCHashEqFuncs)(u_sess->hook_cxt.pluginCCHashEqFuncs))(keytype, &m_relinfo.cc_hashfunc[i], &eqfunc,
&m_relinfo.cc_fastequal[i], cc_id)) {
GetCCHashEqFuncs(keytype, &m_relinfo.cc_hashfunc[i], &eqfunc, &m_relinfo.cc_fastequal[i]);
}
* Do equality-function lookup (we assume this won't need a catalog
* lookup for any supported type)
*/
pfree_ext(cc_skey[i].sk_func.fnLibPath);
fmgr_info_cxt(eqfunc, &cc_skey[i].sk_func, m_dbEntry->GetRandomMemCxt());
cc_skey[i].sk_attno = m_relinfo.cc_keyno[i];
cc_skey[i].sk_strategy = BTEqualStrategyNumber;
cc_skey[i].sk_subtype = InvalidOid;
if (keytype == TEXTOID) {
cc_skey[i].sk_collation = DEFAULT_COLLATION_OID;
} else {
cc_skey[i].sk_collation = InvalidOid;
}
}
}
void GlobalSysTupCache::Init()
{
if (m_isInited) {
return;
}
* init func followed by new, so m_isInited must be false
* lock first when call this func */
const cachedesc *cache_info = &cacheinfo[m_cache_id];
InitCacheInfo(cache_info->reloid, cache_info->indoid, cache_info->nkeys, cache_info->key, cache_info->nbuckets);
InitRelationInfo();
pg_memory_barrier();
m_isInited = true;
}
* CatalogCacheIdInvalidate
*
* Invalidate entries in the specified cache, given a hash value.
*
* We delete cache entries that match the hash value.
* We don't care whether the invalidation is the result
* of a tuple insertion or a deletion.
*
* We used to try to match positive cache entries by TID, but that is
* unsafe after a VACUUM FULL on a system catalog: an inval event could
* be queued before VACUUM FULL, and then processed afterwards, when the
* target tuple that has to be invalidated has a different TID than it
* did when the event was created. So now we just compare hash values and
* accept the small risk of unnecessary invalidations due to false matches.
*
* This routine is only quasi-public: it should only be used by inval.c.
*/
* HeapTupleInvalidate()
*
* pay attention:
* match disk one by one, so call this func when update , insert or delete every time
*/
void GlobalSysTupCache::HashValueInvalidate(uint32 hash_value)
{
if (!m_isInited) {
return;
}
* inspect caches to find the proper cache
* list of cache_header is never changed after init, so lock is not needed
* Invalidate *all *CatCLists in this cache; it's too hard to tell
* which searches might still be correct, so just zap 'em all.
*/
PthreadRWlockWrlock(LOCAL_SYSDB_RESOWNER, m_concurrent_lock);
if (cc_lists.dll_len > 0) {
PthreadRWlockWrlock(LOCAL_SYSDB_RESOWNER, m_list_rw_lock);
for (Dlelem *elt = DLGetHead(&cc_lists); elt;) {
GlobalCatCList *cl = (GlobalCatCList *)DLE_VAL(elt);
elt = DLGetSucc(elt);
HandleDeadGlobalCatCList(cl);
}
PthreadRWlockUnlock(LOCAL_SYSDB_RESOWNER, m_list_rw_lock);
} else {
* no one can scan table now */
}
uint32 hash_index = HASH_INDEX(hash_value, cc_nbuckets);
if (GetBucket(hash_index)->dll_len > 0) {
pthread_rwlock_t *bucket_lock = &m_bucket_rw_locks[hash_index];
PthreadRWlockWrlock(LOCAL_SYSDB_RESOWNER, bucket_lock);
for (Dlelem *elt = DLGetHead(GetBucket(hash_index)); elt;) {
GlobalCatCTup *ct = (GlobalCatCTup *)DLE_VAL(elt);
elt = DLGetSucc(elt);
if (hash_value != ct->hash_value) {
continue;
}
HandleDeadGlobalCatCTup(ct);
}
PthreadRWlockUnlock(LOCAL_SYSDB_RESOWNER, bucket_lock);
} else {
* no one can scan table now */
}
PthreadRWlockUnlock(LOCAL_SYSDB_RESOWNER, m_concurrent_lock);
}
* ResetGlobalCatCache
*
* Reset one catalog cache to empty.
*
* This is not very efficient if the target cache is nearly empty.
* However, it shouldn't need to be efficient; we don't invoke it often.
*/
template <bool force>
void GlobalSysTupCache::ResetCatalogCache()
{
if (!m_isInited) {
return;
}
if (force) {
PthreadRWlockWrlock(LOCAL_SYSDB_RESOWNER, m_concurrent_lock);
}
if (cc_lists.dll_len > 0) {
PthreadRWlockWrlock(LOCAL_SYSDB_RESOWNER, m_list_rw_lock);
for (Dlelem *elt = DLGetHead(&cc_lists); elt;) {
GlobalCatCList *cl = (GlobalCatCList *)DLE_VAL(elt);
elt = DLGetSucc(elt);
if (force || cl->refcount == 0) {
HandleDeadGlobalCatCList(cl);
}
}
PthreadRWlockUnlock(LOCAL_SYSDB_RESOWNER, m_list_rw_lock);
}
for (int i = 0; i < cc_nbuckets; i++) {
if (GetBucket(i)->dll_len == 0) {
continue;
}
pthread_rwlock_t *bucket_lock = &m_bucket_rw_locks[i];
PthreadRWlockWrlock(LOCAL_SYSDB_RESOWNER, bucket_lock);
for (Dlelem *elt = DLGetHead(&cc_buckets[i]); elt;) {
GlobalCatCTup *ct = (GlobalCatCTup *)DLE_VAL(elt);
elt = DLGetSucc(elt);
if (force || ct->refcount == 0) {
HandleDeadGlobalCatCTup(ct);
}
}
PthreadRWlockUnlock(LOCAL_SYSDB_RESOWNER, bucket_lock);
}
if (force) {
PthreadRWlockUnlock(LOCAL_SYSDB_RESOWNER, m_concurrent_lock);
}
}
template void GlobalSysTupCache::ResetCatalogCache<false>();
template void GlobalSysTupCache::ResetCatalogCache<true>();
List *GlobalSysTupCache::GetGlobalCatCTupStat()
{
List *tuple_stat_list = NIL;
if (!m_isInited) {
return tuple_stat_list;
}
for (int i = 0; i < cc_nbuckets; i++) {
if (GetBucket(i)->dll_len == 0) {
continue;
}
pthread_rwlock_t *bucket_lock = &m_bucket_rw_locks[i];
PthreadRWlockRdlock(LOCAL_SYSDB_RESOWNER, bucket_lock);
for (Dlelem *elt = DLGetHead(&cc_buckets[i]); elt;) {
GlobalCatCTup *ct = (GlobalCatCTup *)DLE_VAL(elt);
elt = DLGetSucc(elt);
GlobalCatalogTupleStat *tup_stat = (GlobalCatalogTupleStat*)palloc(sizeof(GlobalCatalogTupleStat));
tup_stat->db_oid = m_dbOid;
tup_stat->db_name_datum = CStringGetTextDatum(m_dbEntry->m_dbName);
tup_stat->rel_oid = m_relinfo.cc_reloid;
tup_stat->rel_name_datum = CStringGetTextDatum(m_relinfo.cc_relname);
tup_stat->cache_id = m_cache_id;
tup_stat->self = ct->tuple.t_self;
tup_stat->ctid = ct->tuple.t_data->t_ctid;
tup_stat->infomask = ct->tuple.t_data->t_infomask;
tup_stat->infomask2 = ct->tuple.t_data->t_infomask2;
tup_stat->hash_value = ct->hash_value;
tup_stat->refcount = ct->refcount;
tuple_stat_list = lappend(tuple_stat_list, tup_stat);
}
PthreadRWlockUnlock(LOCAL_SYSDB_RESOWNER, bucket_lock);
}
return tuple_stat_list;
}
#ifndef ENABLE_MULTIPLE_NODES
GlobalCatCTup *GlobalSysTupCache::SearchTupleWithArgModes(uint32 hash_value, Datum *arguments, oidvector* argModes)
{
FreeDeadCts();
pg_atomic_fetch_add_u64(m_searches, 1);
* scan the hash bucket until we find a match or exhaust our tuples
*/
Index hash_index = HASH_INDEX(hash_value, (uint32)cc_nbuckets);
pthread_rwlock_t *bucket_lock = &m_bucket_rw_locks[hash_index];
InsertCatTupInfo tup_info;
tup_info.find_type = SEARCH_TUPLE_SKIP;
tup_info.arguments = arguments;
tup_info.hash_value = hash_value;
tup_info.hash_index = hash_index;
int location = INVALID_LOCATION;
int index = 0;
GlobalCatCTup *ct = NULL;
PthreadRWlockRdlock(LOCAL_SYSDB_RESOWNER, bucket_lock);
for (Dlelem *elt = DLGetHead(GetBucket(hash_index)); elt != NULL; elt = DLGetSucc(elt)) {
index++;
ct = (GlobalCatCTup *)DLE_VAL(elt);
if (ct->hash_value != hash_value) {
ct = NULL;
continue;
}
if (!CatalogCacheCompareTuple(m_relinfo.cc_fastequal, m_relinfo.cc_nkeys, ct->keys, arguments)) {
ct = NULL;
continue;
}
if (unlikely(u_sess->attr.attr_common.IsInplaceUpgrade) && ct->tuple.t_self.ip_posid == 0) {
Assert(ct->tuple.t_tableOid == InvalidOid);
ct = NULL;
continue;
}
* Make sure that the in-out-info(argmodes) should be the same.
*/
if (!IsProArgModesEqualByTuple(&ct->tuple, m_relinfo.cc_tupdesc, argModes)) {
ct = NULL;
continue;
}
pg_atomic_fetch_add_u64(&ct->refcount, 1);
location = index;
break;
}
PthreadRWlockUnlock(LOCAL_SYSDB_RESOWNER, bucket_lock);
if (ct != NULL) {
pg_atomic_fetch_add_u64(m_hits, 1);
TopnLruMoveToFront(&ct->cache_elem, GetBucket(hash_index), bucket_lock, location);
return ct;
}
tup_info.find_type = SCAN_TUPLE_SKIP;
bool canInsertGSC = !g_instance.global_sysdbcache.StopInsertGSC();
if (cc_lists.dll_len >= MAX_GSC_LIST_LENGTH) {
RemoveTailListElements();
canInsertGSC = canInsertGSC && cc_lists.dll_len < MAX_GSC_LIST_LENGTH;
}
tup_info.is_exclusive = !canInsertGSC;
tup_info.has_concurrent_lock = canInsertGSC;
ct = SearchTupleMissWithArgModes(&tup_info, argModes);
if (ct != NULL) {
pg_atomic_fetch_add_u64(m_newloads, 1);
}
return ct;
}
GlobalCatCTup *GlobalSysTupCache::SearchTupleFromFileWithArgModes(
uint32 hash_value, Datum *arguments, oidvector* argModes, bool is_exclusive)
{
InsertCatTupInfo tup_info;
tup_info.find_type = SCAN_TUPLE_SKIP;
tup_info.arguments = arguments;
tup_info.hash_value = hash_value;
tup_info.hash_index = HASH_INDEX(hash_value, (uint32)cc_nbuckets);
tup_info.has_concurrent_lock = !is_exclusive;
tup_info.is_exclusive = is_exclusive;
Assert(is_exclusive);
GlobalCatCTup *ct = SearchTupleMissWithArgModes(&tup_info, argModes);
if (ct != NULL) {
pg_atomic_fetch_add_u64(m_newloads, 1);
}
return ct;
}
GlobalCatCTup *GlobalSysTupCache::SearchTupleMissWithArgModes(InsertCatTupInfo *tup_info, oidvector* argModes)
{
GlobalCatCTup *ct = SearchMissFromProcAndAttribute(tup_info);
if (ct != NULL) {
return ct;
}
Relation relation = heap_open(m_relinfo.cc_reloid, AccessShareLock);
ereport(DEBUG1, (errmsg("cache->cc_reloid - %d", m_relinfo.cc_reloid)));
* Ok, need to make a lookup in the relation, copy the scankey and fill
* out any per-call fields.
*/
ScanKeyData cur_skey[CATCACHE_MAXKEYS];
errno_t rc = memcpy_s(cur_skey, sizeof(ScanKeyData) * CATCACHE_MAXKEYS, cc_skey,
sizeof(ScanKeyData) * m_relinfo.cc_nkeys);
securec_check(rc, "", "");
Datum *arguments = tup_info->arguments;
cur_skey[0].sk_argument = arguments[0];
cur_skey[1].sk_argument = arguments[1];
cur_skey[2].sk_argument = arguments[2];
cur_skey[3].sk_argument = arguments[3];
SysScanDesc scandesc =
systable_beginscan(relation, m_relinfo.cc_indexoid, IndexScanOK(cc_id), NULL,
m_relinfo.cc_nkeys, cur_skey);
tup_info->has_concurrent_lock = !tup_info->is_exclusive;
if (tup_info->has_concurrent_lock) {
AcquireGSCTableReadLock(&tup_info->has_concurrent_lock, m_concurrent_lock);
}
HeapTuple ntp;
while (HeapTupleIsValid(ntp = systable_getnext(scandesc))) {
* The key2 (pg_proc_allargtypes) can be duplicate in table.
* We need to compare the proargmodes to make sure the function is correct.
*/
if (!IsProArgModesEqualByTuple(ntp, m_relinfo.cc_tupdesc, argModes)) {
continue;
}
tup_info->ntp = ntp;
if (!tup_info->has_concurrent_lock) {
tup_info->canInsertGSC = false;
} else {
tup_info->canInsertGSC = CanTupleInsertGSC(ntp);
if (!tup_info->canInsertGSC) {
ReleaseGSCTableReadLock(&tup_info->has_concurrent_lock, m_concurrent_lock);
}
}
ct = InsertHeapTupleIntoCatCacheInSingle(tup_info);
break;
}
if (tup_info->has_concurrent_lock) {
ReleaseGSCTableReadLock(&tup_info->has_concurrent_lock, m_concurrent_lock);
}
systable_endscan(scandesc);
heap_close(relation, AccessShareLock);
* global catcache match disk , not need negative tuple
*/
return ct;
}
#endif