/* ddeda89a, created on 2023-06-25 (commit history) */
/*
 * Copyright (c) 2020 Huawei Technologies Co.,Ltd.
 *
 * openGauss is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * -------------------------------------------------------------------------
 *
 * blockchain.cpp
 *     functions for recording user operation to gs_global_chain.
 *
 * IDENTIFICATION
 *    src/gausskernel/security/gs_ledger/blockchain.cpp
 *
 * -------------------------------------------------------------------------
 */
#include "gs_ledger/ledger_utils.h"
#include "executor/executor.h"
#include "catalog/gs_global_chain.h"
#include "catalog/pg_proc.h"
#include "commands/dbcommands.h"
#include "parser/parsetree.h"
#include "libpq/md5.h"
#include "gs_ledger/blockchain.h"
#include "utils/snapmgr.h"

/*
 * gen_global_hash -- compute the globalhash of a gchain block.
 *
 * hash_buffer: receives the MD5 digest (written to hash_buffer->data).
 * info_string: combined operation info of the current block.
 * exist: true when a previous block exists and prev_hash must be chained in.
 * prev_hash: globalhash of the previous block; only read when exist is true.
 *
 * The digest input is:
 *   - exist == false (genesis block): GCHAIN_NAME followed by info_string;
 *   - exist == true: info_string followed by the text form of prev_hash
 *     (as produced by hash32out).
 *
 * Returns true on success. An MD5 failure is reported with ereport(ERROR).
 */
bool gen_global_hash(hash32_t *hash_buffer, const char *info_string, bool exist, const hash32_t *prev_hash)
{
#ifdef ENABLE_LITE_MODE
    /*
     * In lite mode, refuse to touch ledger data while an SPI connection is
     * active, i.e. when called from a function, procedure or trigger.
     */
    if (u_sess->SPI_cxt._connected > -1) {
        ereport(ERROR, (errcode(ERRCODE_INVALID_OPERATION),
            errmsg("ledger table cannot be modified by function, procedure or trigger.")));
        return false;
    }
#endif

    const char *head_part = NULL;
    const char *tail_part = NULL;
    char *prev_hash_text = NULL;

    if (exist) {
        /* Chained block: current info first, then the previous hash as text. */
        prev_hash_text = DatumGetCString(DirectFunctionCall1(hash32out, HASH32GetDatum(prev_hash)));
        head_part = info_string;
        tail_part = prev_hash_text;
    } else {
        /* Genesis block: seed the digest with the global chain table name. */
        head_part = GCHAIN_NAME;
        tail_part = info_string;
    }

    int total_len = strlen(head_part) + strlen(tail_part) + 1;
    char *message = (char *)palloc0(total_len);
    errno_t rc = snprintf_s(message, total_len, total_len - 1, "%s%s", head_part, tail_part);
    securec_check_ss(rc, "", "");
    pfree_ext(prev_hash_text);

    bool digest_ok = pg_md5_binary(message, total_len - 1, hash_buffer->data);
    pfree(message);
    if (!digest_ok) {
        ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("Failed to generate globalhash, out of memory")));
        return false;
    }
    return true;
}

/*
 * set_gchain_comb_string -- combine block informations into one string.
 *
 * db_name: the database name where the command executed.
 * user_name: the user name who executed the command.
 * nsp_name: namespace name of the user table.
 * rel_name: relation name of the user table.
 * cmd_text: the command text that modified the user table; NULL is treated
 *           as an empty string.
 * rel_hash: rel_hash of the current block, appended in decimal.
 *
 * Returns a palloc'd string:
 *   db_name + user_name + nsp_name + rel_name + cmd_text + rel_hash.
 */
char *set_gchain_comb_string(const char *db_name, const char *user_name,
    const char *nsp_name, const char *rel_name, const char *cmd_text, uint64 rel_hash)
{
    /* The longest decimal rendering of a 64-bit unsigned value is 20 digits. */
    const size_t max_u64_digits = 20;
    size_t hash_room = ((size_t)PREVIOUS_HASH_LEN > max_u64_digits) ? (size_t)PREVIOUS_HASH_LEN : max_u64_digits;

    if (cmd_text == NULL) {
        cmd_text = "";
    }
    size_t comb_len = strlen(db_name) + strlen(user_name) + strlen(nsp_name) +
        strlen(rel_name) + strlen(cmd_text) + hash_room + 1;
    char *comb_str = (char *)palloc0(sizeof(char) * comb_len);
    /* Cast so that "%llu" matches the argument regardless of how uint64 is typedef'd. */
    errno_t rc = snprintf_s(comb_str, comb_len, comb_len - 1, "%s%s%s%s%s%llu",
        db_name, user_name, nsp_name, rel_name, cmd_text, (unsigned long long)rel_hash);
    securec_check_ss(rc, "\0", "\0");
    return comb_str;
}

/*
 * ledger_gchain_append -- record a block to gchain.
 *
 * relid: relation oid of the user table.
 * query_string: original query which modified the user table; NULL is
 *               recorded (and hashed) as an empty string.
 * cn_hash: rel_hash in hist table generated by query_string.
 *
 * Note: the globalhash is computed by gen_global_hash() with exist == false,
 * i.e. from GCHAIN_NAME and this block's combine string only; no previous
 * block hash is chained in here.
 */
void ledger_gchain_append(Oid relid, const char *query_string, uint64 cn_hash)
{
    Datum current_time;
    Datum values[Natts_gs_global_chain] = {0};
    bool nulls[Natts_gs_global_chain] = {false};
    char *db_name = NULL;
    char *user_name = NULL;
    char *nsp_name = NULL;
    char *rel_name = NULL;
    char *combine_string = NULL;
    HeapTuple tup = NULL;
    Relation rel_gchain = NULL;
    GlobalPrevBlock current_block;
    /*
     * set_gchain_comb_string() hashes a NULL command as "", so store the same
     * text in txcommand. This keeps the row consistent with its globalhash and
     * keeps NULL away from CStringGetTextDatum().
     */
    const char *cmd_text = (query_string != NULL) ? query_string : "";

    /* get basic informations. */
    db_name = get_database_name(u_sess->proc_cxt.MyDatabaseId);
    user_name = GetUserNameFromId(GetCurrentUserId());
    current_time = TimestampTzGetDatum(GetCurrentTimestamp());
    nsp_name = get_namespace_name(get_rel_namespace(relid));
    rel_name = get_rel_name(relid);

    /* Combine string: db_name + user_name + nsp_name + rel_name + cmd_text + rel_hash */
    combine_string = set_gchain_comb_string(db_name, user_name, nsp_name, rel_name, cmd_text, cn_hash);

    /*
     * rel_hash: sum of hash in DN which generated by this query_string.
     * globalhash: digest of this block's combine string (see note above).
     */
    current_block.blocknum = get_next_g_blocknum();
    gen_global_hash(&current_block.globalhash, combine_string, false, NULL);

    values[Anum_gs_global_chain_blocknum - 1] = UInt64GetDatum(current_block.blocknum);
    values[Anum_gs_global_chain_dbname - 1] = DirectFunctionCall1(namein, CStringGetDatum(db_name));
    values[Anum_gs_global_chain_username - 1] = DirectFunctionCall1(namein, CStringGetDatum(user_name));
    values[Anum_gs_global_chain_starttime - 1] = current_time;
    values[Anum_gs_global_chain_relid - 1] = ObjectIdGetDatum(relid);
    values[Anum_gs_global_chain_relnsp - 1] = DirectFunctionCall1(namein, CStringGetDatum(nsp_name));
    values[Anum_gs_global_chain_relname - 1] = DirectFunctionCall1(namein, CStringGetDatum(rel_name));
    values[Anum_gs_global_chain_relhash - 1] = UInt64GetDatum(cn_hash);
    values[Anum_gs_global_chain_globalhash - 1] = HASH32GetDatum(&current_block.globalhash);
    values[Anum_gs_global_chain_txcommand - 1] = CStringGetTextDatum(cmd_text);

    rel_gchain = heap_open(GsGlobalChainRelationId, RowExclusiveLock);
    tup = heap_form_tuple(rel_gchain->rd_att, values, nulls);

    simple_heap_insert(rel_gchain, tup);
    heap_freetuple(tup);

    heap_close(rel_gchain, RowExclusiveLock);
    pfree(combine_string);
}

/*
 * ledger_output_append_hash -- append relhash to response tag.
 *
 * resp_tag: response tag buffer. It is treated as COMPLETION_TAG_BUFSIZE
 *           bytes and must already be NUL-terminated.
 * operation: command operation; only INSERT/UPDATE/DELETE get a hash.
 * hash: the hash to append, written as " <decimal>".
 */
static void ledger_output_append_hash(char *resp_tag, CmdType operation, uint64 hash)
{
    Assert(resp_tag != NULL);
    size_t len = strlen(resp_tag);
    /* The size arithmetic below would wrap if the tag already filled the buffer. */
    Assert(len < COMPLETION_TAG_BUFSIZE);
    errno_t ret = EOK;

    switch (operation) {
        case CMD_INSERT:
        case CMD_UPDATE:
        case CMD_DELETE:
            /*
             * snprintf_s NUL-terminates on its own, so no "\0" is needed in the
             * format. The cast makes "%llu" match uint64 on every data model.
             */
            ret = snprintf_s(resp_tag + len, COMPLETION_TAG_BUFSIZE - len, COMPLETION_TAG_BUFSIZE - len - 1,
                             " %llu", (unsigned long long)hash);
            securec_check_ss(ret, "\0", "\0");
            break;
        default:
            break;
    }
}

/*
 * ledger_ExecutorEnd -- ExecutorEnd hook that records a block to gchain.
 *
 * query_desc: descriptor of the query that is finishing.
 *
 * hash_combiner() folds the relhash values gathered in es_modifiedRowHash
 * into a single value. On a coordinator or single node, a block is appended
 * to gchain when both of the following hold:
 *   - at least one hash was gathered;
 *   - the first result relation is a blockchain table modified by
 *     INSERT/UPDATE/DELETE.
 * If a response tag is pending, at least one hash was gathered, and
 * IsConnFromApp() is false, the combined hash is appended to that tag and
 * the pending tag is cleared. Control then passes to the previously
 * installed ExecutorEnd hook, or to standard_ExecutorEnd() if there is none.
 */
static void ledger_ExecutorEnd(QueryDesc *query_desc)
{
    EState *estate = query_desc->estate;
    bool collected_hash = (estate->es_modifiedRowHash != NIL);
    uint64 combined_hash = hash_combiner(estate->es_modifiedRowHash);
    bool is_recording_node = IS_PGXC_COORDINATOR || g_instance.role == VSINGLENODE;

    if (is_recording_node && collected_hash && estate->es_num_result_relations > 0) {
        Relation target_rel = estate->es_result_relations->ri_RelationDesc;
        CmdType op = query_desc->operation;
        /* Only data-modifying commands produce gchain blocks. */
        bool is_dml = (op == CMD_INSERT || op == CMD_DELETE || op == CMD_UPDATE);
        if (is_dml && target_rel->rd_isblockchain) {
            ledger_gchain_append(RelationGetRelid(target_rel), query_desc->sourceText, combined_hash);
        }
    }

    bool tag_pending = (u_sess->ledger_cxt.resp_tag != NULL);
    if (tag_pending && collected_hash && !IsConnFromApp()) {
        ledger_output_append_hash(u_sess->ledger_cxt.resp_tag, query_desc->operation, combined_hash);
        u_sess->ledger_cxt.resp_tag = NULL;
    }

    ExecutorEnd_hook_type next_hook = (ExecutorEnd_hook_type)t_thrd.security_ledger_cxt.prev_ExecutorEnd;
    if (next_hook != NULL) {
        next_hook(query_desc);
    } else {
        standard_ExecutorEnd(query_desc);
    }
}

/*
 * light_ledger_ExecutorEnd -- record a block to gchain from the light proxy path.
 *
 * query: the query that was executed.
 * relhash: relhash accumulated from the DN responses, used as cn_relhash.
 *
 * Only coordinators and single nodes record blocks, and only for
 * INSERT/UPDATE/DELETE whose first result relation is a ledger user table.
 */
void light_ledger_ExecutorEnd(Query *query, uint64 relhash)
{
    bool is_recording_node = IS_PGXC_COORDINATOR || g_instance.role == VSINGLENODE;
    if (!is_recording_node) {
        return;
    }

    CmdType op = query->commandType;
    bool is_dml = (op == CMD_INSERT || op == CMD_DELETE || op == CMD_UPDATE);
    if (!is_dml) {
        /* Other command types are not recorded. */
        return;
    }

    Oid target_relid = get_target_query_relid(query->rtable, linitial_int(query->resultRelations));
    if (is_ledger_usertable(target_relid)) {
        ledger_gchain_append(target_relid, query->sql_statement, relhash);
    }
}

/*
 * opfusion_ledger_ExecutorEnd -- record a block to gchain in opfusion.
 *
 * fusiontype: operator type.
 * relid: relation oid of the user table.
 * query: original query which modified the user table.
 * relhash: relhash in hist table generated by query.
 *
 * Nothing is recorded in any of these cases:
 *   - on a datanode;
 *   - for fusion types other than the insert/update/delete variants below;
 *   - when relid is not a ledger user table.
 */
void opfusion_ledger_ExecutorEnd(FusionType fusiontype, Oid relid, const char *query, uint64 relhash)
{
    if (g_instance.role == VDATANODE) {
        return;
    }

    switch (fusiontype) {
        case INSERT_FUSION:
        case UPDATE_FUSION:
        case DELETE_FUSION:
        case INSERT_SUB_FUSION:
        case DELETE_SUB_FUSION:
            /* Look the table up once, and only for fusion types that can record. */
            if (is_ledger_usertable(relid)) {
                ledger_gchain_append(relid, query, relhash);
            }
            break;
        /* Not support others */
        default:
            break;
    }
}

/*
 * ledger_hook_init -- install of gchain block record hook.
 */
void ledger_hook_init(void)
{
    t_thrd.security_ledger_cxt.prev_ExecutorEnd = (void *)ExecutorEnd_hook;
    ExecutorEnd_hook = ledger_ExecutorEnd;
}

/*
 * ledger_hook_fini -- uninstall of gchain block record hook.
 */
void ledger_hook_fini(void)
{
    ExecutorEnd_hook = (ExecutorEnd_hook_type)t_thrd.security_ledger_cxt.prev_ExecutorEnd;
}