*
* indextuple.cpp
* This file contains index tuple accessor and mutator routines,
* as well as various tuple utilities.
*
* Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd.
* Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/gausskernel/storage/access/common/indextuple.cpp
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "knl/knl_variable.h"
#include "access/nbtree.h"
#include "access/ubtree.h"
#include "access/ubtreepcr.h"
#include "access/heapam.h"
#include "access/tableam.h"
#include "access/itup.h"
#include "access/tuptoaster.h"
#include "access/sysattr.h"
#include "utils/rel.h"
* index_ tuple interface routines
* ----------------------------------------------------------------
*/
static inline bool index_findattr(Relation irel, IndexTuple itup, AttrNumber attrno, Datum *value)
{
bool isnull = false;
TupleDesc tupdesc = RelationGetDescr(irel);
int nattrs = IndexRelationGetNumberOfAttributes(irel);
for (int i = 0; i < nattrs; i++) {
if (irel->rd_index->indkey.values[i] == attrno) {
*value = index_getattr(itup, i + 1, tupdesc, &isnull);
break;
}
}
return !isnull;
}
Oid index_getattr_tableoid(Relation irel, IndexTuple itup)
{
Datum val = 0;
Oid tableoid = InvalidOid;
Assert(RelationIsIndex(irel));
if (index_findattr(irel, itup, TableOidAttributeNumber, &val)) {
tableoid = DatumGetUInt32(val);
}
return tableoid;
}
int2 index_getattr_bucketid(Relation irel, IndexTuple itup)
{
Datum val = 0;
int2 bucketid = InvalidBktId;
Assert(RelationIsIndex(irel));
if (index_findattr(irel, itup, BucketIdAttributeNumber, &val)) {
bucketid = DatumGetInt16(val);
}
return bucketid;
}
* index_form_tuple
*
* This shouldn't leak any memory; otherwise, callers such as
* tuplesort_putindextuplevalues() will be very unhappy.
* ----------------
*/
IndexTuple index_form_tuple(TupleDesc tuple_descriptor, Datum* values, const bool* isnull,
bool is_ubtree, bool is_ubtree_pcr)
{
char *tp = NULL;
IndexTuple tuple = NULL;
Size size, data_size, hoff;
int i;
unsigned short infomask = 0;
bool hasnull = false;
uint16 tupmask = 0;
int attributeNum = tuple_descriptor->natts;
Size (*computedatasize_tuple)(TupleDesc tuple_desc, Datum* values, const bool* isnull);
void (*filltuple)(TupleDesc tuple_desc, Datum* values, const bool* isnull, char* data, Size data_size, uint16* infomask, bits8* bit);
computedatasize_tuple = &heap_compute_data_size;
filltuple = &heap_fill_tuple;
#ifdef TOAST_INDEX_HACK
Datum untoasted_values[INDEX_MAX_KEYS];
bool untoasted_free[INDEX_MAX_KEYS];
#endif
if (attributeNum > INDEX_MAX_KEYS)
ereport(ERROR,
(errcode(ERRCODE_TOO_MANY_COLUMNS),
errmsg("number of index columns (%d) exceeds limit (%d)", attributeNum, INDEX_MAX_KEYS)));
#ifdef TOAST_INDEX_HACK
uint32 toastTarget = TOAST_INDEX_TARGET;
if (tuple_descriptor->td_tam_ops == TableAmUstore) {
toastTarget = UTOAST_INDEX_TARGET;
}
for (i = 0; i < attributeNum; i++) {
Form_pg_attribute att = &tuple_descriptor->attrs[i];
untoasted_values[i] = values[i];
untoasted_free[i] = false;
if (isnull[i] || att->attlen != -1)
continue;
* If value is stored EXTERNAL, must fetch it so we are not depending
* on outside storage. This should be improved someday.
*/
Pointer val = DatumGetPointer(values[i]);
checkHugeToastPointer((varlena *)val);
if (VARATT_IS_EXTERNAL(val)) {
untoasted_values[i] = PointerGetDatum(heap_tuple_fetch_attr((struct varlena*)DatumGetPointer(values[i])));
untoasted_free[i] = true;
}
* If value is above size target, and is of a compressible datatype,
* try to compress it in-line.
*/
if (!VARATT_IS_EXTENDED(DatumGetPointer(untoasted_values[i])) &&
VARSIZE(DatumGetPointer(untoasted_values[i])) > toastTarget &&
(att->attstorage == 'x' || att->attstorage == 'm')) {
Datum cvalue = toast_compress_datum(untoasted_values[i]);
if (DatumGetPointer(cvalue) != NULL) {
if (untoasted_free[i])
pfree(DatumGetPointer(untoasted_values[i]));
untoasted_values[i] = cvalue;
untoasted_free[i] = true;
}
}
}
#endif
for (i = 0; i < attributeNum; i++) {
if (isnull[i]) {
hasnull = true;
break;
}
}
if (hasnull)
infomask |= INDEX_NULL_MASK;
hoff = IndexInfoFindDataOffset(infomask);
#ifdef TOAST_INDEX_HACK
data_size = computedatasize_tuple(tuple_descriptor, untoasted_values, isnull);
#else
data_size = computedatasize_tuple(tuple_descriptor, values, isnull);
#endif
size = hoff + data_size;
size = MAXALIGN(size);
tp = (char*)palloc0(size);
tuple = (IndexTuple)tp;
filltuple(tuple_descriptor,
#ifdef TOAST_INDEX_HACK
untoasted_values,
#else
values,
#endif
isnull,
(char*)tp + hoff,
data_size,
&tupmask,
(hasnull ? (bits8*)tp + sizeof(IndexTupleData) : NULL));
#ifdef TOAST_INDEX_HACK
for (i = 0; i < attributeNum; i++) {
if (untoasted_free[i])
pfree(DatumGetPointer(untoasted_values[i]));
}
#endif
* We do this because heap_fill_tuple wants to initialize a "tupmask"
* which is used for HeapTuples, but we want an indextuple infomask. The
* only relevant info is the "has variable attributes" field. We have
* already set the hasnull bit above.
*/
if (tupmask & HEAP_HASVARWIDTH)
infomask |= INDEX_VAR_MASK;
* Here we make sure that the size will fit in the field reserved for it
* in t_info. Ubtree page reserved xmin/xmax.
*/
Size check_size = size + (is_ubtree && !is_ubtree_pcr ? sizeof(ShortTransactionId) * 2 : 0);
if ((check_size & INDEX_SIZE_MASK) != check_size)
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("index row requires %lu bytes, maximum size is %lu",
(unsigned long)check_size,
(unsigned long)INDEX_SIZE_MASK)));
infomask |= size;
* initialize metadata
*/
tuple->t_info = infomask;
return tuple;
}
* nocache_index_getattr
*
* This gets called from index_getattr() macro, and only in cases
* where we can't use cacheoffset and the value is not null.
*
* This caches attribute offsets in the attribute descriptor.
*
* An alternative way to speed things up would be to cache offsets
* with the tuple, but that seems more difficult unless you take
* the storage hit of actually putting those offsets into the
* tuple you send to disk. Yuck.
*
* This scheme will be slightly slower than that, but should
* perform well for queries which hit large #'s of tuples. After
* you cache the offsets once, examining all the other tuples using
* the same attribute descriptor will go much quicker. -cim 5/4/91
* ----------------
*/
Datum nocache_index_getattr(IndexTuple tup, uint32 attnum, TupleDesc tuple_desc)
{
FormData_pg_attribute* att = tuple_desc->attrs;
char* tp = NULL;
bits8* bp = NULL;
bool slow = false;
int data_off;
int off;
* Three cases:
* 1: No nulls and no variable-width attributes.
* 2: Has a null or a var-width AFTER att.
* 3: Has nulls or var-widths BEFORE att.
*/
data_off = IndexInfoFindDataOffset(tup->t_info);
attnum--;
* there's a null somewhere in the tuple
*
* check to see if desired att is null
*/
if (IndexTupleHasNulls(tup)) {
bp = (bits8*)((char*)tup + sizeof(IndexTupleData));
* Now check to see if any preceding bits are null...
*/
{
int byte = attnum >> 3;
int finalbit = attnum & 0x07;
if ((~bp[byte]) & (((uint)1 << finalbit) - 1))
slow = true;
else {
int i;
for (i = 0; i < byte; i++) {
if (bp[i] != 0xFF) {
slow = true;
break;
}
}
}
}
}
tp = (char*)tup + data_off;
if (!slow) {
* If we get here, there are no nulls up to and including the target
* attribute. If we have a cached offset, we can use it.
*/
if (att[attnum].attcacheoff >= 0) {
return fetchatt(&att[attnum], tp + att[attnum].attcacheoff);
}
* Otherwise, check for non-fixed-length attrs up to and including
* target. If there aren't any, it's safe to cheaply initialize the
* cached offsets for these attrs.
*/
if (IndexTupleHasVarwidths(tup)) {
int j;
for (j = 0; j <= (int)attnum; j++) {
if (att[j].attlen <= 0) {
slow = true;
break;
}
}
}
}
if (!slow) {
int natts = tuple_desc->natts;
int j = 1;
* If we get here, we have a tuple with no nulls or var-widths up to
* and including the target attribute, so we can use the cached offset
* ... only we don't have it yet, or we'd not have got here. Since
* it's cheap to compute offsets for fixed-width columns, we take the
* opportunity to initialize the cached offsets for *all* the leading
* fixed-width columns, in hope of avoiding future visits to this
* routine.
*/
att[0].attcacheoff = 0;
while (j < natts && att[j].attcacheoff > 0)
j++;
off = att[j - 1].attcacheoff + att[j - 1].attlen;
for (; j < natts; j++) {
if (att[j].attlen <= 0)
break;
off = att_align_nominal((uint32)off, att[j].attalign);
att[j].attcacheoff = off;
off += att[j].attlen;
}
Assert(j > (int)attnum);
off = att[attnum].attcacheoff;
} else {
bool usecache = true;
int i;
* Now we know that we have to walk the tuple CAREFULLY. But we still
* might be able to cache some offsets for next time.
*
* Note - This loop is a little tricky. For each non-null attribute,
* we have to first account for alignment padding before the attr,
* then advance over the attr based on its length. Nulls have no
* storage and no alignment padding either. We can use/set
* attcacheoff until we reach either a null or a var-width attribute.
*/
off = 0;
for (i = 0;; i++) {
if (IndexTupleHasNulls(tup) && att_isnull(i, bp)) {
usecache = false;
continue;
}
if (usecache && att[i].attcacheoff >= 0)
off = att[i].attcacheoff;
else if (att[i].attlen == -1) {
* We can only cache the offset for a varlena attribute if the
* offset is already suitably aligned, so that there would be
* no pad bytes in any case: then the offset will be valid for
* either an aligned or unaligned value.
*/
if (usecache && (uintptr_t)(off) == att_align_nominal((uint32)off, att[i].attalign))
att[i].attcacheoff = off;
else {
off = att_align_pointer((uint32)off, att[i].attalign, -1, tp + off);
usecache = false;
}
} else {
off = att_align_nominal((uint32)off, att[i].attalign);
if (usecache)
att[i].attcacheoff = off;
}
if (i == (int)attnum)
break;
off = att_addlength_pointer(off, att[i].attlen, tp + off);
if (usecache && att[i].attlen <= 0)
usecache = false;
}
}
return fetchatt(&att[attnum], tp + off);
}
* Convert an index tuple into Datum/isnull arrays.
*
* The caller must allocate sufficient storage for the output arrays.
* (INDEX_MAX_KEYS entries should be enough.)
*/
void index_deform_tuple(IndexTuple tup, TupleDesc tuple_descriptor, Datum* values, bool* isnull)
{
if (u_sess->attr.attr_common.enable_indexscan_optimization) {
int hasnulls = IndexTupleHasNulls(tup);
int natts = tuple_descriptor->natts;
int attnum;
char *tp;
int off;
bits8 *bp;
bool slow = false;
Assert(natts <= INDEX_MAX_KEYS);
bp = (bits8 *)((char *)tup + sizeof(IndexTupleData));
tp = (char *)tup + IndexInfoFindDataOffset(tup->t_info);
off = 0;
for (attnum = 0; attnum < natts; attnum++) {
Form_pg_attribute thisatt = TupleDescAttr(tuple_descriptor, attnum);
if (hasnulls && att_isnull(attnum, bp)) {
values[attnum] = (Datum)0;
isnull[attnum] = true;
slow = true;
continue;
}
isnull[attnum] = false;
if (!slow && thisatt->attcacheoff >= 0)
off = thisatt->attcacheoff;
else if (thisatt->attlen == -1) {
if (!slow && (uint)off == att_align_nominal(off, thisatt->attalign))
thisatt->attcacheoff = off;
else {
off = att_align_pointer(off, thisatt->attalign, -1, tp + off);
slow = true;
}
} else {
off = att_align_nominal(off, thisatt->attalign);
if (!slow)
thisatt->attcacheoff = off;
}
values[attnum] = fetchatt(thisatt, tp + off);
off = att_addlength_pointer(off, thisatt->attlen, tp + off);
if (thisatt->attlen <= 0)
slow = true;
}
} else {
int i;
Assert(tuple_descriptor->natts <= INDEX_MAX_KEYS);
for (i = 0; i < tuple_descriptor->natts; i++) {
values[i] = index_getattr(tup, i + 1, tuple_descriptor, &isnull[i]);
}
}
}
* Create a palloc'd copy of an index tuple.
*/
IndexTuple CopyIndexTuple(IndexTuple source)
{
IndexTuple result;
Size size;
errno_t rc = EOK;
size = IndexTupleSize(source);
result = (IndexTuple)palloc(size);
rc = memcpy_s(result, size, source, size);
securec_check(rc, "\0", "\0");
return result;
}
* Create a palloc'd copy of an index tuple with a reserved space.
*/
IndexTuple CopyIndexTupleAndReserveSpace(IndexTuple source, Size reserved_size)
{
IndexTuple result;
Size size;
errno_t rc = EOK;
size = IndexTupleSize(source);
result = (IndexTuple)palloc0(size + reserved_size);
rc = memcpy_s(result, size, source, size);
securec_check(rc, "\0", "\0");
IndexTupleSetSize(result, size + reserved_size);
return result;
}
* Truncate tailing attributes from given index tuple leaving it with
* new_indnatts number of attributes.
*/
IndexTuple index_truncate_tuple(TupleDesc tupleDescriptor, IndexTuple olditup, int new_indnatts)
{
TupleDesc itupdesc = CreateTupleDescCopyConstr(tupleDescriptor);
Datum values[INDEX_MAX_KEYS];
bool isnull[INDEX_MAX_KEYS];
IndexTuple newitup;
int indnatts = tupleDescriptor->natts;
Assert(indnatts <= INDEX_MAX_KEYS);
Assert(new_indnatts > 0);
Assert(new_indnatts <= indnatts);
if (new_indnatts == indnatts) {
return CopyIndexTuple(olditup);
}
index_deform_tuple(olditup, tupleDescriptor, values, isnull);
itupdesc->natts = new_indnatts;
newitup = index_form_tuple(itupdesc, values, isnull);
newitup->t_tid = olditup->t_tid;
FreeTupleDesc(itupdesc);
Assert(IndexTupleSize(newitup) <= IndexTupleSize(olditup));
return newitup;
}
* Truncate tailing attributes from given index tuple leaving it with
* new_indnatts number of attributes.
*/
IndexTuple UBTreeIndexTruncateTuple(TupleDesc tupleDescriptor, IndexTuple olditup, int leavenatts, bool itup_extended)
{
TupleDesc itupdesc = CreateTupleDescCopyConstr(tupleDescriptor);
Datum values[INDEX_MAX_KEYS];
bool isnull[INDEX_MAX_KEYS];
IndexTuple newitup;
int indnatts = tupleDescriptor->natts;
if (indnatts > INDEX_MAX_KEYS) {
ereport(ERROR,
(errcode(ERRCODE_TOO_MANY_COLUMNS),
errmsg("number of index columns (%d) exceeds limit (%d)", indnatts, INDEX_MAX_KEYS)));
}
Assert(leavenatts > 0);
Assert(leavenatts <= indnatts);
if (leavenatts == indnatts) {
if (itup_extended) {
IndexTuple copiedItup;
Size oldsz = IndexTupleSize(olditup);
Size newsz = oldsz - TXNINFOSIZE;
IndexTupleSetSize(olditup, newsz);
copiedItup = CopyIndexTuple(olditup);
IndexTupleSetSize(olditup, oldsz);
return copiedItup;
}
return CopyIndexTuple(olditup);
}
index_deform_tuple(olditup, tupleDescriptor, values, isnull);
itupdesc->natts = leavenatts;
newitup = index_form_tuple(itupdesc, values, isnull, true);
newitup->t_tid = olditup->t_tid;
Assert(IndexTupleSize(newitup) <= IndexTupleSize(olditup));
FreeTupleDesc(itupdesc);
return newitup;
}