/*
 * Copyright (c) 2020 Huawei Technologies Co.,Ltd.
 *
 * openGauss is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * -------------------------------------------------------------------------
 *
 * blockchain.cpp
 *    functions for recording user operations to gs_global_chain.
 *
 * IDENTIFICATION
 *    src/gausskernel/security/gs_ledger/blockchain.cpp
 *
 * -------------------------------------------------------------------------
 */
#include "gs_ledger/ledger_utils.h"
#include "executor/executor.h"
#include "catalog/gs_global_chain.h"
#include "catalog/pg_proc.h"
#include "commands/dbcommands.h"
#include "parser/parsetree.h"
#include "libpq/md5.h"
#include "gs_ledger/blockchain.h"
#include "utils/snapmgr.h"
* gen_global_hash -- generate globalhash of gchain
*
* hash_buffer: the buffer that ready to fill generated globalhash.
* info_string: operate info string of current block.
* exist: whether previous block exists.
* prev_hash: the address of previous hash value.
*
* Note: globalhash is generated by operate info and previous globalhash using md5.
*/
bool gen_global_hash(hash32_t *hash_buffer, const char *info_string, bool exist, const hash32_t *prev_hash)
{
#ifdef ENABLE_LITE_MODE
* If relation is in ledger schema, avoid procedure or function modifying it.
*/
if (u_sess->SPI_cxt._connected > -1) {
ereport(ERROR, (errcode(ERRCODE_INVALID_OPERATION),
errmsg("ledger table cannot be modified by function, procedure or trigger.")));
return false;
}
#endif
errno_t rc = EOK;
int comb_strlen;
char *comb_string = NULL;
* Previous block not exists means current insertion block is genesis,
* then we use global systable as origin combine string for globalhash
* generation. If previous block exists, we will use previous global
* hash as combine string to calculate globalhash.
*/
if (!exist) {
comb_strlen = strlen(GCHAIN_NAME) + strlen(info_string) + 1;
comb_string = (char *)palloc0(comb_strlen);
rc = snprintf_s(comb_string, comb_strlen, comb_strlen - 1, "%s%s", GCHAIN_NAME, info_string);
securec_check_ss(rc, "", "");
} else {
char *pre_hash_str = DatumGetCString(DirectFunctionCall1(hash32out, HASH32GetDatum(prev_hash)));
comb_strlen = strlen(pre_hash_str) + strlen(info_string) + 1;
comb_string = (char *)palloc0(comb_strlen);
rc = snprintf_s(comb_string, comb_strlen, comb_strlen - 1, "%s%s", info_string, pre_hash_str);
securec_check_ss(rc, "", "");
pfree_ext(pre_hash_str);
}
if (!pg_md5_binary(comb_string, comb_strlen - 1, hash_buffer->data)) {
pfree(comb_string);
ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("Failed to generate globalhash, out of memory")));
return false;
}
pfree(comb_string);
return true;
}
* set_gchain_comb_string -- combine block informations.
*
* db_name: the database name where executes cmd.
* user_name: the user name who executes cmd
* nsp_name: namespace name of usertable
* rel_name: rel_name of usertable
* cmd_text: the command query which modified user table.
* rel_hash: rel_hash of current block.
*/
char *set_gchain_comb_string(const char *db_name, const char *user_name,
const char *nsp_name, const char *rel_name, const char *cmd_text, uint64 rel_hash)
{
if (cmd_text == NULL) {
cmd_text = "";
}
int comb_len = strlen(db_name) + strlen(user_name) + strlen(nsp_name) +
strlen(rel_name) + strlen(cmd_text) + PREVIOUS_HASH_LEN + 1;
char *comb_str = (char *)palloc0(sizeof(char) * comb_len);
errno_t rc = snprintf_s(comb_str, comb_len, comb_len - 1, "%s%s%s%s%s%lu",
db_name, user_name, nsp_name, rel_name, cmd_text, rel_hash);
securec_check_ss(rc, "\0", "\0");
return comb_str;
}
* ledger_gchain_append -- record a block to gchain.
*
* relid: relation oid of usertable.
* query_string: original query which modified usertable.
* cn_hash: rel_hash in hist table generated by query_string.
*
* Note: after block inserted into gchain, its globalhash will flush
* into gchain cache for next block. Thus, previous global hash is
* come from cache directly.
*/
void ledger_gchain_append(Oid relid, const char *query_string, uint64 cn_hash)
{
Datum current_time;
Datum values[Natts_gs_global_chain] = {0};
bool nulls[Natts_gs_global_chain] = {false};
char *db_name = NULL;
char *user_name = NULL;
char *nsp_name = NULL;
char *rel_name = NULL;
char *combine_string = NULL;
HeapTuple tup = NULL;
Relation rel_gchain = NULL;
GlobalPrevBlock current_block;
db_name = get_database_name(u_sess->proc_cxt.MyDatabaseId);
user_name = GetUserNameFromId(GetCurrentUserId());
current_time = TimestampTzGetDatum(GetCurrentTimestamp());
nsp_name = get_namespace_name(get_rel_namespace(relid));
rel_name = get_rel_name(relid);
combine_string = set_gchain_comb_string(db_name, user_name, nsp_name, rel_name, query_string, cn_hash);
* rel_hash: sum of hash in DN which generated by this query_string.
* globalhash: hash for last record of gs_global_chain, it means blockchain prevhash.
*/
current_block.blocknum = get_next_g_blocknum();
gen_global_hash(¤t_block.globalhash, combine_string, false, NULL);
values[Anum_gs_global_chain_blocknum - 1] = UInt64GetDatum(current_block.blocknum);
values[Anum_gs_global_chain_dbname - 1] = DirectFunctionCall1(namein, CStringGetDatum(db_name));
values[Anum_gs_global_chain_username - 1] = DirectFunctionCall1(namein, CStringGetDatum(user_name));
values[Anum_gs_global_chain_starttime - 1] = current_time;
values[Anum_gs_global_chain_relid - 1] = ObjectIdGetDatum(relid);
values[Anum_gs_global_chain_relnsp - 1] = DirectFunctionCall1(namein, CStringGetDatum(nsp_name));
values[Anum_gs_global_chain_relname - 1] = DirectFunctionCall1(namein, CStringGetDatum(rel_name));
values[Anum_gs_global_chain_relhash - 1] = UInt64GetDatum(cn_hash);
values[Anum_gs_global_chain_globalhash - 1] = HASH32GetDatum(¤t_block.globalhash);
values[Anum_gs_global_chain_txcommand - 1] = CStringGetTextDatum(query_string);
rel_gchain = heap_open(GsGlobalChainRelationId, RowExclusiveLock);
tup = heap_form_tuple(rel_gchain->rd_att, values, nulls);
simple_heap_insert(rel_gchain, tup);
heap_freetuple(tup);
heap_close(rel_gchain, RowExclusiveLock);
pfree(combine_string);
}
* ledger_output_append_hash -- append relhash to response tag.
*
* resp_tag: response tag address.
* operation: command operation.
* hash: the hash that prepare to append.
*/
static void ledger_output_append_hash(char *resp_tag, CmdType operation, uint64 hash)
{
Assert(resp_tag != NULL);
size_t len = strlen(resp_tag);
errno_t ret = EOK;
switch (operation) {
case CMD_INSERT:
case CMD_UPDATE:
case CMD_DELETE:
ret = snprintf_s(resp_tag + len, COMPLETION_TAG_BUFSIZE - len, COMPLETION_TAG_BUFSIZE - len - 1,
" %lu\0", hash);
securec_check_ss(ret, "\0", "\0");
break;
default:
break;
}
}
* ledger_ExecutorEnd -- record block to gchain.
*
* query_desc: query descript of executor.
*
* Note: append new block to gchain in CN or singlenode.
* each DN will link its relhash to es_modifiedRowHash, CN or singlenode
* use es_modifiedRowHash to receive all DN relhash and accumulate them
* as cn_relhash for insertion.
*/
static void ledger_ExecutorEnd(QueryDesc *query_desc)
{
uint64 hashsum;
bool has_remote_hash = query_desc->estate->es_modifiedRowHash != NIL;
hashsum = hash_combiner(query_desc->estate->es_modifiedRowHash);
if ((IS_PGXC_COORDINATOR || g_instance.role == VSINGLENODE) && has_remote_hash) {
Oid relid = InvalidOid;
Relation rel = NULL;
int relnum = query_desc->estate->es_num_result_relations;
if (relnum > 0) {
rel = query_desc->estate->es_result_relations->ri_RelationDesc;
switch (query_desc->operation) {
case CMD_INSERT:
case CMD_DELETE:
case CMD_UPDATE:
relid = RelationGetRelid(rel);
if (rel->rd_isblockchain) {
ledger_gchain_append(relid, query_desc->sourceText, hashsum);
}
break;
default:
break;
}
}
}
if (u_sess->ledger_cxt.resp_tag != NULL && has_remote_hash && !IsConnFromApp()) {
ledger_output_append_hash(u_sess->ledger_cxt.resp_tag, query_desc->operation, hashsum);
u_sess->ledger_cxt.resp_tag = NULL;
}
if (t_thrd.security_ledger_cxt.prev_ExecutorEnd) {
((ExecutorEnd_hook_type)t_thrd.security_ledger_cxt.prev_ExecutorEnd)(query_desc);
} else {
standard_ExecutorEnd(query_desc);
}
}
* light_ledger_ExecutorEnd -- record block to gchain in light proxy.
*
* query: query of executor.
* relhash: sum of all DN relhash for insertion.
*
* Note: in light proxy logical, process will extract and
* accumulate relhash from response text of DNs as cn_relhash.
*/
void light_ledger_ExecutorEnd(Query *query, uint64 relhash)
{
if (!IS_PGXC_COORDINATOR && g_instance.role != VSINGLENODE) {
return;
}
Oid relid = InvalidOid;
switch (query->commandType) {
case CMD_INSERT:
case CMD_DELETE:
case CMD_UPDATE:
relid = get_target_query_relid(query->rtable, linitial_int(query->resultRelations));
if (is_ledger_usertable(relid)) {
ledger_gchain_append(relid, query->sql_statement, relhash);
}
break;
default:
break;
}
}
* light_ledger_ExecutorEnd -- record block to gchain in opfusion.
*
* fusiontype: operator type.
* relid: relation oid of usertable.
* query: original query which modified usertable.
* relhash: relhash in hist table generated by sourceText.
*/
void opfusion_ledger_ExecutorEnd(FusionType fusiontype, Oid relid, const char *query, uint64 relhash)
{
if (g_instance.role == VDATANODE || !is_ledger_usertable(relid)) {
return;
}
switch (fusiontype) {
case INSERT_FUSION:
case UPDATE_FUSION:
case DELETE_FUSION:
case INSERT_SUB_FUSION:
case DELETE_SUB_FUSION:
if (is_ledger_usertable(relid)) {
ledger_gchain_append(relid, query, relhash);
}
break;
default:
break;
}
}
* ledger_hook_init -- install of gchain block record hook.
*/
void ledger_hook_init(void)
{
t_thrd.security_ledger_cxt.prev_ExecutorEnd = (void *)ExecutorEnd_hook;
ExecutorEnd_hook = ledger_ExecutorEnd;
}
* ledger_hook_fini -- uninstall of gchain block record hook.
*/
void ledger_hook_fini(void)
{
ExecutorEnd_hook = (ExecutorEnd_hook_type)t_thrd.security_ledger_cxt.prev_ExecutorEnd;
}