* Copyright (c) 2021 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* userchain.cpp
* functions for ledger history table achivement.
*
* IDENTIFICATION
* src/gausskernel/security/gs_ledger/userchain.cpp
*
* -------------------------------------------------------------------------
*/
#include "gs_ledger/ledger_utils.h"
#include "gs_ledger/userchain.h"
#include "gs_ledger/blockchain.h"
#include "catalog/indexing.h"
#include "catalog/pg_proc.h"
#include "catalog/cstore_ctlg.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "access/hash.h"
#include "access/visibilitymap.h"
#include "access/hbucket_am.h"
#include "libpq/md5.h"
#include "utils/uuid.h"
#include "utils/snapmgr.h"
#include "catalog/gs_global_chain.h"
* create_hist_relation -- create a hist table based on the original user table.
*
* rel: The original user relation
* reloptions: relation options used to define new relation
* mainTblStmt: Some statement of the query when create the new relation.
*/
void create_hist_relation(Relation rel, Datum reloptions, CreateStmt *mainTblStmt)
{
errno_t rc;
char hist_name[NAMEDATALEN];
Oid relid = RelationGetRelid(rel);
Oid nsp_oid = PG_BLOCKCHAIN_NAMESPACE;
Oid hist_oid;
Oid collationObjectId[1];
Oid classObjectId[1];
int16 coloptions[1];
bool shared_relation = rel->rd_rel->relisshared;
get_hist_name(relid, get_rel_name(relid), hist_name);
* history chain table contains all the columns from the origin user table, and then need to
* record the command type, blocknum, and hash value of last block record.
*/
TupleDesc chain_desc = CreateTemplateTupleDesc(USERCHAIN_COLUMN_NUM, false);
TupleDescInitEntry(chain_desc, USERCHAIN_COLUMN_REC_NUM + 1, "rec_num", INT8OID, -1, 0);
TupleDescInitEntry(chain_desc, USERCHAIN_COLUMN_HASH_INS + 1, "hash_ins", HASH16OID, -1, 0);
TupleDescInitEntry(chain_desc, USERCHAIN_COLUMN_HASH_DEL + 1, "hash_del", HASH16OID, -1, 0);
TupleDescInitEntry(chain_desc, USERCHAIN_COLUMN_PREVHASH + 1, "pre_hash", HASH32OID, -1, 0);
reloptions = AddInternalOption(reloptions, INTERNAL_MASK_DALTER | INTERNAL_MASK_DDELETE |
INTERNAL_MASK_DINSERT | INTERNAL_MASK_DUPDATE);
hist_oid = heap_create_with_catalog(hist_name, nsp_oid, rel->rd_rel->reltablespace, InvalidOid,
InvalidOid, InvalidOid, rel->rd_rel->relowner, chain_desc, NIL, 'r',
(rel->rd_rel->relpersistence == 't') ? 'u' : rel->rd_rel->relpersistence,
shared_relation, false, true, 0, ONCOMMIT_NOOP, reloptions, false, true,
NULL, REL_CMPRS_NOT_SUPPORT, NULL, false);
CommandCounterIncrement();
#ifdef ENABLE_MULTIPLE_NODES
bool is_initdb_on_dn = false;
* the create table command is executed on datanode during initialization .
* In this case, we do not write created table info in pgxc_class.
*/
if ((*t_thrd.pgxc_cxt.shmemNumDataNodes + *t_thrd.pgxc_cxt.shmemNumCoords) == 1) {
is_initdb_on_dn = true;
}
if ((!u_sess->attr.attr_common.IsInplaceUpgrade || !IsSystemNamespace(nsp_oid)) &&
(IS_PGXC_COORDINATOR || (isRestoreMode && mainTblStmt->distributeby != NULL && !is_initdb_on_dn))) {
AddRelationDistribution(hist_name, hist_oid, NULL, mainTblStmt->subcluster,
InvalidOid, chain_desc, true);
CommandCounterIncrement();
RelationCacheInvalidateEntry(hist_oid);
}
#endif
char hist_index_name[NAMEDATALEN];
rc = snprintf_s(hist_index_name, NAMEDATALEN, NAMEDATALEN - 1, "gs_hist_%u_index", relid);
securec_check_ss(rc, "", "");
Relation hist_rel = heap_open(hist_oid, ShareLock);
IndexInfo *hist_index = makeNode(IndexInfo);
hist_index->ii_NumIndexAttrs = 1;
hist_index->ii_NumIndexKeyAttrs = 1;
hist_index->ii_KeyAttrNumbers[0] = 1;
hist_index->ii_Expressions = NIL;
hist_index->ii_ExpressionsState = NIL;
hist_index->ii_Predicate = NIL;
hist_index->ii_PredicateState = NIL;
hist_index->ii_ExclusionOps = NULL;
hist_index->ii_ExclusionProcs = NULL;
hist_index->ii_ExclusionStrats = NULL;
hist_index->ii_Unique = true;
hist_index->ii_ReadyForInserts = true;
hist_index->ii_Concurrent = false;
hist_index->ii_BrokenHotChain = false;
hist_index->ii_PgClassAttrId = Anum_pg_class_relhasindex;
collationObjectId[0] = InvalidOid;
classObjectId[0] = INT4_BTREE_OPS_OID;
coloptions[0] = 0;
IndexCreateExtraArgs extra;
extra.existingPSortOid = InvalidOid;
extra.isPartitionedIndex = false;
extra.isGlobalPartitionedIndex = false;
index_create(hist_rel, hist_index_name, InvalidOid, InvalidOid,
hist_index, list_make1((void *)"rec_num"), BTREE_AM_OID,
rel->rd_rel->reltablespace, collationObjectId, classObjectId,
coloptions, (Datum) 0, true, false, false, false,
true, false, false, &extra, false);
heap_close(hist_rel, NoLock);
ObjectAddress myself;
ObjectAddress referenced;
myself.classId = RelationRelationId;
myself.objectId = hist_oid;
myself.objectSubId = 0;
referenced.classId = RelationRelationId;
referenced.objectId = relid;
referenced.objectSubId = 0;
recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
pfree_ext(chain_desc);
* Make changes visible
*/
CommandCounterIncrement();
}
* rename_hist_by_usertable -- rename hist table name by its user table and new usertable name
*
* relid: relation oid of user table
* new_usertable_name: the new table name of user table
*
* Note: This function is used after origin user table renamed, and then caller
* can use this function to rename the corresponding hist table name.
*/
void rename_hist_by_usertable(Oid relid, const char *new_usertable_name)
{
Oid hist_oid = get_hist_oid(relid);
char new_hist_name[NAMEDATALEN];
get_hist_name(relid, new_usertable_name, new_hist_name);
RenameRelationInternal(hist_oid, new_hist_name);
}
* rename_hist_by_newnsp -- rename one hist table while altering schema name
*
* user_relid: relation oid of user table
* new_nsp_name: the new schema name of user table
*/
void rename_hist_by_newnsp(Oid user_relid, const char *new_nsp_name)
{
Oid hist_oid;
char old_hist_name[NAMEDATALEN] = {0};
char new_hist_name[NAMEDATALEN] = {0};
get_hist_name(user_relid, get_rel_name(user_relid), old_hist_name);
hist_oid = get_relname_relid(old_hist_name, PG_BLOCKCHAIN_NAMESPACE);
if (!OidIsValid(hist_oid)) {
return;
}
get_hist_name(user_relid, get_rel_name(user_relid), new_hist_name, get_rel_namespace(user_relid), new_nsp_name);
RenameRelationInternal(hist_oid, new_hist_name);
}
* rename_histlist_by_newnsp -- rename a list of hist table while altering schema name
*
* usertable_oid_list: relation oid list of user tables
* new_nsp_name: the new schema name of user table
*/
void rename_histlist_by_newnsp(List *usertable_oid_list, const char *new_nsp_name)
{
ListCell *lc = NULL;
foreach (lc, usertable_oid_list) {
Oid relid = (Oid)lfirst_oid(lc);
rename_hist_by_newnsp(relid, new_nsp_name);
}
}
* user_hash_attrno -- get the attribute number of user table's hash column.
*
* rd_att: tuple description of user table
*/
int user_hash_attrno(const TupleDesc rd_att)
{
int hash_natt = -1;
Form_pg_attribute rel_attr = NULL;
for (int i = rd_att->natts - 1; i >= 0; i--) {
rel_attr = &rd_att->attrs[i];
if (strcmp(rel_attr->attname.data, "hash") == 0) {
hash_natt = i;
break;
}
}
return hash_natt;
}
* hash_combine_tuple_data -- generate hash of each attribute and return the combination string.
*
* data_string: combination of hash that calculate from each attribute
* tabledesc: tuple description of user table
* tuple: row data of user table
*/
static void hash_combine_tuple_data(char *buf, int buf_size, TupleDesc tabledesc, HeapTuple tuple)
{
int natts = tabledesc->natts;
int buflen = 0;
errno_t rc = EOK;
char hash_str[UINT64STRSIZE + 1] = {0};
Datum *values = (Datum *) palloc0(natts * sizeof(Datum));
bool *nulls = (bool *) palloc0(natts * sizeof(bool));
heap_deform_tuple(tuple, tabledesc, values, nulls);
for (int i = 0; i < natts - 1; ++i) {
if (nulls[i]) {
continue;
}
uint64 col_hash = compute_hash(tabledesc->attrs[i].atttypid, values[i], LOCATOR_TYPE_HASH);
rc = snprintf_s(hash_str, UINT64STRSIZE + 1, UINT64STRSIZE, "%lu", col_hash);
securec_check_ss(rc, "", "");
rc = snprintf_s(buf + buflen, buf_size - buflen, buf_size - buflen - 1, "%s", hash_str);
securec_check_ss(rc, "", "");
buflen += strlen(hash_str);
}
pfree_ext(values);
pfree_ext(nulls);
}
* get_user_tuple_hash -- get the hash value of usertable's tuple.
*
* tuple: row data of user table
* desc: tuple description of user table
*/
uint64 get_user_tuple_hash(HeapTuple tuple, TupleDesc desc)
{
Datum value;
bool isnull = false;
int hash_attno = user_hash_attrno(desc);
value = heap_getattr(tuple, hash_attno + 1, desc, &isnull);
Assert(!isnull);
return DatumGetUInt64(value);
}
* gen_user_tuple_hash -- generate hash of each user table's tuple.
*
* rel: user table
* tuple: row data of user table
*/
static uint64 gen_user_tuple_hash(Relation rel, HeapTuple tuple)
{
TupleDesc tabledesc = RelationGetDescr(rel);
int data_size = UINT64STRSIZE * tabledesc->natts + 1;
char *data_string = (char *)palloc0(data_size * sizeof(char));
hash_combine_tuple_data(data_string, data_size, tabledesc, tuple);
uint8 sum[16];
if (pg_md5_binary(data_string, strlen(data_string), sum) == false) {
pfree_ext(data_string);
ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory")));
}
uint64 result = 0;
for (int i = 0; i < 7; i++) {
result |= sum[4 + i];
result = (result << 8);
}
result |= sum[11];
pfree_ext(data_string);
return result;
}
* set_user_tuple_hash -- calculate and fill the hash attribute of user table's tuple.
*
* tup: row data of user table
* rel: user table
* hash_exists: whether tuple comes with tuplehash.
*
* Note: if hash_exists is true, we should recompute
* tuple hash and compare with tuplehash of itself.
*/
HeapTuple set_user_tuple_hash(HeapTuple tup, Relation rel, TupleTableSlot *slot, bool hash_exists)
{
uint64 row_hash = gen_user_tuple_hash(rel, tup);
int hash_attrno = user_hash_attrno(rel->rd_att);
if (hash_exists) {
bool is_null;
Datum hash = heap_getattr(tup, hash_attrno + 1, rel->rd_att, &is_null);
if (is_null || row_hash != DatumGetUInt64(hash)) {
ereport(ERROR, (errcode(ERRCODE_OPERATE_INVALID_PARAM), errmsg("Invalid tuple hash.")));
}
return tup;
}
Datum *values = NULL;
bool *nulls = NULL;
bool *replaces = NULL;
int2 nattrs = RelationGetNumberOfAttributes(rel);
values = (Datum*)palloc0(nattrs * sizeof(Datum));
nulls = (bool*)palloc0(nattrs * sizeof(bool));
replaces = (bool*)palloc0(nattrs * sizeof(bool));
values[hash_attrno] = UInt64GetDatum(row_hash);
replaces[hash_attrno] = true;
HeapTuple newtup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, replaces);
if (slot) {
ExecStoreTuple((Tuple)newtup, slot, InvalidBuffer, false);
}
pfree_ext(values);
pfree_ext(nulls);
pfree_ext(replaces);
return newtup;
}
* get_hist_oid -- get the oid of history table by oid, name and namespace name of user table
*
* relid: relation oid of user table
* rel_name: relation name of user table
* rel_nsp: namespace name of user table
*/
Oid get_hist_oid(Oid relid, const char *rel_name, Oid rel_nsp)
{
if (rel_name == NULL) {
rel_name = get_rel_name(relid);
}
char hist_name[NAMEDATALEN];
get_hist_name(relid, rel_name, hist_name, rel_nsp);
Oid hist_oid = get_relname_relid(hist_name, PG_BLOCKCHAIN_NAMESPACE);
return hist_oid;
}
* get_user_tupleid_hash -- get the hash value of usertable's tupleid
*
* relation: relation of user table
* tupleid: tupleid of user tuple
*/
uint64 get_user_tupleid_hash(Relation relation, ItemPointer tupleid)
{
BlockNumber block;
Buffer buffer;
Buffer vmbuffer = InvalidBuffer;
Page page;
ItemId lp;
HeapTupleData tp;
TupleDesc tabledescr;
uint64 result;
tabledescr = RelationGetDescr(relation);
block = ItemPointerGetBlockNumber(tupleid);
buffer = ReadBuffer(relation, block);
page = BufferGetPage(buffer);
if (PageIsAllVisible(page)) {
visibilitymap_pin(relation, block, &vmbuffer);
}
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
lp = PageGetItemId(page, ItemPointerGetOffsetNumber(tupleid));
tp.t_tableOid = RelationGetRelid(relation);
tp.t_data = (HeapTupleHeader) PageGetItem(page, lp);
tp.t_len = ItemIdGetLength(lp);
tp.t_self = *tupleid;
result = get_user_tuple_hash(&tp, tabledescr);
UnlockReleaseBuffer(buffer);
if (vmbuffer != InvalidBuffer) {
ReleaseBuffer(vmbuffer);
}
return result;
}
* gen_hist_tuple_hash -- calculate pre_hash of hist table
*
* relid: relation oid of history table
* current_block_data: data of current row, includes hash_ins and hash_del
* pre_row_exist: when current row is the first row, it's true
* pre_row_hash: the pre_hash value of previous row
* hash: the result hash
*/
void gen_hist_tuple_hash(Oid relid, char *current_block_data, bool pre_row_exist,
hash32_t *pre_row_hash, hash32_t *hash)
{
errno_t rc;
int buf_size = strlen(current_block_data) + NAMEDATALEN + 1;
char *data_string = (char *)palloc0(buf_size * sizeof(char));
if (pre_row_exist) {
char *pre_hash_str = DatumGetCString(DirectFunctionCall1(hash32out, HASH32GetDatum(pre_row_hash)));
rc = snprintf_s(data_string, buf_size, buf_size - 1, "%s%s", current_block_data, pre_hash_str);
pfree_ext(pre_hash_str);
} else {
char *rel_name = get_rel_name(relid);
rc = snprintf_s(data_string, buf_size, buf_size - 1, "%s%s", current_block_data, rel_name);
}
securec_check_ss(rc, "", "");
if (!pg_md5_binary(data_string, strlen(data_string), hash->data)) {
pfree_ext(data_string);
ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory")));
}
pfree_ext(data_string);
}
* fill_hist_block -- get newest preblock in cache, and prepare new block for flushing.
*
* histoid: relation oid of history table
* hash_ins: hash_ins value of current row
* hash_del: hash_del value of current row
* block: row id and pre_hash from current row
*/
static void fill_hist_block(Oid histoid, uint64 hash_ins, uint64 hash_del, HistBlock *block)
{
char data[NAMEDATALEN] = {0};
block->rec_num = get_next_recnum(histoid);
error_t rc = sprintf_s(data, NAMEDATALEN, "%lu%lu%lu", block->rec_num, hash_ins, hash_del);
securec_check_ss(rc, "", "");
gen_hist_tuple_hash(histoid, data, false, NULL, &block->prev_hash);
}
* hist_table_record_internal -- append record to history table when user table is modified
*
* hist_oid: relation oid of history table
* hash_ins: hash of row that added by current operation
* hash_del: hash of row that deleted by current operation
*/
bool hist_table_record_internal(Oid hist_oid, const uint64 *hash_ins, const uint64 *hash_del)
{
Datum values[USERCHAIN_COLUMN_NUM] = {0};
bool nulls[USERCHAIN_COLUMN_NUM] = {false};
bool ins_null = hash_ins == NULL;
bool del_null = hash_del == NULL;
uint64 t_ins = ins_null ? 0 : *hash_ins;
uint64 t_del = del_null ? 0 : *hash_del;
HistBlock block;
if (!OidIsValid(hist_oid)) {
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_TABLE), errmsg("could not find history table")));
return false;
}
fill_hist_block(hist_oid, t_ins, t_del, &block);
values[USERCHAIN_COLUMN_REC_NUM] = UInt64GetDatum(block.rec_num);
values[USERCHAIN_COLUMN_HASH_INS] = UInt64GetDatum(t_ins);
values[USERCHAIN_COLUMN_HASH_DEL] = UInt64GetDatum(t_del);
values[USERCHAIN_COLUMN_PREVHASH] = HASH32GetDatum(&block.prev_hash);
nulls[USERCHAIN_COLUMN_REC_NUM] = false;
nulls[USERCHAIN_COLUMN_HASH_INS] = ins_null;
nulls[USERCHAIN_COLUMN_HASH_DEL] = del_null;
nulls[USERCHAIN_COLUMN_PREVHASH] = false;
Relation hist_rel = heap_open(hist_oid, RowExclusiveLock);
TupleDesc hist_desc = RelationGetDescr(hist_rel);
HeapTuple tuple = heap_form_tuple(hist_desc, values, nulls);
simple_heap_insert(hist_rel, tuple);
CatalogUpdateIndexes(hist_rel,tuple);
heap_freetuple(tuple);
heap_close(hist_rel, RowExclusiveLock);
return true;
}
* hist_table_record_insert -- append a record while inserting into user table
*
* rel: relation of user table
* tup: the tuple which is inserted into user table
* res_hash: delta of user table hash
*/
bool hist_table_record_insert(Relation rel, HeapTuple tup, uint64 *res_hash)
{
if (tup == NULL || rel == NULL) {
return false;
}
uint64 hash_ins = get_user_tuple_hash(tup, rel->rd_att);
Oid hist_oid = get_hist_oid(RelationGetRelid(rel), RelationGetRelationName(rel), RelationGetNamespace(rel));
*res_hash = hash_ins;
return hist_table_record_internal(hist_oid, &hash_ins, NULL);
}
* hist_table_record_delete -- append a record while deleting from user table
*
* rel: relation of user table
* hash_del: the hash of deleted tuple
* res_hash: delta of user table hash
*/
bool hist_table_record_delete(Relation rel, uint64 hash_del, uint64 *res_hash)
{
Oid hist_oid = get_hist_oid(RelationGetRelid(rel), RelationGetRelationName(rel), RelationGetNamespace(rel));
*res_hash = -hash_del;
return hist_table_record_internal(hist_oid, NULL, &hash_del);
}
* hist_table_record_update -- append a record while updating user table
*
* rel: relation of user table
* newtup: the tupleid which is updated from user table
* hash_del: the hash of deleted tuple
* res_hash: delta of user table hash
*/
bool hist_table_record_update(Relation rel, HeapTuple newtup, uint64 hash_del, uint64 *res_hash)
{
uint64 hash_ins = get_user_tuple_hash(newtup, rel->rd_att);
Oid hist_oid = get_hist_oid(RelationGetRelid(rel), RelationGetRelationName(rel), RelationGetNamespace(rel));
*res_hash = hash_ins - hash_del;
return hist_table_record_internal(hist_oid, &hash_ins, &hash_del);
}
* get_copyfrom_line_relhash -- extract hash from each copyfrom line
*
* row_data: string line from txt
* len: length of row_data
* hash_colno: the number of split char before hash text
* split: split char
* hash: the buffer of getting hash.
*/
bool get_copyfrom_line_relhash(const char *row_data, int len, int hash_colno, char split, uint64 *hash)
{
int pos;
if (hash_colno == -1) {
return false;
}
for (pos = 0; pos < len && hash_colno > 0; ++pos) {
if (row_data[pos] == split) {
--hash_colno;
}
}
if (hash_colno == 0) {
int remain_len = len - pos;
const char *hash_str = row_data + pos;
if (remain_len > 0) {
*hash = DatumGetUInt64(DirectFunctionCall1(hash16in, CStringGetDatum(hash_str)));
return true;
}
}
return false;
}