* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* dtc_dmon.c
*
*
* IDENTIFICATION
* src/cluster/dtc_dmon.c
*
* -------------------------------------------------------------------------
*/
#include "knl_cluster_module.h"
#include "dtc_dmon.h"
#include "dtc_database.h"
#include "dtc_tran.h"
#define SCN_BROADCAST_CLOCK 5
#define TIME_BROADCAST_CLOCK (10 * SECONDS_PER_MIN * MILLISECS_PER_SECOND)
static g_cluster_time_interval_t g_cluster_time_interval_instance = { 0 };
g_cluster_time_interval_t *g_cluster_time_interval_pitr = &g_cluster_time_interval_instance;
static void dmon_scn_broadcast(knl_session_t *session)
{
mes_scn_bcast_t bcast;
uint64 success_inst;
mes_init_send_head(&bcast.head, MES_CMD_SCN_BROADCAST, sizeof(mes_scn_bcast_t), OG_INVALID_ID32,
g_dtc->profile.inst_id, OG_INVALID_ID8, session->id, OG_INVALID_ID16);
bcast.scn = KNL_GET_SCN(&g_dtc->kernel->scn);
bcast.min_scn = KNL_GET_SCN(&g_dtc->kernel->local_min_scn);
bcast.lsn = cm_atomic_get(&g_dtc->kernel->lsn);
(void)cm_gettimeofday(&(bcast.cur_time));
mes_broadcast(session->id, MES_BROADCAST_ALL_INST, &bcast, &success_inst);
}
static void dmon_time_broadcast(knl_session_t *session)
{
mes_time_bcast_t bcast;
uint64 success_inst;
mes_init_send_head(&bcast.head, MES_CMD_TIME_BROADCAST, sizeof(mes_time_bcast_t), OG_INVALID_ID32,
g_dtc->profile.inst_id, OG_INVALID_ID8, session->id, OG_INVALID_ID16);
(void)cm_gettimeofday(&(bcast.cur_time));
mes_broadcast(session->id, MES_BROADCAST_ALL_INST, &bcast, &success_inst);
}
static void dmon_proc(thread_t *thread)
{
knl_session_t *session = (knl_session_t *)thread->argument;
dmon_context_t *ogx = &g_dtc->dmon_ctx;
uint32 ticks = 0;
ogx->session = session;
cm_set_thread_name("dmon");
OG_LOG_RUN_INF("dmon thread started");
KNL_SESSION_SET_CURR_THREADID(session, cm_get_current_thread_id());
while (!thread->closed) {
if (ticks % SCN_BROADCAST_CLOCK == 0) {
dmon_scn_broadcast(session);
}
if (ticks % TIME_BROADCAST_CLOCK == 0) {
dmon_time_broadcast(session);
}
cm_sleep(1);
ticks++;
}
}
status_t dmon_startup(void)
{
knl_session_t *session = NULL;
if (g_knl_callback.alloc_knl_session(OG_TRUE, (knl_handle_t *)&session) != OG_SUCCESS) {
return OG_ERROR;
}
if (cm_create_thread(dmon_proc, 0, session, &g_dtc->dmon_ctx.thread) != OG_SUCCESS) {
return OG_ERROR;
}
return OG_SUCCESS;
}
void dmon_close(void)
{
dmon_context_t *ogx = &g_dtc->dmon_ctx;
cm_close_thread(&ogx->thread);
if (ogx->session != NULL) {
g_knl_callback.release_knl_session(ogx->session);
ogx->session = NULL;
}
}
void dtc_process_scn_req(void *sess, mes_message_t *msg)
{
mes_scn_bcast_t bcast;
knl_session_t *session = (knl_session_t *)sess;
mes_init_send_head(&bcast.head, MES_CMD_SCN_BROADCAST, sizeof(mes_scn_bcast_t), msg->head->rsn, msg->head->dst_inst,
msg->head->src_inst, session->id, msg->head->src_sid);
bcast.scn = DB_CURR_SCN(session);
mes_release_message_buf(msg->buffer);
mes_send_data((void *)&bcast);
}
static void dtc_keep_time_interval(uint64 time_interval_us)
{
date_t date_now = cm_now();
if (!cm_spin_try_lock(&g_cluster_time_interval_pitr->lock)) {
return;
}
uint16 number = g_cluster_time_interval_pitr->number;
if (number >= CLUSTER_TIME_INTERVAL_ARRAY_SIZE) {
cm_spin_unlock(&g_cluster_time_interval_pitr->lock);
return;
}
g_cluster_time_interval_pitr->date_record[number] = date_now;
g_cluster_time_interval_pitr->interval_record[number] = time_interval_us;
g_cluster_time_interval_pitr->number++;
cm_spin_unlock(&g_cluster_time_interval_pitr->lock);
}
void dtc_check_time_interval(timeval_t db_time)
{
timeval_t p_now;
(void)cm_gettimeofday(&p_now);
int64 time_interval_us = (int64)(1000000 * (p_now.tv_sec - db_time.tv_sec) + p_now.tv_usec - db_time.tv_usec);
dtc_keep_time_interval(abs(time_interval_us));
}
void dtc_process_scn_broadcast(void *sess, mes_message_t *msg)
{
if (sizeof(mes_scn_bcast_t) != msg->head->size) {
OG_LOG_RUN_ERR("scn broadcast is invalid, msg size %u.", msg->head->size);
mes_release_message_buf(msg->buffer);
return;
}
mes_scn_bcast_t *bcast = (mes_scn_bcast_t *)msg->buffer;
knl_scn_t lamport_scn = bcast->scn;
int64 lamport_lsn = bcast->lsn;
knl_session_t *session = (knl_session_t *)sess;
if (msg->head->src_inst >= OG_MAX_INSTANCES) {
mes_release_message_buf(msg->buffer);
OG_LOG_RUN_ERR("Do not process scn broadcast, because src_inst is invalid: %u", msg->head->src_inst);
return;
}
KNL_SET_SCN(&g_dtc->profile.min_scn[msg->head->src_inst], bcast->min_scn);
mes_release_message_buf(msg->buffer);
dtc_update_scn(session, lamport_scn);
dtc_update_lsn(session, lamport_lsn);
}
void dtc_process_lsn_broadcast(void *sess, mes_message_t *msg)
{
if (sizeof(mes_lsn_bcast_t) != msg->head->size) {
OG_LOG_RUN_ERR("msg is invalid, msg size %u.", msg->head->size);
mes_release_message_buf(msg->buffer);
return;
}
mes_lsn_bcast_t *bcast = (mes_lsn_bcast_t *)msg->buffer;
int64 lamport_lsn = bcast->lsn;
knl_session_t *session = (knl_session_t *)sess;
mes_release_message_buf(msg->buffer);
dtc_update_lsn(session, lamport_lsn);
}
void dtc_process_time_broadcast(void *sess, mes_message_t *msg)
{
if (sizeof(mes_time_bcast_t) != msg->head->size) {
OG_LOG_RUN_ERR("time broadcast is invalid, msg size %u.", msg->head->size);
mes_release_message_buf(msg->buffer);
return;
}
mes_time_bcast_t *bcast = (mes_time_bcast_t *)msg->buffer;
timeval_t db_time = bcast->cur_time;
if (msg->head->src_inst >= OG_MAX_INSTANCES) {
mes_release_message_buf(msg->buffer);
OG_LOG_RUN_ERR("Do not process time broadcast, because src_inst is invalid: %u", msg->head->src_inst);
return;
}
mes_release_message_buf(msg->buffer);
dtc_check_time_interval(db_time);
}
* get the cluster min_scn as current instance min_scn
*/
knl_scn_t dtc_get_min_scn(knl_scn_t cur_min_scn)
{
dtc_profile_t *profile = &g_dtc->profile;
cluster_view_t view;
rc_get_cluster_view(&view, OG_FALSE);
knl_scn_t min_scn = cur_min_scn;
for (uint32 i = 0; i < profile->node_count; i++) {
if (i == profile->inst_id) {
continue;
}
if (!rc_bitmap64_exist(&view.bitmap, i)) {
continue;
}
if (profile->min_scn[i] != 0 && profile->min_scn[i] < min_scn) {
min_scn = profile->min_scn[i];
}
}
return min_scn;
}