/*
 * Copyright (c) 2022 Huawei Technologies Co.,Ltd.
 *
 * openGauss is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * ---------------------------------------------------------------------------------------
 *
 * ss_xmin.cpp
 *  ss xmin related
 *
 *
 * IDENTIFICATION
 *        src/gausskernel/ddes/adapter/ss_xmin.cpp
 *
 * ---------------------------------------------------------------------------------------
 */

#include "ddes/dms/ss_xmin.h"
#include "storage/procarray.h"
#include "storage/ipc.h"
#include "ddes/dms/ss_dms_bufmgr.h"
#include "ddes/dms/ss_dms.h"
#include "ddes/dms/ss_common_attr.h"
#include "knl/knl_instance.h"

extern void CalculateLocalLatestSnapshot(bool forceCalc);

/*
 * SSSnapshotXminKeyHashCode
 *      Compute the dynahash hash value of a snapshot-xmin key, using the hash
 *      function configured on the shared snap_cache table. Callers use the
 *      result to pick the partition lock guarding that key.
 */
uint32 SSSnapshotXminKeyHashCode(const ss_snap_xmin_key_t *key)
{
    struct HTAB *cache = g_instance.dms_cxt.SSXminInfo.snap_cache;
    uint32 hashcode = get_hash_value(cache, key);
    return hashcode;
}

/*
 * GetOldestXminInNodeTable
 *      Return the smallest notify_oldest_xmin among the node-table entries
 *      marked active, or MaxTransactionId if none is active.
 *
 * The scan stops at the first instance whose bit is numerically larger than
 * the whole active-node bitmap, since no higher instance can be set there.
 * Each entry's xmin is read under that entry's spinlock.
 */
uint64 GetOldestXminInNodeTable()
{
    ss_xmin_info_t *xmin_info = &g_instance.dms_cxt.SSXminInfo;
    uint64 min_xmin = MaxTransactionId;
    for (int i = 0; i < DMS_MAX_INSTANCES; i++) {
        /*
         * Widen before shifting: "1 << i" is a shift of a 32-bit int and is
         * undefined for i >= 31, while instance ids go up to DMS_MAX_INSTANCES.
         */
        uint64 node_bit = (uint64)1 << i;
        if (node_bit > xmin_info->bitmap_active_nodes) {
            break;
        }

        ss_node_xmin_item_t *item = &xmin_info->node_table[i];
        if (!item->active) {
            continue;
        }

        SpinLockAcquire(&item->item_lock);
        if (TransactionIdPrecedes(item->notify_oldest_xmin, min_xmin)) {
            min_xmin = item->notify_oldest_xmin;
        }
        SpinLockRelease(&item->item_lock);
    }
    return min_xmin;
}

void MaintXminInPrimary(void)
{
    if (SS_IN_REFORM) {
        pg_usleep(SS_REFORM_WAIT_TIME);
        return;
    }
    ss_xmin_info_t *xmin_info = &g_instance.dms_cxt.SSXminInfo;
    struct HTAB* snap_cache = g_instance.dms_cxt.SSXminInfo.snap_cache;
    HASH_SEQ_STATUS hash_seq;
    hash_seq_init(&hash_seq, snap_cache);
    ss_snap_xmin_item_t *xmin_item;
    uint64 snap_xmin = MaxTransactionId;
    TimestampTz cur_time = GetCurrentTimestamp();

    for (int i = 0; i < NUM_SS_SNAPSHOT_XMIN_CACHE_PARTITIONS; i++) {
        LWLock* partition_lock = SSSnapshotXminHashPartitionLockByIndex(i);
        LWLockAcquire(partition_lock, LW_EXCLUSIVE);
    }

    while ((xmin_item = (ss_snap_xmin_item_t*)hash_seq_search(&hash_seq)) != NULL) {
        if (TimestampDifferenceExceeds(xmin_item->timestamp, cur_time, DMS_MSG_MAX_WAIT_TIME)) {
            ss_snap_xmin_key_t key{.xmin = xmin_item->xmin};
            if (hash_search(snap_cache, &key, HASH_REMOVE, NULL) == NULL) {
                ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED),
                    errmsg("snapshot xmin cache hash table corrupted")));
            }
            continue;
        }

        if (TransactionIdPrecedes(xmin_item->xmin, snap_xmin)) {
            snap_xmin = xmin_item->xmin;
        }
    }

    for (int i = NUM_SS_SNAPSHOT_XMIN_CACHE_PARTITIONS - 1; i >= 0; i--) {
        LWLock* partition_lock = SSSnapshotXminHashPartitionLockByIndex(i);
        LWLockRelease(partition_lock);
    }
    xmin_info->snap_oldest_xmin = snap_xmin;
    uint64 new_global_xmin = GetOldestXminInNodeTable();
    if (TransactionIdPrecedes(snap_xmin, new_global_xmin)) {
        new_global_xmin = snap_xmin;
    }

    if (new_global_xmin == MaxTransactionId) {
        return;
    }

    SpinLockAcquire(&xmin_info->global_oldest_xmin_lock);
    if (xmin_info->global_oldest_xmin_active) {
        xmin_info->global_oldest_xmin = new_global_xmin;
    }
    SpinLockRelease(&xmin_info->global_oldest_xmin_lock);
}

void MaintXminInStandby(void)
{
    if (SS_IN_REFORM) {
        pg_usleep(SS_REFORM_WAIT_TIME);
        return;
    }
    uint64 oldest_xmin = MaxTransactionId;
    GetOldestGlobalProcXmin(&oldest_xmin);
    dms_context_t dms_cxt = {0};
    InitDmsContext(&dms_cxt);
    dms_send_opengauss_oldest_xmin(&dms_cxt, oldest_xmin, SS_PRIMARY_ID);
}

/*
 * RecordSnapshotBeforeSend
 *      Insert (or refresh) xmin in the shared snap_cache with the current
 *      timestamp, under the key's partition lock.
 *
 * Returns false, with a WARNING, when the cache has not been created yet or
 * when HASH_ENTER_NULL could not allocate an entry. Returns true otherwise.
 * inst_id is not used by the current implementation.
 */
bool RecordSnapshotBeforeSend(uint8 inst_id, uint64 xmin)
{
    struct HTAB *cache = g_instance.dms_cxt.SSXminInfo.snap_cache;
    if (cache == NULL) {
        ereport(WARNING, (errmodule(MOD_DMS), errmsg("snap_cache is not ready, please retry")));
        return false;
    }

    ss_snap_xmin_key_t key = {.xmin = xmin};
    uint32 hashcode = SSSnapshotXminKeyHashCode(&key);
    LWLock *lock = SSSnapshotXminHashPartitionLock(hashcode);

    LWLockAcquire(lock, LW_EXCLUSIVE);
    ss_snap_xmin_item_t *item = (ss_snap_xmin_item_t *)hash_search(cache, &key, HASH_ENTER_NULL, NULL);
    bool recorded = (item != NULL);
    if (recorded) {
        item->xmin = xmin;
        item->timestamp = GetCurrentTimestamp();
    }
    LWLockRelease(lock);

    if (!recorded) {
        ereport(WARNING, (errmodule(MOD_DMS), errmsg("insert snapshot into snap_cache table failed, "
            "capacity is not enough")));
    }
    return recorded;
}

/*
 * SSGetGlobalOldestXmin
 *      Return the older of globalxmin and the cluster-wide bound.
 *
 * The bound is global_oldest_xmin while it is marked active, and
 * prev_global_oldest_xmin otherwise. Both are read under
 * global_oldest_xmin_lock.
 */
uint64 SSGetGlobalOldestXmin(uint64 globalxmin)
{
    ss_xmin_info_t *info = &g_instance.dms_cxt.SSXminInfo;

    SpinLockAcquire(&info->global_oldest_xmin_lock);
    uint64 bound = info->global_oldest_xmin_active ? info->global_oldest_xmin
                                                   : info->prev_global_oldest_xmin;
    SpinLockRelease(&info->global_oldest_xmin_lock);

    return TransactionIdPrecedes(bound, globalxmin) ? bound : globalxmin;
}

/*
 * SSUpdateNodeOldestXmin
 *      Record the oldest xmin reported by instance inst_id.
 *
 * Non-normal xids and out-of-range instance ids are ignored. The value is
 * always stored in the node-table entry.
 *
 * While a reform is in progress, and only for nodes belonging to the
 * reform's bitmap_nodes, the node is also:
 *   - marked active,
 *   - used to lower global_oldest_xmin if it is older.
 * Once every reform node has reported, global_oldest_xmin is marked active.
 */
void SSUpdateNodeOldestXmin(uint8 inst_id, unsigned long long oldest_xmin)
{
    if (!TransactionIdIsNormal(oldest_xmin)) {
        return;
    }

    /* inst_id arrives in a remote message and indexes node_table[DMS_MAX_INSTANCES]. */
    if (inst_id >= DMS_MAX_INSTANCES) {
        ereport(WARNING, (errmodule(MOD_DMS),
            errmsg("ignore oldest xmin from invalid instance id %u", (uint32)inst_id)));
        return;
    }

    ss_xmin_info_t *xmin_info = &g_instance.dms_cxt.SSXminInfo;
    ss_reform_info_t *reform_info = &g_instance.dms_cxt.SSReformInfo;

    SpinLockAcquire(&xmin_info->node_table[inst_id].item_lock);
    xmin_info->node_table[inst_id].notify_oldest_xmin = oldest_xmin;
    SpinLockRelease(&xmin_info->node_table[inst_id].item_lock);

    if (SS_IN_REFORM) {
        /* 64-bit shift: "1 << inst_id" on int is undefined for ids >= 31. */
        uint64 node_bit = (uint64)1 << inst_id;

        /*
         * Only nodes that are members of this reform count. This must be a
         * test (&), not "|=": the latter would add the node to the reform's
         * bitmap_nodes and could never yield 0.
         */
        if ((reform_info->bitmap_nodes & node_bit) == 0) {
            return;
        }

        SpinLockAcquire(&xmin_info->bitmap_active_nodes_lock);
        xmin_info->bitmap_active_nodes |= node_bit;
        xmin_info->node_table[inst_id].active = true;
        SpinLockRelease(&xmin_info->bitmap_active_nodes_lock);

        SpinLockAcquire(&xmin_info->global_oldest_xmin_lock);
        if (TransactionIdPrecedes(oldest_xmin, xmin_info->global_oldest_xmin)) {
            xmin_info->global_oldest_xmin = oldest_xmin;
        }
        SpinLockRelease(&xmin_info->global_oldest_xmin_lock);

        /* All reform members have reported: the global value becomes usable. */
        SpinLockAcquire(&xmin_info->bitmap_active_nodes_lock);
        if (xmin_info->bitmap_active_nodes == reform_info->bitmap_nodes) {
            SpinLockAcquire(&xmin_info->global_oldest_xmin_lock);
            xmin_info->global_oldest_xmin_active = true;
            SpinLockRelease(&xmin_info->global_oldest_xmin_lock);
        }
        SpinLockRelease(&xmin_info->bitmap_active_nodes_lock);
    }
    return;
}

/*
 * SSSyncOldestXminWhenReform
 *      Exchange oldest-xmin information during reform. Does nothing when
 *      ENABLE_SS_BCAST_GETOLDESTXMIN is set.
 *
 * Reformer:
 *   - Polls until one of these holds:
 *       - global_oldest_xmin_active is set,
 *       - every other reform node has reported (active bitmap plus self
 *         equals bitmap_nodes),
 *       - the reform fails.
 *   - Then recalculates the local latest snapshot under ProcArrayLock
 *     (exclusive).
 *   - Finally sets snapshot_available.
 *
 * Other nodes: repeatedly send their oldest global proc xmin to reformer_id
 * until the send succeeds or the reform fails.
 */
void SSSyncOldestXminWhenReform(uint8 reformer_id)
{
    if (ENABLE_SS_BCAST_GETOLDESTXMIN) {
        return;
    }

    const long retry_wait_us = 1000L;
    ss_xmin_info_t *xmin_info = &g_instance.dms_cxt.SSXminInfo;
    ss_reform_info_t *reform_info = &g_instance.dms_cxt.SSReformInfo;

    if (reform_info->dms_role == DMS_ROLE_REFORMER) {
        /* 64-bit shift: "1 << id" on int is undefined for ids >= 31. */
        uint64 self_bit = (uint64)1 << SS_MY_INST_ID;
        while (!xmin_info->global_oldest_xmin_active) {
            if (dms_reform_failed()) {
                break;
            }
            uint64 tmp_nodes = xmin_info->bitmap_active_nodes | self_bit;
            if (tmp_nodes == reform_info->bitmap_nodes) {
                break;
            }
            pg_usleep(retry_wait_us);
        }
        LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
        CalculateLocalLatestSnapshot(true);
        LWLockRelease(ProcArrayLock);
        SpinLockAcquire(&xmin_info->snapshot_available_lock);
        xmin_info->snapshot_available = true;
        SpinLockRelease(&xmin_info->snapshot_available_lock);
    } else {
        for (;;) {
            if (dms_reform_failed()) {
                break;
            }
            uint64 oldest_xmin = MaxTransactionId;
            GetOldestGlobalProcXmin(&oldest_xmin);
            dms_context_t dms_cxt = {0};
            InitDmsContext(&dms_cxt);
            if (dms_send_opengauss_oldest_xmin(&dms_cxt, oldest_xmin, reformer_id) == DMS_SUCCESS) {
                break;
            }
            /* Back off before retrying instead of hammering the reformer. */
            pg_usleep(retry_wait_us);
        }
    }
}