* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* srv_instance.c
*
*
* IDENTIFICATION
* src/server/srv_instance.c
*
* -------------------------------------------------------------------------
*/
#include "srv_module.h"
#include "cm_file.h"
#include "srv_instance.h"
#include "load_others.h"
#include "load_kernel.h"
#include "srv_view.h"
#include "cm_signal.h"
#include "cm_license.h"
#include "pl_ext_proc.h"
#ifndef WIN32
#include "srv_blackbox.h"
#endif
#include "cm_regexp.h"
#include "ogsql_serial.h"
#include "cm_utils.h"
#include "ogsql_self_func.h"
#include "ogsql_type_map.h"
#include "ogsql_service.h"
#include "ogsql_expr.h"
#include "ogsql_serial.h"
#include "ogsql_cond.h"
#include "srv_emerg.h"
#include "ogsql_dependency.h"
#include "ogsql_update.h"
#include "ddl_executor.h"
#include <locale.h>
#include "cm_thread_pool.h"
#include "cm_ip.h"
#include "ogsql_mtrl.h"
#include "pl_ddl_executor.h"
#include "pl_synonym.h"
#include "pl_logic.h"
#include "srv_stat.h"
#include "dtc_dcs.h"
#include "dtc_context.h"
#include "dtc_dls.h"
#include "dtc_database.h"
#ifdef __cplusplus
extern "C" {
#endif
instance_t *g_instance = NULL;
char *g_database_home = NULL;
static const char *g_lock_file = "ogracd.lck";
static status_t srv_wait_agents_done(void);
static void srv_close_threads(bool32 knl_flag);
static void srv_deinit_resource(void);
static status_t init_job_manager(void);
os_run_desc_t g_os_stat_desc_array[TOTAL_OS_RUN_INFO_TYPES] = {
{ "NUM_CPUS", "Number of CPUs or processors available", OG_FALSE, OG_FALSE },
{ "NUM_CPU_CORES", "Number of CPU cores available (includes subcores of multicore CPUs as well as single-core CPUs)", OG_FALSE, OG_FALSE },
{ "NUM_CPU_SOCKETS", "Number of CPU sockets available (represents an absolute count of CPU chips on the system, regardless of ""multithreading or multi-core architectures)", OG_FALSE, OG_FALSE },
{ "IDLE_TIME", "Number of hundredths of a second that a processor has been idle, totalled over all processors", OG_TRUE, OG_FALSE },
{ "BUSY_TIME", "Number of hundredths of a second that a processor has been busy executing user or kernel code, totalled over ""all processors", OG_TRUE, OG_FALSE },
{ "USER_TIME", "Number of hundredths of a second that a processor has been busy executing user code, totalled over all ""processors", OG_TRUE, OG_FALSE },
{ "SYS_TIME", "Number of hundredths of a second that a processor has been busy executing kernel code, totalled over all ""processors", OG_TRUE, OG_FALSE },
{ "IOWAIT_TIME", "Number of hundredths of a second that a processor has been waiting for I/O to complete, totalled over all ""processors", OG_TRUE, OG_FALSE },
{ "NICE_TIME", "Number of hundredths of a second that a processor has been busy executing low-priority user code, totalled over ""all processors", OG_TRUE, OG_FALSE },
{ "AVG_IDLE_TIME", "Number of hundredths of a second that a processor has been idle, averaged over all processors", OG_TRUE, OG_FALSE },
{ "AVG_BUSY_TIME", "Number of hundredths of a second that a processor has been busy executing user or kernel code, averaged over ""all processors", OG_TRUE, OG_FALSE },
{ "AVG_USER_TIME", "Number of hundredths of a second that a processor has been busy executing user code, averaged over all ""processors", OG_TRUE, OG_FALSE },
{ "AVG_SYS_TIME", "Number of hundredths of a second that a processor has been busy executing kernel code, averaged over all ""processors", OG_TRUE, OG_FALSE },
{ "AVG_IOWAIT_TIME", "Number of hundredths of a second that a processor has been waiting for I/O to complete, averaged over all ""processors", OG_TRUE, OG_FALSE },
{ "AVG_NICE_TIME", "Number of hundredths of a second that a processor has been busy executing low-priority user code, averaged over ""all processors", OG_TRUE, OG_FALSE },
{ "VM_PAGE_IN_BYTES", "Total number of bytes of data that have been paged in due to virtual memory paging", OG_TRUE, OG_FALSE },
{ "VM_PAGE_OUT_BYTES", "Total number of bytes of data that have been paged out due to virtual memory paging", OG_TRUE, OG_FALSE },
{ "LOAD", "Current number of processes that are either running or in the ready state, waiting to be selected by the ""operating-system scheduler to run. On many platforms, this statistic reflects the average load over the past ""minute.", OG_FALSE, OG_FALSE },
{ "PHYSICAL_MEMORY_BYTES", "Total number of bytes of physical memory", OG_FALSE, OG_FALSE }
};
const char *g_shutdown_mode_desc[SHUTDOWN_MODE_END] = {
"normal", "immediate", "signal", "abort"
};
static status_t init_job_manager(void)
{
knl_instance_t *kernel = &g_instance->kernel;
return cm_create_thread(jobs_proc, 0, kernel->sessions[SESSION_ID_JOB], &kernel->job_ctx.thread);
}
static void handle_signal_terminal(int sig_no)
{
g_instance->lsnr_abort_status = OG_TRUE;
}
static void exec_abnormal_terminal(void)
{
OG_LOG_RUN_WAR("exec_abnormal_terminal");
srv_instance_abort();
}
static void srv_destory_reserved_session(void)
{
session_t *session = NULL;
uint32 i;
for (i = 0; i < g_instance->kernel.reserved_sessions; i++) {
session = g_instance->session_pool.sessions[i];
if (session != NULL) {
knl_destroy_session(&g_instance->kernel, i);
CM_FREE_PTR(session->stack);
CM_FREE_PTR(session);
g_instance->session_pool.sessions[i] = NULL;
}
}
for (i = 0; i < g_instance->rm_pool.page_count; i++) {
CM_FREE_PTR(g_instance->rm_pool.pages[i]);
}
}
static status_t srv_ssl_check_params(int32 *alert_day)
{
int32 detect_day;
if (OG_SUCCESS != cm_str2int(cm_get_config_value(&g_instance->config, "SSL_EXPIRE_ALERT_THRESHOLD"), alert_day)) {
return OG_ERROR;
}
if (!(*alert_day >= OG_MIN_SSL_EXPIRE_THRESHOLD && *alert_day <= OG_MAX_SSL_EXPIRE_THRESHOLD)) {
OG_THROW_ERROR(ERR_PARAMETER_OVER_RANGE, "SSL_EXPIRE_ALERT_THRESHOLD", (int64)OG_MIN_SSL_EXPIRE_THRESHOLD,
(int64)OG_MAX_SSL_EXPIRE_THRESHOLD);
return OG_ERROR;
}
if (OG_SUCCESS != cm_str2int(cm_get_config_value(&g_instance->config, "SSL_PERIOD_DETECTION"), &detect_day)) {
return OG_ERROR;
}
if (!(detect_day >= OG_MIN_SSL_PERIOD_DETECTION && detect_day <= OG_MAX_SSL_PERIOD_DETECTION)) {
OG_THROW_ERROR(ERR_PARAMETER_OVER_RANGE, "SSL_PERIOD_DETECTION", (int64)OG_MIN_SSL_PERIOD_DETECTION,
(int64)OG_MAX_SSL_PERIOD_DETECTION);
return OG_ERROR;
}
if (detect_day > *alert_day) {
OG_LOG_RUN_ERR("SSL disabled: the value of SSL_PERIOD_DETECTION "
"is bigger than the value of SSL_EXPIRE_ALERT_THRESHOLD");
return OG_ERROR;
}
return OG_SUCCESS;
}
static status_t srv_init_ssl_communication()
{
ssl_config_t para;
char *keypwd_cipher = NULL;
char *verify_peer = NULL;
char plain[OG_PASSWD_MAX_LEN + OG_AESBLOCKSIZE + 4];
g_instance->ssl_acceptor_fd = NULL;
int32 alert_day = 0;
char real_path[OG_FILE_NAME_BUFFER_SIZE] = { 0 };
MEMS_RETURN_IFERR(memset_s(¶, sizeof(ssl_config_t), 0, sizeof(ssl_config_t)));
para.ca_file = cm_get_config_value(&g_instance->config, "SSL_CA");
para.cert_file = cm_get_config_value(&g_instance->config, "SSL_CERT");
para.key_file = cm_get_config_value(&g_instance->config, "SSL_KEY");
para.crl_file = cm_get_config_value(&g_instance->config, "SSL_CRL");
para.cipher = cm_get_config_value(&g_instance->config, "SSL_CIPHER");
keypwd_cipher = cm_get_config_value(&g_instance->config, "SSL_KEY_PASSWORD");
verify_peer = cm_get_config_value(&g_instance->config, "SSL_VERIFY_PEER");
if (cm_str_equal_ins(verify_peer, "TRUE")) {
para.verify_peer = OG_TRUE;
} else if (cm_str_equal_ins(verify_peer, "FALSE")) {
para.verify_peer = OG_FALSE;
} else {
OG_LOG_RUN_ERR("the value of parameter \"SSL_VERIFY_PEER\" is invalid");
return OG_ERROR;
}
(void)cm_alter_config(&g_instance->config, "HAVE_SSL", "FALSE", CONFIG_SCOPE_MEMORY, OG_TRUE);
if (CM_IS_EMPTY_STR(para.ca_file) && para.verify_peer) {
para.verify_peer = OG_FALSE;
(void)cm_alter_config(&g_instance->config, "SSL_VERIFY_PEER", "FALSE", CONFIG_SCOPE_MEMORY, OG_TRUE);
}
if (CM_IS_EMPTY_STR(para.cert_file) || CM_IS_EMPTY_STR(para.key_file)) {
OG_LOG_RUN_INF("SSL disabled: server certificate or private key file is not available.");
return OG_SUCCESS;
}
OG_RETURN_IFERR(realpath_file(para.ca_file, real_path, OG_FILE_NAME_BUFFER_SIZE));
if (cs_ssl_verify_file_stat(real_path) != OG_SUCCESS) {
OG_LOG_RUN_ERR("SSL CA certificate file \"%s\" has execute, group or world access permission.", para.ca_file);
return OG_ERROR;
}
OG_RETURN_IFERR(realpath_file(para.cert_file, real_path, OG_FILE_NAME_BUFFER_SIZE));
if (cs_ssl_verify_file_stat(real_path) != OG_SUCCESS) {
OG_LOG_RUN_ERR("SSL server certificate file \"%s\" has execute, group or world access permission.",
para.cert_file);
return OG_ERROR;
}
OG_RETURN_IFERR(realpath_file(para.key_file, real_path, OG_FILE_NAME_BUFFER_SIZE));
if (cs_ssl_verify_file_stat(real_path) != OG_SUCCESS) {
OG_LOG_RUN_ERR("SSL private key file \"%s\" has execute, group or world access permission.", para.key_file);
return OG_ERROR;
}
if (!CM_IS_EMPTY_STR(keypwd_cipher)) {
char *factor_key = cm_get_config_value(&g_instance->config, "_FACTOR_KEY");
char *local_key = cm_get_config_value(&g_instance->config, "LOCAL_KEY");
uint32 plain_len = sizeof(plain) - 1;
if (cm_decrypt_passwd(OG_TRUE, keypwd_cipher, (uint32)strlen(keypwd_cipher), plain, &plain_len,
local_key, factor_key) != OG_SUCCESS) {
OG_LOG_RUN_INF("SSL disabled: decrypt SSL private key password failed.");
return OG_SUCCESS;
}
plain[plain_len] = '\0';
para.key_password = plain;
}
g_instance->ssl_acceptor_fd = cs_ssl_create_acceptor_fd(¶);
if (g_instance->ssl_acceptor_fd == NULL) {
OG_LOG_RUN_INF("SSL disabled: create SSL context failed.");
} else {
(void)cm_alter_config(&g_instance->config, "HAVE_SSL", "TRUE", CONFIG_SCOPE_MEMORY, OG_TRUE);
OG_LOG_RUN_INF("SSL context initialized.");
OG_RETURN_IFERR(srv_ssl_check_params(&alert_day));
ssl_ca_cert_expire(g_instance->ssl_acceptor_fd, alert_day);
}
MEMS_RETURN_IFERR(memset_s(plain, sizeof(plain), 0, sizeof(plain)));
return OG_SUCCESS;
}
static status_t srv_ssl_expire_warning(void)
{
int32 alert_day;
int32 detect_day;
if (OG_SUCCESS != cm_str2int(cm_get_config_value(&g_instance->config, "SSL_EXPIRE_ALERT_THRESHOLD"), &alert_day)) {
return OG_ERROR;
}
if (OG_SUCCESS != cm_str2int(cm_get_config_value(&g_instance->config, "SSL_PERIOD_DETECTION"), &detect_day)) {
return OG_ERROR;
}
if ((g_timer()->systime / SECONDS_PER_DAY) % detect_day == 0) {
ssl_ca_cert_expire(g_instance->ssl_acceptor_fd, alert_day);
}
return OG_SUCCESS;
}
static status_t srv_init_session_pool(void)
{
uint32 i;
uint32 id;
rm_pool_init(&g_instance->rm_pool);
stat_pool_init(&g_instance->stat_pool);
g_instance->sql_cur_pool.lock = 0;
g_instance->sql_cur_pool.cnt = 0;
OG_RETURN_IFERR(srv_init_sql_cur_pools());
for (i = 0; i < g_instance->kernel.reserved_sessions; i++) {
if (srv_alloc_reserved_session(&id) != OG_SUCCESS) {
return OG_ERROR;
}
}
g_instance->session_pool.lock = 0;
biqueue_init(&g_instance->session_pool.idle_sessions);
biqueue_init(&g_instance->session_pool.priv_idle_sessions);
g_instance->session_pool.service_count = 0;
g_instance->session_pool.epollfd = epoll_create1(0);
return OG_SUCCESS;
}
static bool32 srv_need_wait_session(knl_instance_t *kernel)
{
session_pool_t *pool = &g_instance->session_pool;
knl_session_t *session = &pool->sessions[0]->knl_session;
uint32 i;
if (!DB_IS_READONLY(session) && DB_IN_BG_ROLLBACK(session)) {
return OG_TRUE;
}
for (i = g_instance->kernel.reserved_sessions; i < pool->hwm; i++) {
if (pool->sessions[i]->type == SESSION_TYPE_REPLICA || i == kernel->switch_ctrl.keep_sid ||
srv_is_kernel_reserve_session(pool->sessions[i]->type)) {
continue;
}
if (!pool->sessions[i]->is_free) {
return OG_TRUE;
}
}
if (kernel->sessions[SESSION_ID_SMON]->status == SESSION_ACTIVE) {
return OG_TRUE;
}
return OG_FALSE;
}
static void srv_kill_active_session(knl_instance_t *kernel)
{
session_pool_t *pool = &g_instance->session_pool;
uint32 i;
srv_pause_lsnr(LSNR_TYPE_SERVICE);
reactor_pause_pool();
for (i = g_instance->kernel.reserved_sessions; i < pool->hwm; i++) {
if (pool->sessions[i]->type == SESSION_TYPE_REPLICA || i == kernel->switch_ctrl.keep_sid ||
srv_is_kernel_reserve_session(pool->sessions[i]->type)) {
continue;
}
srv_mark_sess_killed(pool->sessions[i], OG_TRUE, pool->sessions[i]->knl_session.serial_id);
}
}
* Here we use peer recovery point, because we don't know
* there are gaps or not, we don't know the peer redo log is correct or not..
*/
static bool32 srv_wait_sync_log(knl_instance_t *kernel)
{
log_context_t *log_ctx = &kernel->redo_ctx;
lsnd_context_t *lsnd_ctx = &kernel->lsnd_ctx;
uint32 i;
cm_latch_s(&lsnd_ctx->latch, SESSION_ID_KERNEL, OG_FALSE, NULL);
for (i = 0; i < lsnd_ctx->standby_num; i++) {
if (lsnd_ctx->lsnd[i] == NULL || lsnd_ctx->lsnd[i]->is_disable ||
lsnd_ctx->lsnd[i]->status < LSND_STATUS_QUERYING) {
continue;
}
if (lsnd_ctx->lsnd[i]->state == REP_STATE_DEMOTE_REQUEST) {
if (lsnd_ctx->lsnd[i]->peer_rcy_point.lfn < log_ctx->lfn) {
cm_unlatch(&lsnd_ctx->latch, NULL);
return OG_TRUE;
}
OG_LOG_RUN_INF("[INST] [SWITCHOVER] Log sync end, local/peer lfn [%llu/%llu], local/peer lsn [%llu/%llu]",
log_ctx->lfn, (uint64)lsnd_ctx->lsnd[i]->peer_rcy_point.lfn, kernel->lsn,
lsnd_ctx->lsnd[i]->peer_replay_lsn);
}
}
cm_unlatch(&lsnd_ctx->latch, NULL);
return OG_FALSE;
}
static bool32 srv_demote_approved(knl_instance_t *kernel)
{
lsnd_context_t *ogx = &kernel->lsnd_ctx;
lsnd_t *lsnd = NULL;
cm_latch_s(&ogx->latch, SESSION_ID_KERNEL, OG_FALSE, NULL);
for (uint16 i = 0; i < ogx->standby_num; i++) {
if (ogx->lsnd[i] == NULL || ogx->lsnd[i]->is_disable || ogx->lsnd[i]->status < LSND_STATUS_QUERYING) {
continue;
}
if (ogx->lsnd[i]->state == REP_STATE_DEMOTE_REQUEST) {
ogx->lsnd[i]->state = REP_STATE_PROMOTE_APPROVE;
lsnd = ogx->lsnd[i];
break;
}
}
cm_unlatch(&ogx->latch, NULL);
if (lsnd == NULL) {
return OG_FALSE;
}
while (lsnd->state != REP_STATE_NORMAL) {
if (lsnd->state == REP_STATE_DEMOTE_FAILED) {
return OG_FALSE;
}
cm_sleep(10);
}
OG_LOG_RUN_INF("[INST] [SWITCHOVER] lsnd state is NORMAL.");
return OG_TRUE;
}
static inline void srv_reset_switch_request(switch_ctrl_t *ctrl)
{
cm_spin_lock(&ctrl->lock, NULL);
ctrl->is_rmon_set = OG_FALSE;
ctrl->request = SWITCH_REQ_NONE;
ctrl->state = SWITCH_IDLE;
ctrl->keep_sid = 0;
ctrl->handling = OG_FALSE;
cm_spin_unlock(&ctrl->lock);
}
static bool32 srv_killed_session_flush_end(bool32 demote)
{
knl_instance_t *kernel = &g_instance->kernel;
switch_ctrl_t *ctrl = &kernel->switch_ctrl;
knl_session_t *session = kernel->sessions[SESSION_ID_KERNEL];
if (!ctrl->handling) {
ctrl->handling = OG_TRUE;
OG_LOG_RUN_INF("[INST] %s: process %s request", demote ? "SWITCHOVER" : "RAEDONLY",
demote ? "demote" : "read only");
srv_kill_active_session(kernel);
ctrl->state = SWITCH_WAIT_SESSIONS;
OG_LOG_RUN_INF("[INST] %s: Kill all active sessions", demote ? "SWITCHOVER" : "RAEDONLY");
}
if (ctrl->state == SWITCH_WAIT_SESSIONS) {
if (srv_need_wait_session(kernel)) {
return OG_FALSE;
}
OG_LOG_RUN_INF("[INST] %s: All active sessions stopped", demote ? "SWITCHOVER" : "RAEDONLY");
* maybe log generate during clean nologging guts, so call it before log sync,
* and knl_free_all_xatlocks will set DB_IN_BG_ROLLBACK that is not allowed to do DDL,
* so, drop table before it.
*/
if (demote) {
if (db_clean_nologging_all(session) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[INST] %s: failed to clean nologging tables", demote ? "SWITCHOVER" : "RAEDONLY");
return OG_FALSE;
}
}
srv_shrink_xa_rms((knl_handle_t)session, OG_TRUE);
knl_close_temp_tables(session->kernel->sessions[SESSION_ID_TMP_STAT], DICT_TYPE_TEMP_TABLE_SESSION);
ashrink_clean(session);
ctrl->state = SWITCH_WAIT_LOG_SYNC;
OG_LOG_RUN_INF("[INST] %s: Wait for log %s", demote ? "SWITCHOVER" : "RAEDONLY", demote ? "sync" : "flush");
reactor_resume_pool();
srv_resume_lsnr(LSNR_TYPE_SERVICE);
}
if (ctrl->state == SWITCH_WAIT_LOG_SYNC) {
if (log_need_flush(&session->kernel->redo_ctx)) {
if (log_flush(session, NULL, NULL, NULL) != OG_SUCCESS) {
CM_ABORT(0, "[INST] %s ABORT INFO: failed to flush redo log", demote ? "SWITCHOVER" : "RAEDONLY");
}
return OG_FALSE;
}
}
return OG_TRUE;
}
static void srv_process_demote_request_core(knl_session_t *session, knl_instance_t *kernel)
{
if (db_save_core_ctrl(session) != OG_SUCCESS) {
CM_ABORT(0, "[INST] [SWITCHOVER] ABORT INFO: failed to save ctrlfile, demote failed");
}
rcy_init_proc(session);
if (lrpl_init(session) != OG_SUCCESS) {
CM_ABORT(0, "[INST] ABORT INFO: failed to create lrpl thread");
}
if (lsnd_init(session) != OG_SUCCESS) {
CM_ABORT(0, "[INST] ABORT INFO: failed to start log sender thread, demote failed");
}
}
static void srv_process_demote_request(void)
{
knl_instance_t *kernel = &g_instance->kernel;
switch_ctrl_t *ctrl = &kernel->switch_ctrl;
knl_session_t *session = kernel->sessions[SESSION_ID_KERNEL];
database_t *db = &kernel->db;
if (db_check_backgroud_blocked(session, OG_TRUE, g_instance->sync_doing)) {
return;
}
if (!cm_spin_try_lock(&db->lock)) {
return;
}
if (!srv_killed_session_flush_end(OG_TRUE)) {
cm_spin_unlock(&db->lock);
return;
}
if (ctrl->state == SWITCH_WAIT_LOG_SYNC) {
if (srv_wait_sync_log(kernel)) {
cm_spin_unlock(&db->lock);
return;
}
}
if (!srv_demote_approved(kernel)) {
srv_reset_switch_request(ctrl);
cm_spin_unlock(&db->lock);
lsnd_reset_state(session);
OG_LOG_RUN_INF("[INST] [SWITCHOVER] demote failed, running as primary still");
return;
}
kernel->lrcv_ctx.reconnected = OG_TRUE;
db->is_readonly = OG_TRUE;
db->readonly_reason = PHYSICAL_STANDBY_SET;
db->ctrl.core.db_role = REPL_ROLE_PHYSICAL_STANDBY;
OG_LOG_RUN_INF("[INST] [SWITCHOVER] Log synced, change role to standby");
tx_rollback_close(session);
lsnd_close_all_thread(session);
btree_cache_reset(session);
srv_process_demote_request_core(session, kernel);
srv_reset_switch_request(ctrl);
cm_spin_unlock(&db->lock);
OG_LOG_RUN_INF("[INST] [SWITCHOVER] demote completed, running as standby");
}
static void srv_promote_get_local_host(promote_record_t *promote_record)
{
knl_instance_t *kernel = &g_instance->kernel;
lsnd_context_t *ogx = &kernel->lsnd_ctx;
errno_t err;
for (uint16 i = 0; i < OG_MAX_PHYSICAL_STANDBY; i++) {
if (ogx->lsnd[i] == NULL) {
continue;
}
dest_info_t *dest_info = &ogx->lsnd[i]->dest_info;
if (strncmp(promote_record->peer_url, dest_info->peer_host, strlen(dest_info->peer_host)) == 0) {
if (dest_info->local_host[0] == '\0') {
break;
}
err = snprintf_s(promote_record->local_url, sizeof(promote_record->local_url),
sizeof(promote_record->local_url) - 1, "%s:%u", dest_info->local_host, (uint32)kernel->attr.repl_port);
knl_securec_check_ss(err);
return;
}
}
err =
snprintf_s(promote_record->local_url, sizeof(promote_record->local_url), sizeof(promote_record->local_url) - 1,
"%s:%u", g_instance->lsnr.tcp_replica.host[0], (uint32)kernel->attr.repl_port);
knl_securec_check_ss(err);
}
static status_t srv_promote_get_record(knl_session_t *session, uint16 *row_count, date_t *min_date)
{
knl_cursor_t *cursor = NULL;
date_t temp_date;
date_t date = OG_INVALID_INT64;
uint16 i = 0;
CM_SAVE_STACK(session->stack);
cursor = knl_push_cursor(session);
knl_open_sys_cursor(session, cursor, CURSOR_ACTION_SELECT, SYS_PROMOTE_RECORD_ID, 0);
cursor->index_dsc = OG_TRUE;
knl_init_index_scan(cursor, OG_FALSE);
knl_set_key_flag(&cursor->scan_range.l_key, SCAN_KEY_LEFT_INFINITE, 0);
knl_set_key_flag(&cursor->scan_range.r_key, SCAN_KEY_RIGHT_INFINITE, 0);
for (;;) {
if (OG_SUCCESS != knl_fetch(session, cursor)) {
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
if (cursor->eof) {
break;
} else {
i++;
temp_date = *(date_t *)CURSOR_COLUMN_DATA(cursor, PRMOTE_COL_TIME);
date = (date > temp_date) ? temp_date : date;
}
}
*row_count = i;
*min_date = date;
CM_RESTORE_STACK(session->stack);
return OG_SUCCESS;
}
static status_t srv_promote_update_record(knl_session_t *session, date_t *min_date, promote_record_t *promote_record)
{
knl_cursor_t *cursor = NULL;
row_assist_t ra;
uint16 size;
status_t status;
knl_set_session_scn(session, OG_INVALID_ID64);
CM_SAVE_STACK(session->stack);
cursor = knl_push_cursor(session);
knl_open_sys_cursor(session, cursor, CURSOR_ACTION_UPDATE, SYS_PROMOTE_RECORD_ID, 0);
knl_init_index_scan(cursor, OG_TRUE);
knl_scan_key_t *key = &cursor->scan_range.l_key;
knl_set_scan_key(INDEX_DESC(cursor->index), key, OG_TYPE_DATE, min_date, sizeof(date_t),
IX_SYS_PROMOTE_RECORD_001_ID);
if (knl_fetch(session, cursor) != OG_SUCCESS) {
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
if (cursor->eof) {
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
row_init(&ra, cursor->update_info.data, HEAP_MAX_ROW_SIZE(session), PRMOTE_COL_PEER_HOST + 1);
(void)row_put_date(&ra, promote_record->time);
(void)row_put_str(&ra, promote_record->type);
(void)row_put_str(&ra, promote_record->local_url);
(void)row_put_str(&ra, promote_record->peer_url);
cursor->update_info.count = PRMOTE_COL_PEER_HOST + 1;
cursor->update_info.columns[0] = PRMOTE_COL_TIME;
cursor->update_info.columns[1] = PRMOTE_COL_TYPE;
cursor->update_info.columns[2] = PRMOTE_COL_LOCAL_HOST;
cursor->update_info.columns[3] = PRMOTE_COL_PEER_HOST;
cm_decode_row(cursor->update_info.data, cursor->update_info.offsets, cursor->update_info.lens, &size);
status = knl_internal_update(session, cursor);
if (status != OG_SUCCESS) {
knl_rollback(session, NULL);
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
knl_commit(session);
CM_RESTORE_STACK(session->stack);
return status;
}
static status_t srv_promote_insert_record(knl_session_t *session, promote_record_t *promote_record)
{
status_t status;
row_assist_t ra;
CM_SAVE_STACK(session->stack);
knl_cursor_t *cursor = knl_push_cursor(session);
knl_open_sys_cursor(session, cursor, CURSOR_ACTION_INSERT, SYS_PROMOTE_RECORD_ID, OG_INVALID_ID32);
uint32 max_size = session->kernel->attr.max_row_size;
row_init(&ra, (char *)cursor->row, max_size, PRMOTE_COL_PEER_HOST + 1);
(void)row_put_date(&ra, promote_record->time);
(void)row_put_str(&ra, promote_record->type);
(void)row_put_str(&ra, promote_record->local_url);
(void)row_put_str(&ra, promote_record->peer_url);
status = knl_internal_insert(session, cursor);
if (status != OG_SUCCESS) {
knl_rollback(session, NULL);
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
knl_commit(session);
CM_RESTORE_STACK(session->stack);
return status;
}
static void srv_promote_save_record(knl_session_t *session, promote_record_t *promote_record)
{
status_t status;
uint16 row_count;
date_t min_date = OG_INVALID_INT64;
if (DB_IS_UPGRADE(session) && (cm_strcmpni(promote_record->type, "SWITCHOVER", strlen("SWITCHOVER")) == 0)) {
return;
}
if (srv_promote_get_record(session, &row_count, &min_date) != OG_SUCCESS) {
OG_LOG_RUN_WAR("[INST] [%s] failed to save promote record", promote_record->type);
return;
}
if (row_count >= OG_MAX_PROMOTE_RECORD_COUNT) {
status = srv_promote_update_record(session, &min_date, promote_record);
} else {
status = srv_promote_insert_record(session, promote_record);
}
if (status != OG_SUCCESS) {
OG_LOG_RUN_WAR("[INST] [%s] failed to save promote record", promote_record->type);
}
}
static void srv_promote_get_peer_url(promote_record_t *promote_record)
{
knl_session_t *session = g_instance->kernel.sessions[SESSION_ID_KERNEL];
switch_ctrl_t *ctrl = &g_instance->kernel.switch_ctrl;
lrcv_context_t *lrcv = &g_instance->kernel.lrcv_ctx;
errno_t err;
err = snprintf_s(promote_record->peer_url, sizeof(promote_record->peer_url), sizeof(promote_record->peer_url) - 1,
"%s:%u", lrcv->primary_host, (uint32)ctrl->peer_repl_port);
knl_securec_check_ss(err);
ctrl->peer_repl_port = 0;
err = strncpy_s(promote_record->type, sizeof(promote_record->type), "SWITCHOVER", strlen("SWITCHOVER"));
knl_securec_check(err);
promote_record->time = KNL_NOW(session);
}
static void srv_process_promote_request_core(knl_session_t *session, switch_ctrl_t *ctrl,
promote_record_t *promote_record)
{
if (lsnd_init(session) != OG_SUCCESS) {
CM_ABORT(0, "[INST] [SWITCHOVER] ABORT INFO: failed to start log sender thread, promote failed");
}
if (tx_rollback_start(session) != OG_SUCCESS) {
CM_ABORT(0, "[INST] [SWITCHOVER] ABORT INFO: failed to start txn rollback thread, promote failed");
}
if (db_garbage_segment_clean(session) != OG_SUCCESS) {
OG_LOG_RUN_WAR("[INST] [SWITCHOVER] failed to clean garbage segment");
}
if (spc_clean_garbage_space(session) != OG_SUCCESS) {
OG_LOG_RUN_WAR("[SPACE] failed to clean garbage tablespace");
}
spc_init_swap_space(session, SPACE_GET(session, dtc_my_ctrl(session)->swap_space));
heap_remove_cached_pages(session, NULL);
srv_promote_get_local_host(promote_record);
srv_promote_save_record(session, promote_record);
srv_reset_switch_request(ctrl);
if (db_save_core_ctrl(session) != OG_SUCCESS) {
CM_ABORT(0, "[INST] [SWITCHOVER] ABORT INFO: failed to save ctrlfile, promote failed");
}
}
static void srv_process_promote_request(void)
{
knl_instance_t *kernel = &g_instance->kernel;
switch_ctrl_t *ctrl = &kernel->switch_ctrl;
knl_session_t *session = kernel->sessions[SESSION_ID_KERNEL];
log_context_t *log_ctx = &kernel->redo_ctx;
promote_record_t promote_record;
srv_promote_get_peer_url(&promote_record);
(void)knl_stop_build(session);
OG_LOG_RUN_INF("[INST] [SWITCHOVER] no build is running now");
OG_LOG_RUN_INF("[INST] [SWITCHOVER] process promote request");
ctrl->handling = OG_TRUE;
ctrl->switch_asn = OG_INVALID_ASN;
if (log_switch_logfile(session, OG_INVALID_FILEID, OG_INVALID_ASN, NULL) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[INST] failed to switch logfile, promote failed");
srv_reset_switch_request(ctrl);
return;
}
ctrl->switch_asn = log_ctx->files[log_ctx->curr_file].head.asn;
OG_LOG_RUN_INF("[INST] [SWITCHOVER] log file switched to %u", ctrl->switch_asn);
if (kernel->redo_ctx.files[kernel->redo_ctx.curr_file].head.rst_id != kernel->db.ctrl.core.resetlogs.rst_id) {
kernel->redo_ctx.files[kernel->redo_ctx.curr_file].head.rst_id = kernel->db.ctrl.core.resetlogs.rst_id;
log_flush_head(session, &kernel->redo_ctx.files[kernel->redo_ctx.curr_file]);
if (kernel->redo_ctx.files[kernel->redo_ctx.curr_file].head.asn !=
kernel->db.ctrl.core.resetlogs.last_asn + 1) {
OG_THROW_ERROR_EX(ERR_ASSERT_ERROR,
"kernel->redo_ctx.files[kernel->redo_ctx.curr_file].head.asn(%u) == "
"kernel->db.ctrl.core.resetlogs.last_asn(%u) + 1",
kernel->redo_ctx.files[kernel->redo_ctx.curr_file].head.asn, kernel->db.ctrl.core.resetlogs.last_asn);
}
}
rcy_wait_replay_complete(session);
OG_LOG_RUN_INF("[INST] [SWITCHOVER] LRPL has finished replay work");
lsnd_close_all_thread(session);
lrcv_close(session);
lrpl_close(session);
lftc_clt_close(session);
rcy_close_proc(session);
OG_LOG_RUN_INF("[INST] [SWITCHOVER] close lsnd & lrcv & lrpl & lftc & rcy");
knl_inc_dc_ver(kernel);
kernel->db.is_readonly = OG_FALSE;
kernel->db.readonly_reason = PRIMARY_SET;
kernel->dc_ctx.completed = OG_FALSE;
kernel->db.ctrl.core.db_role = REPL_ROLE_PRIMARY;
OG_LOG_RUN_INF("[INST] [SWITCHOVER] log file switched, change role to primary");
srv_process_promote_request_core(session, ctrl, &promote_record);
OG_LOG_RUN_INF("[INST] [SWITCHOVER] promote completed, running as primary");
}
static void srv_process_readonly_request(void)
{
knl_instance_t *kernel = &g_instance->kernel;
database_t *db = &kernel->db;
switch_ctrl_t *ctrl = &kernel->switch_ctrl;
knl_session_t *session = kernel->sessions[SESSION_ID_KERNEL];
if (db_check_backgroud_blocked(session, OG_FALSE, g_instance->sync_doing)) {
return;
}
if (!cm_spin_try_lock(&db->lock)) {
return;
}
if (!srv_killed_session_flush_end(OG_FALSE)) {
cm_spin_unlock(&db->lock);
return;
}
tx_rollback_close(session);
smon_close(session);
rmon_close(session);
OG_LOG_RUN_INF("[INST] [READONLY] close tx_rollback & smon && rmon");
db->is_readonly = OG_TRUE;
db->readonly_reason = ctrl->is_rmon_set ? RMON_SET : MANUALLY_SET;
if (smon_start(session) != OG_SUCCESS) {
CM_ABORT(0, "[INST] [READONLY] ABORT INFO: failed to start smon thread, convert to readonly failed");
}
if (rmon_start(session) != OG_SUCCESS) {
CM_ABORT(0, "[INST] [READONLY] ABORT INFO: failed to start smon thread, convert to readonly failed");
}
srv_reset_switch_request(ctrl);
cm_spin_unlock(&db->lock);
OG_LOG_RUN_INF("[INST] set readonly completed");
}
static void srv_promote_force_get_local_host(promote_record_t *promote_record)
{
knl_instance_t *kernel = &g_instance->kernel;
knl_session_t *session = kernel->sessions[SESSION_ID_KERNEL];
arch_attr_t *arch_attr = NULL;
errno_t err;
uint16 i;
for (i = 0; i < OG_MAX_ARCH_DEST; i++) {
arch_attr = &kernel->attr.arch_attr[i];
if (arch_attr->dest_mode == LOG_ARCH_DEST_SERVICE && arch_attr->local_host[0] != '\0') {
break;
}
}
if (i < OG_MAX_ARCH_DEST) {
err = snprintf_s(promote_record->local_url, sizeof(promote_record->local_url),
sizeof(promote_record->local_url) - 1, "%s:%u", arch_attr->local_host, (uint32)kernel->attr.repl_port);
} else {
err = snprintf_s(promote_record->local_url, sizeof(promote_record->local_url),
sizeof(promote_record->local_url) - 1, "%s:%u", g_instance->lsnr.tcp_replica.host[0],
(uint32)kernel->attr.repl_port);
}
knl_securec_check_ss(err);
promote_record->time = KNL_NOW(session);
}
static void srv_record_promote_time(knl_session_t *session)
{
log_context_t *log = &session->kernel->redo_ctx;
lrpl_context_t *lrpl = &session->kernel->lrpl_ctx;
OG_LOG_RUN_INF("[LRPL] replay used time %llums, end point: rst_id:[%llu], asn[%u], lfn[%llu]",
(KNL_NOW(session) - log->promote_temp_time) / MILLISECS_PER_SECOND,
(uint64)lrpl->curr_point.rst_id, lrpl->curr_point.asn, (uint64)lrpl->curr_point.lfn);
log->promote_temp_time = KNL_NOW(session);
}
static void srv_process_force_promote_request(void)
{
knl_instance_t *kernel = &g_instance->kernel;
database_t *db = &kernel->db;
switch_ctrl_t *ctrl = &kernel->switch_ctrl;
knl_session_t *session = kernel->sessions[SESSION_ID_KERNEL];
log_context_t *log = &kernel->redo_ctx;
lrpl_context_t *lrpl = &session->kernel->lrpl_ctx;
core_ctrl_t *core = &db->ctrl.core;
bool32 force = (bool32)(ctrl->request == SWITCH_REQ_FORCE_FAILOVER_PROMOTE);
promote_record_t promote_record;
errno_t err;
if (force) {
err = strncpy_s(promote_record.type, sizeof(promote_record.type), "FORCE FAILOVER", strlen("FORCE FAILOVER"));
if (SECUREC_UNLIKELY(err != EOK)) {
OG_THROW_ERROR(ERR_SYSTEM_CALL, err);
return;
}
} else {
err = strncpy_s(promote_record.type, sizeof(promote_record.type), "FAILOVER", strlen("FAILOVER"));
}
knl_securec_check(err);
if ((force && kernel->lrcv_ctx.session != NULL) || !cm_spin_try_lock(&db->lock)) {
return;
}
if (ctrl->state == SWITCH_IDLE) {
(void)knl_stop_build(session);
OG_LOG_RUN_INF("[INST] [%sFAILOVER] no build is running now", force ? "FORCE " : "");
}
if (!ctrl->handling) {
ctrl->handling = OG_TRUE;
OG_LOG_RUN_INF("[INST] [%sFAILOVER] process promote request", force ? "FORCE " : "");
srv_kill_active_session(kernel);
ctrl->state = SWITCH_WAIT_SESSIONS;
ctrl->switch_asn = OG_INVALID_ASN;
OG_LOG_RUN_INF("[INST] [%sFAILOVER] Kill all active sessions", force ? "FORCE " : "");
}
if (ctrl->state == SWITCH_WAIT_SESSIONS) {
if (srv_need_wait_session(kernel)) {
cm_spin_unlock(&db->lock);
return;
}
OG_LOG_RUN_INF("[INST] [%sFAILOVER] All active sessions stopped", force ? "FORCE " : "");
reactor_resume_pool();
srv_resume_lsnr(LSNR_TYPE_SERVICE);
ctrl->state = SWITCH_WAIT_RECOVERY;
log->promote_temp_time = KNL_NOW(session);
OG_LOG_RUN_INF("[INST] [%sFAILOVER] log replay from [%llu-%u-%u/%llu]", force ? "FORCE " : "",
(uint64)lrpl->curr_point.rst_id, lrpl->curr_point.asn, lrpl->curr_point.block_id,
(uint64)lrpl->curr_point.lfn);
}
if (ctrl->state == SWITCH_WAIT_RECOVERY) {
if (!kernel->lrpl_ctx.thread.closed) {
cm_spin_unlock(&db->lock);
return;
}
rcy_wait_replay_complete(session);
OG_LOG_RUN_INF("[INST] [%sFAILOVER] LRPL has finished replay work", force ? "FORCE " : "");
srv_record_promote_time(session);
rcy_close_proc(session);
* do not trigger full checkpoint when db->ctrl.core.rcy_point.rst_id == log_ctx->curr_point.rst_id
* because trigger full point(confuse implement) maybe wait long time
*/
if (dtc_my_ctrl(session)->rcy_point.rst_id != log->curr_point.rst_id) {
ckpt_trigger(session, OG_FALSE, CKPT_TRIGGER_FULL);
}
if (log_switch_logfile(session, OG_INVALID_FILEID, OG_INVALID_ASN, NULL) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[INST] [%sFAILOVER] failed to switch logfile", force ? "FORCE " : "");
ctrl->handling = OG_FALSE;
ctrl->request = SWITCH_REQ_NONE;
cm_spin_unlock(&db->lock);
return;
}
ctrl->switch_asn = log->files[log->curr_file].head.asn;
OG_LOG_RUN_INF("[INST] [%sFAILOVER] Log file switched to %u", force ? "FORCE " : "", ctrl->switch_asn);
if (dtc_my_ctrl(session)->rcy_point.rst_id != log->curr_point.rst_id) {
ctrl->state = SWITCH_WAIT_CKPT;
OG_LOG_RUN_INF("[INST] [%sFAILOVER] Need to wait for checkpoint, rcy point rst_id is %u, "
"current redo log point rst_id is %u",
force ? "FORCE " : "", dtc_my_ctrl(session)->rcy_point.rst_id, log->curr_point.rst_id);
}
}
if (ctrl->state == SWITCH_WAIT_CKPT) {
if (!ckpt_check(session)) {
cm_spin_unlock(&db->lock);
return;
}
db_reset_log(session, ctrl->switch_asn, OG_TRUE, OG_FALSE);
OG_LOG_RUN_INF("[INST] [%sFAILOVER] Checkpoint and resetlog finished", force ? "FORCE " : "");
} else {
db_reset_log(session, ctrl->switch_asn, (ctrl->switch_asn == log->curr_point.asn), OG_FALSE);
OG_LOG_RUN_INF("[INST] [%sFAILOVER] Resetlog finished", force ? "FORCE " : "");
}
OG_LOG_RUN_INF("[INST] [%sFAILOVER] lsn:[%llu]", force ? "FORCE " : "", DB_CURR_LSN(session));
OG_LOG_RUN_INF("[INST] [%sFAILOVER] SCN:[%llu]", force ? "FORCE " : "", DB_CURR_SCN(session));
OG_LOG_RUN_INF("[INST] [%sFAILOVER] lrp rst_id/asn/lfn/offset:[%u/%u/%llu/%u]", force ? "FORCE " : "",
dtc_my_ctrl(session)->lrp_point.rst_id, dtc_my_ctrl(session)->lrp_point.asn,
(uint64)dtc_my_ctrl(session)->lrp_point.lfn, dtc_my_ctrl(session)->lrp_point.block_id);
OG_LOG_RUN_INF("[INST] [%sFAILOVER] rcy rst_id/asn/lfn/offset:[%u/%u/%llu/%u]", force ? "FORCE " : "",
dtc_my_ctrl(session)->rcy_point.rst_id, dtc_my_ctrl(session)->rcy_point.asn,
(uint64)dtc_my_ctrl(session)->rcy_point.lfn, dtc_my_ctrl(session)->rcy_point.block_id);
OG_LOG_RUN_INF("[INST] [%sFAILOVER] curr point rst_id/asn/lfn/offset:[%u/%u/%llu/%u]", force ? "FORCE " : "",
log->curr_point.rst_id, log->curr_point.asn, (uint64)log->curr_point.lfn, log->curr_point.block_id);
* In previous db_reset_log, current file's resetid has been updated and flushed. We should save resetlog here
* to prevent inconsistent resetid if failover failed finally.
*/
if (db_save_core_ctrl(session) != OG_SUCCESS) {
CM_ABORT(0, "[INST] [%sFAILOVER] ABORT INFO: failed to save resetlog ctrlfile", force ? "FORCE " : "");
}
dc_reset_not_ready_by_nlg(session);
knl_inc_dc_ver(kernel);
kernel->db.is_readonly = OG_FALSE;
kernel->db.readonly_reason = PRIMARY_SET;
kernel->dc_ctx.completed = OG_FALSE;
core->db_role = REPL_ROLE_PRIMARY;
OG_LOG_RUN_INF("[INST] [%sFAILOVER] Change role to primary", force ? "FORCE " : "");
if (kernel->db.ctrl.core.is_restored) {
db_set_ctrl_restored(session, OG_FALSE);
}
lrcv_clear_needrepair_for_failover(session);
lrcv_reset_primary_host(session);
lsnd_close_all_thread(session);
if (lsnd_init(session) != OG_SUCCESS) {
CM_ABORT(0, "[INST] [%sFAILOVER] ABORT INFO: failed to start log sender thread", force ? "FORCE " : "");
}
if (tx_rollback_start(session) != OG_SUCCESS) {
CM_ABORT(0, "[INST] [%sFAILOVER] ABORT INFO: failed to start txn rollback thread", force ? "FORCE " : "");
}
if (db_clean_nologging_all(session) != OG_SUCCESS) {
CM_ABORT(0, "[INST] [%sFAILOVER] ABORT INFO: failed to clean nologging tables", force ? "FORCE " : "");
}
dc_set_ready(session);
if (db_garbage_segment_clean(session) != OG_SUCCESS) {
OG_LOG_RUN_WAR("[INST] [%sFAILOVER] failed to clean garbage segment", force ? "FORCE " : "");
}
if (spc_clean_garbage_space(session) != OG_SUCCESS) {
OG_LOG_RUN_WAR("[INST] [%sFAILOVER] failed to clean garbage tablespace", force ? "FORCE " : "");
}
spc_init_swap_space(session, SPACE_GET(session, dtc_my_ctrl(session)->swap_space));
heap_remove_cached_pages(session, NULL);
srv_promote_force_get_local_host(&promote_record);
srv_promote_save_record(session, &promote_record);
srv_reset_switch_request(ctrl);
cm_spin_unlock(&db->lock);
if (db_save_core_ctrl(session) != OG_SUCCESS) {
CM_ABORT(0, "[INST] [%sFAILOVER] ABORT INFO: failed to save core ctrlfile", force ? "FORCE " : "");
}
OG_LOG_RUN_INF("[INST] [%sFAILOVER] promote completed, running as primary", force ? "FORCE " : "");
}
static void srv_process_cancel_upgrade_request(void)
{
knl_instance_t *kernel = &g_instance->kernel;
database_t *db = &kernel->db;
dc_context_t *dc_ctx = &kernel->dc_ctx;
switch_ctrl_t *ctrl = &kernel->switch_ctrl;
knl_session_t *session = kernel->sessions[SESSION_ID_KERNEL];
if (!cm_spin_try_lock(&db->lock)) {
return;
}
ckpt_trigger(session, OG_TRUE, CKPT_TRIGGER_FULL);
dc_ctx->completed = OG_FALSE;
db->open_status = DB_OPEN_STATUS_NORMAL;
if (db_clean_nologging_all(session) != OG_SUCCESS) {
CM_ABORT(0, "[INST] [CANCEL UPGRADE] ABORT INFO: failed to clean nologging tables, cancel upgrade failed");
}
if (db_garbage_segment_clean(session) != OG_SUCCESS) {
OG_LOG_RUN_WAR("[INST] [FAILOVER] failed to clean garbage segment");
}
if (spc_clean_garbage_space(session) != OG_SUCCESS) {
OG_LOG_RUN_WAR("[SPACE] failed to clean garbage tablespace");
}
srv_reset_switch_request(ctrl);
cm_spin_unlock(&db->lock);
OG_LOG_RUN_INF("[INST] cancel upgrade mode completed");
}
static status_t srv_adjust_log_sender(void)
{
knl_instance_t *kernel = &g_instance->kernel;
knl_session_t *session = kernel->sessions[SESSION_ID_LSND];
arch_context_t *ogx = &kernel->arch_ctx;
if (g_instance->shutdown_ctx.phase >= SHUTDOWN_PHASE_INPROGRESS) {
return OG_SUCCESS;
}
lsnd_close_disabled_thread(session);
if (ogx->arch_dest_state_changed) {
if (lsnd_init(session) != OG_SUCCESS) {
ogx->arch_dest_state_changed = OG_FALSE;
return OG_ERROR;
}
ogx->arch_dest_state_changed = OG_FALSE;
}
return OG_SUCCESS;
}
static void srv_process_raft_promote_request_core(void)
{
knl_instance_t *kernel = &g_instance->kernel;
knl_session_t *session = kernel->sessions[SESSION_ID_KERNEL];
log_context_t *log_ctx = &kernel->redo_ctx;
OG_LOG_RUN_INF("[INST] [FAILOVER] lsn:[%llu]", DB_CURR_LSN(session));
OG_LOG_RUN_INF("[INST] [FAILOVER] SCN:[%llu]", DB_CURR_SCN(session));
OG_LOG_RUN_INF("[INST] [FAILOVER] lrp rst_id/asn/lfn/offset:[%u/%u/%llu/%u]",
dtc_my_ctrl(session)->lrp_point.rst_id, dtc_my_ctrl(session)->lrp_point.asn,
(uint64)dtc_my_ctrl(session)->lrp_point.lfn, dtc_my_ctrl(session)->lrp_point.block_id);
OG_LOG_RUN_INF("[INST] [FAILOVER] rcy rst_id/asn/lfn/offset:[%u/%u/%llu/%u]",
dtc_my_ctrl(session)->rcy_point.rst_id, dtc_my_ctrl(session)->rcy_point.asn,
(uint64)dtc_my_ctrl(session)->rcy_point.lfn, dtc_my_ctrl(session)->rcy_point.block_id);
OG_LOG_RUN_INF("[INST] [FAILOVER] curr point rst_id/asn/lfn/offset:[%u/%u/%llu/%u]", log_ctx->curr_point.rst_id,
log_ctx->curr_point.asn, (uint64)log_ctx->curr_point.lfn, log_ctx->curr_point.block_id);
dc_reset_not_ready_by_nlg(session);
knl_inc_dc_ver(kernel);
kernel->db.is_readonly = OG_FALSE;
kernel->db.readonly_reason = PRIMARY_SET;
kernel->dc_ctx.completed = OG_FALSE;
kernel->db.ctrl.core.db_role = REPL_ROLE_PRIMARY;
if (db_save_core_ctrl(session) != OG_SUCCESS) {
CM_ABORT(0, "[INST] ABORT INFO: failed to save core ctrlfile, force promote failed");
}
if (tx_rollback_start(session) != OG_SUCCESS) {
CM_ABORT(0, "[INST] ABORT INFO: failed to start txn rollback thread, force promote failed");
}
if (db_clean_nologging_all(session) != OG_SUCCESS) {
CM_ABORT(0, "[INST] [FAILOVER] ABORT INFO: failed to clean nologging tables, force promote failed");
}
dc_set_ready(session);
if (db_garbage_segment_clean(session) != OG_SUCCESS) {
OG_LOG_RUN_WAR("[INST] failed to clean garbage segment");
}
if (spc_clean_garbage_space(session) != OG_SUCCESS) {
OG_LOG_RUN_WAR("[SPACE] failed to clean garbage tablespace");
}
spc_init_swap_space(session, SPACE_GET(session, dtc_my_ctrl(session)->swap_space));
}
static void srv_process_force_raft_promote_request(void)
{
knl_instance_t *kernel = &g_instance->kernel;
switch_ctrl_t *ctrl = &kernel->switch_ctrl;
knl_session_t *session = kernel->sessions[SESSION_ID_KERNEL];
log_context_t *log_ctx = &kernel->redo_ctx;
lrpl_context_t *lrpl = &session->kernel->lrpl_ctx;
if (!ctrl->handling) {
ctrl->handling = OG_TRUE;
OG_LOG_RUN_INF("[INST] process force promote request");
srv_kill_active_session(kernel);
ctrl->state = SWITCH_WAIT_SESSIONS;
}
if (ctrl->state == SWITCH_WAIT_SESSIONS) {
if (srv_need_wait_session(kernel)) {
return;
}
reactor_resume_pool();
srv_resume_lsnr(LSNR_TYPE_SERVICE);
ctrl->state = SWITCH_WAIT_RECOVERY;
log_ctx->promote_temp_time = KNL_NOW(session);
OG_LOG_RUN_INF("[INST] failover log repaly from [%llu-%u-%u/%llu]", (uint64)lrpl->curr_point.rst_id,
lrpl->curr_point.asn, lrpl->curr_point.block_id, (uint64)lrpl->curr_point.lfn);
}
if (ctrl->state == SWITCH_WAIT_RECOVERY) {
if (!kernel->lrpl_ctx.thread.closed) {
return;
}
rcy_wait_replay_complete(session);
OG_LOG_RUN_INF("[INST] [FAILOVER] LRPL has finished replay work");
srv_record_promote_time(session);
rcy_close_proc(session);
* do not trigger full checkpoint when db->ctrl.core.rcy_point.rst_id == log_ctx->curr_point.rst_id
* because trigger full point(confuse implement) maybe wait long time
*/
if (dtc_my_ctrl(session)->rcy_point.rst_id != log_ctx->curr_point.rst_id) {
ckpt_trigger(session, OG_FALSE, CKPT_TRIGGER_FULL);
}
}
srv_process_raft_promote_request_core();
heap_remove_cached_pages(session, NULL);
srv_reset_switch_request(ctrl);
OG_LOG_RUN_INF("[INST] force promote completed, running as primary");
}
static void srv_record_backup(uint32 client_id)
{
knl_session_t *session = g_instance->kernel.sessions[SESSION_ID_BRU];
lsnd_context_t *lsnd_ctx = &g_instance->kernel.lsnd_ctx;
lsnd_bak_task_t *bak_task = &lsnd_ctx->lsnd[client_id]->bak_task;
bool32 task_failed = OG_FALSE;
OG_LOG_RUN_INF("srv_record_backup");
bak_task->record.status = BACKUP_SUCCESS;
if (bak_record_backup_set(session, &bak_task->record) != OG_SUCCESS) {
task_failed = OG_TRUE;
}
lsnd_trigger_task_response(session, client_id, task_failed);
}
static void srv_process_record_backup_task(void)
{
knl_session_t *session = g_instance->kernel.sessions[SESSION_ID_BRU];
bak_context_t *ogx = &session->kernel->backup_ctx;
uint32 build_keep_alive_timeout = session->kernel->attr.build_keep_alive_timeout;
uint32 i;
for (i = 0; i < OG_MAX_PHYSICAL_STANDBY; i++) {
if (!g_instance->kernel.record_backup_trigger[i]) {
continue;
}
srv_record_backup(i);
g_instance->kernel.record_backup_trigger[i] = OG_FALSE;
}
if (!BAK_IS_KEEP_ALIVE(ogx)) {
return;
}
dls_spin_lock(session, &ogx->lock, NULL);
if (!BAK_IS_KEEP_ALIVE(ogx)) {
dls_spin_unlock(session, &ogx->lock);
return;
}
if (cm_current_time() - ogx->keep_live_start_time > build_keep_alive_timeout || ogx->bak.build_stopped) {
knl_panic(!ogx->bak.need_retry);
ogx->bak_condition = NOT_RUNNING;
OG_LOG_RUN_INF("[BUILD] cancel keep alive condition while reaching timeout "
"or build cancelled: %u",
ogx->bak.build_stopped);
}
dls_spin_unlock(session, &ogx->lock);
}
static void srv_terminal_zombie_session(void)
{
session_t *sess = NULL;
int loop;
int nfds;
struct epoll_event events[OG_EV_WAIT_NUM];
struct epoll_event *ev = NULL;
nfds = epoll_wait(g_instance->session_pool.epollfd, events, OG_EV_WAIT_NUM, OG_EV_WAIT_TIMEOUT);
if (nfds == -1) {
if (errno != EINTR) {
OG_LOG_RUN_ERR("Failed to wait for connection request, OS error:%d", cm_get_os_error());
}
return;
}
if (nfds == 0) {
return;
}
for (loop = 0; loop < nfds; ++loop) {
ev = &events[loop];
sess = (session_t *)ev->data.ptr;
if (!sess->knl_session.killed && sess->knl_session.status != SESSION_INACTIVE) {
srv_mark_sess_killed(sess, OG_FALSE, sess->knl_session.serial_id);
}
}
}
static void srv_instance_request(void)
{
cm_reset_error();
switch (g_instance->kernel.switch_ctrl.request) {
case SWITCH_REQ_DEMOTE:
srv_process_demote_request();
break;
case SWITCH_REQ_PROMOTE:
srv_process_promote_request();
break;
case SWITCH_REQ_FAILOVER_PROMOTE:
case SWITCH_REQ_FORCE_FAILOVER_PROMOTE:
srv_process_force_promote_request();
break;
case SWITCH_REQ_RAFT_PROMOTE:
srv_process_force_raft_promote_request();
break;
case SWITCH_REQ_READONLY:
srv_process_readonly_request();
break;
case SWITCH_REQ_CANCEL_UPGRADE:
srv_process_cancel_upgrade_request();
break;
default:
break;
}
}
status_t srv_instance_loop(void)
{
int64 periods = 0;
int64 period_one_day = MILLISECS_PER_SECOND * SECONDS_PER_DAY / 5;
g_instance->shutdown_ctx.enabled = OG_TRUE;
while (1) {
if (g_instance->shutdown_ctx.phase == SHUTDOWN_PHASE_DONE) {
srv_close_threads(OG_FALSE);
srv_deinit_resource();
return OG_SUCCESS;
}
if (g_instance->lsnr_abort_status == OG_TRUE) {
exec_abnormal_terminal();
return OG_SUCCESS;
}
srv_expire_unauth_timeout_session();
if (periods == period_one_day && IS_SSL_ENABLED) {
periods = 0;
OG_RETURN_IFERR(srv_ssl_expire_warning());
}
#ifndef WIN32
srv_terminal_zombie_session();
#endif
OG_RETURN_IFERR(srv_adjust_log_sender());
srv_process_record_backup_task();
srv_instance_request();
cm_sleep(5);
periods++;
}
}
static status_t srv_kernel_startup(bool32 is_coordinator)
{
knl_instance_t *kernel = &g_instance->kernel;
kernel->attr.xpurpose_buf = cm_aligned_buf(g_instance->xpurpose_buf);
kernel->attr.config = &g_instance->config;
kernel->attr.timer = g_timer();
kernel->attr.max_sessions = g_instance->session_pool.expanded_max_sessions;
OG_LOG_RUN_INF("begin start kernel.");
knl_init_attr(kernel);
kernel->home = g_instance->home;
kernel->id = kernel->dtc_attr.inst_id;
g_local_inst_id = kernel->id;
if (alck_init_ctx(kernel) != OG_SUCCESS) {
OG_THROW_ERROR(ERR_START_INSTANCE_ERROR);
return OG_ERROR;
}
kernel->id = kernel->dtc_attr.inst_id;
if (OG_SUCCESS != knl_startup(kernel)) {
OG_THROW_ERROR(ERR_START_INSTANCE_ERROR);
return OG_ERROR;
}
OG_LOG_RUN_INF("kernel startup finish.");
return OG_SUCCESS;
}
static status_t srv_kernel_open(db_startup_phase_t phase)
{
knl_instance_t *kernel = &g_instance->kernel;
session_pool_t *pool = &g_instance->session_pool;
knl_session_t *knl_session = kernel->sessions[SESSION_ID_KERNEL];
session_t *session = pool->sessions[SESSION_ID_KERNEL];
knl_alterdb_def_t def;
status_t ret;
MEMS_RETURN_IFERR(memset_s(&def, sizeof(knl_alterdb_def_t), 0, sizeof(knl_alterdb_def_t)));
if (phase == STARTUP_MOUNT) {
def.action = STARTUP_DATABASE_MOUNT;
} else {
def.action = STARTUP_DATABASE_OPEN;
}
sql_begin_exec_stat(session);
ret = knl_alter_database(knl_session, &def);
sql_end_exec_stat(session);
return ret;
}
static status_t srv_init_resource_manager(knl_handle_t sess)
{
char *plan_name = srv_get_param("RESOURCE_PLAN");
biqueue_init(&g_instance->rsrc_mgr.free_plans);
knl_session_t *session = (knl_session_t *)sess;
if (cm_event_init(&g_instance->rsrc_mgr.event) != OG_SUCCESS) {
OG_THROW_ERROR(ERR_CREATE_EVENT, cm_get_os_error());
return OG_ERROR;
}
if (CM_IS_EMPTY_STR(plan_name)) {
g_instance->rsrc_mgr.plan = NULL;
g_instance->rsrc_mgr.started = OG_FALSE;
return rsrc_calc_cpuset(session->kernel->attr.cpu_bind_lo, session->kernel->attr.cpu_bind_hi, NULL);
}
if (rsrc_load_plan(session, plan_name, &g_instance->rsrc_mgr.plan) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Load resource plan failed, plan = %s", plan_name);
return OG_ERROR;
}
if (srv_init_vmem_pool() != OG_SUCCESS) {
OG_LOG_RUN_ERR("failed to initialize temp pool");
return OG_ERROR;
}
return rsrc_start_manager(&g_instance->rsrc_mgr);
}
status_t srv_init_resource_manager_null(knl_handle_t sess)
{
return OG_SUCCESS;
}
status_t srv_callback_null_func2_uid(knl_handle_t knl_session, uint32 uid)
{
return OG_SUCCESS;
}
status_t srv_callback_null_func2_dc(knl_handle_t knl_session, knl_dictionary_t *dc)
{
return OG_SUCCESS;
}
void srv_callback_null_func2_dc_void(knl_handle_t knl_session, knl_dictionary_t *dc)
{
}
status_t srv_callback_null_func3_text(knl_handle_t sess, uint32 uid, text_t *syn_name)
{
return OG_SUCCESS;
}
status_t srv_callback_null_func3_ptr(knl_handle_t sess, uint32 uid, void *syn_name)
{
return OG_SUCCESS;
}
status_t srv_callback_null_func4_dc_text(knl_handle_t knl_session, knl_dictionary_t *dc, text_t *name, text_t *new_name)
{
return OG_SUCCESS;
}
void rsrc_accumulate_io_null(knl_handle_t sess, io_type_t type)
{
return;
}
static status_t init_sql_maps_null(knl_handle_t sess)
{
return OG_SUCCESS;
}
status_t sql_update_dependant_status_null(knl_handle_t sess, obj_info_t *obj)
{
return OG_SUCCESS;
}
static status_t ogstore_raw_device_op_init_null(const char *conn_path)
{
return OG_SUCCESS;
}
static void srv_set_kernel_callback_ex(void)
{
g_knl_callback.set_stmt_check = sql_set_stmt_check;
g_knl_callback.before_commit = (knl_before_commit_t)knl_clean_before_commit;
g_knl_callback.alloc_knl_session = srv_alloc_knl_session;
g_knl_callback.release_knl_session = srv_release_knl_session;
g_knl_callback.parse_check_from_text = sql_parse_check_from_text;
g_knl_callback.parse_default_from_text = sql_parse_default_from_text;
g_knl_callback.verify_default_from_text = sql_verify_default_from_text;
g_knl_callback.update_depender = sql_update_depender_status;
g_knl_callback.accumate_io = rsrc_accumate_io;
g_knl_callback.init_resmgr = srv_init_resource_manager;
g_knl_callback.import_rows = sql_try_import_rows;
g_knl_callback.sysdba_privilege = srv_sysdba_privilege;
g_knl_callback.have_ssl = srv_have_ssl;
g_knl_callback.clear_sym_cache = NULL;
g_knl_callback.get_func_index_size = sql_get_func_index_expr_size;
g_knl_callback.compare_index_expr = sql_compare_index_expr;
g_knl_callback.whether_login_with_user = srv_whether_login_with_user;
g_knl_callback.pl_drop_synonym_by_user = pl_drop_synonym_by_user;
g_knl_callback.init_vmc = sql_init_mtrl_vmc;
g_knl_callback.get_ddl_sql = sql_get_ddl_sql;
g_knl_callback.convert_char = sql_convert_char_cb;
g_knl_callback.device_init = ogstore_raw_device_op_init_null;
}
static void srv_set_kernel_callback(void)
{
g_knl_callback.exec_default = sql_exec_default;
g_knl_callback.set_vm_lob_to_knl = sql_set_vm_lob_to_knl;
g_knl_callback.keep_stack_variant = sql_keep_stack_var;
g_knl_callback.alloc_rm = srv_alloc_rm;
g_knl_callback.release_rm = srv_release_rm;
g_knl_callback.alloc_auton_rm = srv_alloc_auton_rm;
g_knl_callback.release_auton_rm = srv_release_auton_rm;
g_knl_callback.get_xa_xid = srv_get_xa_xid;
g_knl_callback.add_xa_xid = srv_add_xa_xid;
g_knl_callback.delete_xa_xid = srv_delete_xa_xid;
g_knl_callback.attach_suspend_rm = srv_attach_suspend_rm;
g_knl_callback.detach_suspend_rm = srv_detach_suspend_rm;
g_knl_callback.attach_pending_rm = srv_attach_pending_rm;
g_knl_callback.detach_pending_rm = srv_detach_pending_rm;
g_knl_callback.shrink_xa_rms = srv_shrink_xa_rms;
g_knl_callback.load_scripts = sql_load_scripts;
g_knl_callback.exec_sql = (knl_exec_sql_t)sql_execute_directly2;
g_knl_callback.invalidate_cursor = clean_open_cursors;
g_knl_callback.invalidate_temp_cursor = clean_open_temp_cursors;
g_knl_callback.invalidate_space = invalidate_tablespaces;
g_knl_callback.decode_check_cond = sr_decode_cond;
g_knl_callback.match_cond_tree = sql_match_cond_tree;
g_knl_callback.dc_recycle_external = dc_recycle_external;
g_knl_callback.pl_init = pl_init;
g_knl_callback.pl_drop_object = pl_drop_object_by_user;
g_knl_callback.pl_db_drop_triggers = pl_db_drop_triggers;
g_knl_callback.pl_update_tab_from_sysproc = pl_update_source_for_trigs;
g_knl_callback.pl_free_trig_entity_by_tab = pl_free_trig_entity_by_tab;
g_knl_callback.pl_drop_triggers_entry = pl_drop_triggers_entry;
g_knl_callback.pl_logic_log_replay = pl_logic_log_replay;
g_knl_callback.exec_check = sql_execute_check;
g_knl_callback.func_idx_exec = sql_exec_index_col_func;
g_knl_callback.kill_session = srv_mark_sess_killed;
g_knl_callback.init_sql_maps = init_sql_maps_null;
g_knl_callback.get_sql_text = srv_get_sql_text;
g_knl_callback.set_min_scn = srv_set_min_scn;
srv_set_kernel_callback_ex();
}
static status_t srv_get_home(void)
{
bool32 exist;
char home[OG_MAX_PATH_BUFFER_SIZE];
if (g_database_home == NULL) {
g_database_home = getenv(OG_ENV_HOME);
if (g_database_home == NULL) {
OG_THROW_ERROR(ERR_HOME_PATH_NOT_FOUND, OG_ENV_HOME);
return OG_ERROR;
}
}
OG_RETURN_IFERR(realpath_file(g_database_home, home, OG_MAX_PATH_BUFFER_SIZE));
exist = cm_check_exist_special_char(home, (uint32)strlen(home));
if (exist) {
OG_THROW_ERROR(ERR_INVALID_DIR, OG_ENV_HOME);
return OG_ERROR;
}
exist = cm_dir_exist(home);
if (!exist) {
OG_THROW_ERROR(ERR_HOME_PATH_NOT_FOUND, OG_ENV_HOME);
return OG_ERROR;
}
cm_trim_home_path(g_database_home, (uint32)strlen(g_database_home));
MEMS_RETURN_IFERR(strncpy_s(g_instance->home, OG_MAX_PATH_BUFFER_SIZE, g_database_home, strlen(g_database_home)));
return OG_SUCCESS;
}
char *startup_mode(db_startup_phase_t phase)
{
switch (phase) {
case STARTUP_NOMOUNT:
return "nomount";
case STARTUP_MOUNT:
return "mount";
default:
return "normal";
}
}
static void srv_reg_os_rinfo(void)
{
for (int i = 0; i < TOTAL_OS_RUN_INFO_TYPES; i++) {
g_instance->os_rinfo[i].desc = &g_os_stat_desc_array[i];
}
}
static void srv_init_lob_locator(void)
{
g_instance->sql.sql_lob_locator_size = MAX(KNL_LOB_LOCATOR_SIZE, VM_LOB_LOCATOR_SIZE);
}
static void srv_init_uuid_info(st_uuid_info_t *uuid_info)
{
uuid_info->lock = 0;
uuid_info->self_increase_seq = cm_random(OG_MAX_RAND_RANGE);
cm_init_mac_address(uuid_info->mac_address, OG_MAC_ADDRESS_LEN);
}
static status_t srv_init_g_instance(void)
{
errno_t errcode;
g_instance = (instance_t *)malloc(sizeof(instance_t));
if (g_instance == NULL) {
OG_THROW_ERROR(ERR_ALLOC_MEMORY, (uint64)sizeof(instance_t), "creating instance");
return OG_ERROR;
}
errcode = memset_s(g_instance, sizeof(instance_t), 0, sizeof(instance_t));
if (errcode != EOK) {
CM_FREE_PTR(g_instance);
OG_THROW_ERROR(ERR_SYSTEM_CALL, (errcode));
return OG_ERROR;
}
g_instance->lock_fd = -1;
g_instance->inc_rebalance = OG_FALSE;
g_instance->frozen_starttime = 0;
g_instance->frozen_waittime = 0;
cm_rand((uchar *)g_instance->rand_for_md5, OG_KDF2SALTSIZE);
srv_init_uuid_info(&g_instance->g_uuid_info);
errcode = memset_s(g_dtc, sizeof(dtc_instance_t), 0, sizeof(dtc_instance_t));
if (errcode != EOK) {
CM_FREE_PTR(g_instance);
OG_THROW_ERROR(ERR_SYSTEM_CALL, (errcode));
return OG_ERROR;
}
g_dtc->kernel = &g_instance->kernel;
shutdown_context_t *ogx = &g_instance->shutdown_ctx;
dls_init_latch(&(ogx->shutdown_latch), DR_TYPE_SHUTDOWN, 0, 0);
return OG_SUCCESS;
}
status_t srv_sysdba_privilege()
{
if (GET_ENABLE_SYSDBA_LOGIN) {
if (srv_init_sysdba_privilege() != OG_SUCCESS) {
srv_instance_destroy();
OG_LOG_RUN_ERR("[Privilege] failed to init Sysdba Privilege");
return OG_ERROR;
}
} else {
(void)srv_remove_sysdba_privilege();
}
return OG_SUCCESS;
}
static status_t srv_init_par_sessions(void)
{
sql_par_pool_t *par_pool = &g_instance->sql_par_pool;
uint32 i;
uint32 id;
par_pool->lock = 0;
par_pool->used_sessions = 0;
for (i = 0; i < par_pool->max_sessions; i++) {
OG_RETURN_IFERR(srv_alloc_reserved_session(&id));
g_instance->session_pool.sessions[id]->type = SESSION_TYPE_SQL_PAR;
g_instance->session_pool.sessions[id]->knl_session.match_cond = sql_match_cond;
par_pool->sessions[i] = g_instance->session_pool.sessions[id];
}
g_instance->kernel.reserved_sessions += par_pool->max_sessions;
par_pool->par_stat.parallel_executions = 0;
par_pool->par_stat.under_trans_cnt = 0;
par_pool->par_stat.res_limited_cnt = 0;
par_pool->par_stat.break_proc_cnt = 0;
return OG_SUCCESS;
}
static status_t srv_lock_db(void)
{
char file_name[OG_FILE_NAME_BUFFER_SIZE] = { 0 };
PRTS_RETURN_IFERR(snprintf_s(file_name, OG_FILE_NAME_BUFFER_SIZE, OG_FILE_NAME_BUFFER_SIZE - 1, "%s/%s",
g_instance->home, g_lock_file));
if (cm_open_file(file_name, O_CREAT | O_RDWR | O_BINARY, &g_instance->lock_fd) != OG_SUCCESS) {
return OG_ERROR;
}
status_t ret = cm_lock_fd(g_instance->lock_fd);
if (ret != OG_SUCCESS) {
cm_show_lock_info(g_instance->lock_fd);
}
return ret;
}
void srv_unlock_db(void)
{
OG_LOG_RUN_INF("Unlock db lock at %d.", g_instance->lock_fd);
if (g_instance->lock_fd != -1) {
cm_unlock_fd(g_instance->lock_fd);
cm_close_file(g_instance->lock_fd);
}
}
static void srv_setup_json_mpool(void)
{
g_instance->sql.json_mpool.lock = 0;
g_instance->sql.json_mpool.used_json_dyn_buf = 0;
}
static void srv_set_locale(void)
{
if (setlocale(LC_ALL, "") == NULL) {
g_instance->is_setlocale_success = OG_FALSE;
OG_LOG_RUN_ERR("Set locale failed, errno %d", errno);
printf("Set locale failed, errno %d\n", errno);
return;
}
g_instance->is_setlocale_success = OG_TRUE;
}
void srv_thread_exit(thread_t *thread, session_t *session)
{
agent_t *agent = session->agent;
cm_release_thread(thread);
srv_unbind_sess_agent(session, agent);
srv_release_session(session);
srv_free_agent_res(agent, OG_FALSE);
CM_FREE_PTR(agent);
}
char g_cpu_info_str[CPU_INFO_STR_SIZE];
int g_cpu_info[CPU_SEG_MAX_NUM][SMALL_RECORD_SIZE];
int g_cpu_group_num = 0;
cpu_set_t g_masks[CPU_SEG_MAX_NUM];
int get_cpu_group_num(void)
{
return g_cpu_group_num;
}
cpu_set_t* get_cpu_masks(void)
{
return g_masks;
}
char *get_g_cpu_info(void)
{
return g_cpu_info_str;
}
static int init_cpu_mask(char *cpu_info_str, int *cpu_group_num, int cpu_info[CPU_SEG_MAX_NUM][SMALL_RECORD_SIZE])
{
errno_t errcode;
if (cpu_info_str[0] == '0' && strlen(cpu_info_str) == 1) {
return OG_SUCCESS;
}
char *p = NULL;
char *str = strtok_r(cpu_info_str, " ", &p);
char cpu_group_str[CPU_SEG_MAX_NUM][SMALL_RECORD_SIZE];
while (str != NULL) {
errcode = strcpy_s(cpu_group_str[(*cpu_group_num)++], SMALL_RECORD_SIZE, str);
MEMS_RETURN_IFERR(errcode);
str = strtok_r(NULL, " ", &p);
}
for (int i = 0; i < *cpu_group_num; i++) {
char *cpu_p = NULL;
char cpu_group_str_cp[SMALL_RECORD_SIZE];
errcode = strcpy_s(cpu_group_str_cp, SMALL_RECORD_SIZE, cpu_group_str[i]);
MEMS_RETURN_IFERR(errcode);
char *cpu_str = strtok_r(cpu_group_str_cp, ",", &cpu_p);
int count = 0;
while (cpu_str != NULL) {
int s = 0, e = 0;
int num = sscanf_s(cpu_str, "%d-%d", &s, &e);
if (num == 1) {
e = s;
} else if (num != 2) {
OG_LOG_RUN_ERR("cpu configuration error, num = %d, s = %d, e = %d, should be like \"0-3\" or \"0\", but \"%s\"",
num, s, e, cpu_str);
return OG_ERROR;
}
for (int j = s; j <= e; j++) {
cpu_info[i][count++] = j;
}
cpu_str = strtok_r(NULL, ",", &cpu_p);
}
cpu_info[i][count] = -1;
}
return OG_SUCCESS;
}
static void set_cpu_mask(void)
{
for (int i = 0; i < g_cpu_group_num; i++) {
cpu_set_t mask;
CPU_ZERO(&mask);
for (int j = 0; j < SMALL_RECORD_SIZE; j++) {
if (g_cpu_info[i][j] >= 0) {
CPU_SET(g_cpu_info[i][j], &mask);
} else {
break;
}
}
g_masks[i] = mask;
}
}
static int init_cpu_info(void)
{
if (init_cpu_mask(g_cpu_info_str, &g_cpu_group_num, g_cpu_info) != 0 || g_cpu_group_num == 0) {
OG_LOG_RUN_ERR("g_cpu_group_num init error, g_cpu_group_num is %d", g_cpu_group_num);
return OG_ERROR;
}
set_cpu_mask();
return OG_SUCCESS;
}
status_t srv_instance_startup(db_startup_phase_t phase, bool32 is_coordinator, bool32 is_datanode, bool32 is_gts)
{
#ifdef WIN32
if (OG_SUCCESS != epoll_init()) {
printf("Failed to initialize epoll");
return OG_ERROR;
}
#endif
if (cm_start_timer(g_timer()) != OG_SUCCESS) {
printf("Aborted due to starting timer thread");
return OG_ERROR;
}
if (srv_init_g_instance() != OG_SUCCESS) {
return OG_ERROR;
}
(void)srv_setup_json_mpool();
(void)srv_set_locale();
(void)srv_init_ip_white();
(void)srv_init_pwd_black();
(void)srv_init_ip_login_addr();
if (srv_get_home() != OG_SUCCESS) {
CM_FREE_PTR(g_instance);
printf("%s\n", "Failed to get home");
return OG_ERROR;
}
if (srv_load_params() != OG_SUCCESS) {
CM_FREE_PTR(g_instance);
int32 error_code;
const char *error_message;
source_location_t error_location;
cm_get_error(&error_code, &error_message, &error_location);
OG_LOG_RUN_ERR("failed to load params");
if (error_code == 0) {
printf("Failed to load params\n");
} else {
printf("Failed to load params:%s\n", error_message);
}
return OG_ERROR;
}
if (init_cpu_info() != OG_SUCCESS) {
srv_instance_destroy();
OG_LOG_RUN_ERR("failed to initialize CPU instance resorces");
return OG_ERROR;
}
(void)cm_lic_init();
srv_print_params();
if (srv_lock_db() != OG_SUCCESS) {
CM_FREE_PTR(g_instance);
OG_LOG_RUN_ERR("Another db is running");
return OG_ERROR;
}
OG_LOG_RUN_INF("starting instance(%s), memory usage(%lu)", startup_mode(phase), cm_print_memory_usage());
printf("starting instance(%s)\n", startup_mode(phase));
if (sql_load_self_func() != OG_SUCCESS) {
CM_FREE_PTR(g_instance);
OG_LOG_RUN_ERR("failed to load self function list");
return OG_ERROR;
}
sql_print_self_func();
if (srv_load_hba(OG_TRUE) != OG_SUCCESS) {
CM_FREE_PTR(g_instance);
OG_LOG_RUN_ERR("failed to load hba");
return OG_ERROR;
}
if (srv_load_pbl(OG_TRUE) != OG_SUCCESS) {
CM_FREE_PTR(g_instance);
OG_LOG_RUN_ERR("failed to load pbl");
return OG_ERROR;
}
#ifndef WIN32
if (sigcap_handle_reg() != OG_SUCCESS) {
CM_FREE_PTR(g_instance);
OG_LOG_RUN_ERR("Failed to initialize SIGSEGV func");
return OG_ERROR;
}
#endif
srv_set_kernel_callback();
srv_reg_os_rinfo();
srv_regist_dynamic_views();
if (srv_create_sga() != OG_SUCCESS) {
CM_FREE_PTR(g_instance);
OG_LOG_RUN_ERR("failed to create sga");
return OG_ERROR;
}
if (sql_instance_startup() != OG_SUCCESS) {
srv_destroy_sga();
CM_FREE_PTR(g_instance);
OG_LOG_RUN_ERR("failed to startup sql instance");
return OG_ERROR;
}
if (sql_load_type_map() != OG_SUCCESS) {
srv_destroy_sga();
CM_FREE_PTR(g_instance);
OG_LOG_RUN_ERR("failed to load type map list");
return OG_ERROR;
}
if (srv_init_session_pool() != OG_SUCCESS) {
srv_destroy_sga();
CM_FREE_PTR(g_instance);
OG_LOG_RUN_ERR("failed to init session pool");
return OG_ERROR;
}
if (srv_kernel_startup(is_coordinator) != OG_SUCCESS) {
knl_shutdown(NULL, &g_instance->kernel, OG_FALSE);
srv_destroy_sga();
srv_destory_reserved_session();
CM_FREE_PTR(g_instance);
OG_LOG_RUN_ERR("failed to startup kernel");
return OG_ERROR;
}
in knl_init_attr */
if (srv_init_par_sessions() != OG_SUCCESS) {
knl_shutdown(NULL, &g_instance->kernel, OG_FALSE);
srv_destroy_sga();
srv_destory_reserved_session();
CM_FREE_PTR(g_instance);
OG_LOG_RUN_ERR("failed to init parallel sessions");
return OG_ERROR;
}
in knl_init_attr */
if (srv_init_emerg_sessions() != OG_SUCCESS) {
knl_shutdown(NULL, &g_instance->kernel, OG_FALSE);
srv_destroy_sga();
srv_destory_reserved_session();
CM_FREE_PTR(g_instance);
OG_LOG_RUN_ERR("failed to init emerg sessions");
return OG_ERROR;
}
srv_init_lob_locator();
if (reactor_create_pool() != OG_SUCCESS) {
knl_shutdown(NULL, &g_instance->kernel, OG_FALSE);
srv_destroy_sga();
srv_destory_reserved_session();
CM_FREE_PTR(g_instance);
OG_LOG_RUN_ERR("failed to create reactor pool");
return OG_ERROR;
}
if (srv_start_lsnr() != OG_SUCCESS) {
knl_shutdown(NULL, &g_instance->kernel, OG_FALSE);
reactor_destroy_pool();
srv_destroy_sga();
srv_destory_reserved_session();
CM_FREE_PTR(g_instance);
OG_LOG_RUN_ERR("failed to start lsnr");
return OG_ERROR;
}
if (srv_sysdba_privilege() != OG_SUCCESS) {
return OG_ERROR;
}
if (srv_init_ssl_communication() != OG_SUCCESS) {
srv_instance_destroy();
OG_LOG_RUN_ERR("failed to init SSL communication");
return OG_ERROR;
}
g_instance->kernel.is_ssl_initialized = OG_TRUE;
if (phase == STARTUP_MOUNT || phase == STARTUP_OPEN) {
if (srv_kernel_open(phase) != OG_SUCCESS) {
srv_instance_destroy();
OG_LOG_RUN_ERR("failed to open kernel");
return OG_ERROR;
}
}
g_instance->lsnr_abort_status = OG_FALSE;
if (cm_regist_signal(SIGUSR1, handle_signal_terminal) != OG_SUCCESS) {
srv_instance_destroy();
OG_LOG_RUN_ERR("failed to initialize SIGUSR1 func");
return OG_ERROR;
}
if (init_job_manager() != OG_SUCCESS) {
srv_instance_destroy();
OG_LOG_RUN_ERR("failed to initialize job manager");
printf("Aborted due to initialize job manager");
return OG_ERROR;
}
#ifndef WIN32
if (DB_IS_RAFT_ENABLED(&(g_instance->kernel))) {
if (sigcap_handle_reg() != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to initialize SIGSEGV func in raft mode");
}
}
if (sigcap_handle_reg() != OG_SUCCESS) {
srv_instance_destroy();
OG_LOG_RUN_ERR("Failed to initialize SIGSEGV func");
return OG_ERROR;
}
#endif
OG_LOG_RUN_INF("instance started, memory usage(%lu)", cm_print_memory_usage());
printf("%s\n", "instance started");
fflush(stdout);
return OG_SUCCESS;
}
void srv_instance_destroy(void)
{
while (1) {
if (g_instance->shutdown_ctx.phase == SHUTDOWN_PHASE_DONE || cm_spin_try_lock(&g_instance->kernel.db.lock)) {
break;
}
cm_sleep(SHUTDOWN_WAIT_INTERVAL);
OG_LOG_RUN_INF("wait for shutdown to complete");
}
if (g_instance->shutdown_ctx.phase == SHUTDOWN_PHASE_DONE) {
return;
}
g_instance->shutdown_ctx.phase = SHUTDOWN_PHASE_INPROGRESS;
g_instance->shutdown_ctx.session = NULL;
g_instance->shutdown_ctx.mode = SHUTDOWN_MODE_ABORT;
if (OG_SUCCESS != srv_wait_agents_done()) {
OG_LOG_RUN_INF("kill canceled, all listener resumed");
reactor_resume_pool();
srv_resume_lsnr(LSNR_TYPE_ALL);
g_instance->shutdown_ctx.phase = SHUTDOWN_PHASE_NOT_BEGIN;
cm_spin_unlock(&g_instance->kernel.db.lock);
return;
}
srv_close_threads(OG_TRUE);
srv_deinit_resource();
g_instance->shutdown_ctx.phase = SHUTDOWN_PHASE_DONE;
CM_FREE_PTR(g_instance);
}
status_t srv_stop_all_session(shutdown_context_t *ogx)
{
knl_session_t *session = g_instance->kernel.sessions[SESSION_ID_KERNEL];
switch (ogx->mode) {
case SHUTDOWN_MODE_IMMEDIATE:
if (session->kernel->db.status == DB_STATUS_RECOVERY) {
OG_THROW_ERROR(ERR_RESTORE_IN_PROGRESS);
return OG_ERROR;
}
srv_kill_all_session(ogx->session, OG_FALSE);
case SHUTDOWN_MODE_NORMAL:
if (session->kernel->db.status == DB_STATUS_RECOVERY) {
OG_THROW_ERROR(ERR_RESTORE_IN_PROGRESS);
return OG_ERROR;
}
if (session->kernel->lrcv_ctx.session != NULL) {
lrcv_close(session);
}
return srv_wait_all_session_be_killed(ogx->session);
case SHUTDOWN_MODE_ABORT:
srv_kill_all_session(ogx->session, OG_TRUE);
return srv_wait_all_session_be_killed(ogx->session);
default:
OG_THROW_ERROR(ERR_INVALID_OPERATION, "shutdown signal");
return OG_ERROR;
}
}
static status_t srv_wait_agents_done(void)
{
shutdown_context_t *ogx = &g_instance->shutdown_ctx;
OG_LOG_RUN_INF("begin to shutdown, mode %s", g_shutdown_mode_desc[ogx->mode]);
OG_LOG_RUN_INF("begin to pause all listener");
srv_pause_lsnr(LSNR_TYPE_ALL);
if (ogx->mode != SHUTDOWN_MODE_NORMAL) {
OG_LOG_RUN_INF("begin to pause reactor");
reactor_pause_pool();
}
if (ogx->session != NULL) {
OG_LOG_RUN_INF("begin to stop all session");
if (OG_SUCCESS != srv_stop_all_session(ogx)) {
OG_LOG_RUN_ERR("stop all session failed");
return OG_ERROR;
}
} else {
OG_LOG_RUN_INF("begin to stop all session without self session");
srv_kill_all_session(NULL, OG_FALSE);
srv_wait_all_session_free();
}
OG_LOG_RUN_INF("wait all agents done ended");
return OG_SUCCESS;
}
static void srv_close_threads(bool32 knl_flag)
{
shutdown_context_t *ogx = &g_instance->shutdown_ctx;
bool32 need_ckpt = OG_FALSE;
OG_LOG_RUN_INF("begin to stop all listener");
srv_stop_lsnr(LSNR_TYPE_ALL);
OG_LOG_RUN_INF("begin to stop reactor");
reactor_destroy_pool();
srv_close_emerg_agents();
OG_LOG_RUN_INF("begin to stop resource manager");
rsrc_stop_manager(&g_instance->rsrc_mgr);
if (knl_flag == OG_TRUE) {
need_ckpt = ogx->mode > SHUTDOWN_MODE_SIGNAL ? OG_FALSE : OG_TRUE;
OG_LOG_RUN_INF("begin to stop kernel");
knl_shutdown(NULL, &g_instance->kernel, need_ckpt);
}
}
static void srv_deinit_resource()
{
if (g_instance->ssl_acceptor_fd != NULL) {
OG_LOG_RUN_INF("begin to free ssl acceptor fd.");
cs_ssl_free_context(g_instance->ssl_acceptor_fd);
g_instance->ssl_acceptor_fd = NULL;
}
OG_LOG_RUN_INF("begin to destory reserved session.");
srv_destory_reserved_session();
OG_LOG_RUN_INF("begin to destory user session.");
srv_destory_session();
OG_LOG_RUN_INF("begin to destory sequence pool.");
OG_LOG_RUN_INF("begin to free memory occupied by SGA.");
srv_destroy_sga();
OG_LOG_RUN_INF("begin to free configuration buffer.");
cm_free_config_buf(&g_instance->config);
OG_LOG_RUN_INF("begin to free null row.");
sql_free_null_row();
OG_LOG_RUN_INF("begin to free ctrl buffer.");
cm_aligned_free(&g_instance->kernel.db.ctrl.buf);
OG_LOG_RUN_INF("finish to shutdown, mode %s, db lock %s", g_shutdown_mode_desc[g_instance->shutdown_ctx.mode],
g_instance->kernel.db.lock ? "true" : "false");
sql_auditlog_deinit(&(cm_log_param_instance()->audit_param));
}
status_t srv_shutdown_wait(session_t *session, shutdown_mode_t mode, shutdown_context_t *ogx)
{
knl_session_t *knl_session = g_instance->kernel.sessions[SESSION_ID_KERNEL];
bool32 is_prohibited = OG_TRUE;
if (!cm_spin_try_lock(&g_instance->kernel.db.lock)) {
OG_THROW_ERROR(ERR_SHUTDOWN_IN_PROGRESS, session->knl_session.id);
return OG_ERROR;
}
if (DB_IS_CLUSTER(&(session->knl_session)) && !DB_CLUSTER_NO_CMS) {
if (!rc_reform_trigger_disable()) {
cm_spin_unlock(&g_instance->kernel.db.lock);
OG_THROW_ERROR(ERR_SHUTDOWN_IN_PROGRESS, session->knl_session.id);
return OG_ERROR;
}
}
do {
cm_spin_lock(&g_instance->kernel.rcy_ctx.lock, NULL);
if (knl_session->kernel->rcy_ctx.is_working) {
g_instance->kernel.rcy_ctx.is_closing = OG_TRUE;
cm_spin_unlock(&g_instance->kernel.rcy_ctx.lock);
is_prohibited = OG_FALSE;
break;
}
cm_spin_unlock(&g_instance->kernel.rcy_ctx.lock);
if (g_instance->shutdown_ctx.enabled) {
is_prohibited = OG_FALSE;
}
} while (0);
if (is_prohibited) {
cm_spin_unlock(&g_instance->kernel.db.lock);
rc_reform_trigger_enable();
OG_THROW_ERROR(ERR_SHUTDOWN_IN_PROGRESS, session->knl_session.id);
return OG_ERROR;
}
step 1: wait other agents process done
step 2: release sql/sharding/kernel threads
step 3: free memory resource, for sga
step 2, step 3 should be processed at main thread */
ogx->phase = SHUTDOWN_PHASE_INPROGRESS;
ogx->session = session;
ogx->mode = mode;
if (OG_SUCCESS != srv_wait_agents_done()) {
OG_LOG_RUN_INF("shutdown canceled, all listener resumed");
reactor_resume_pool();
srv_resume_lsnr(LSNR_TYPE_ALL);
ogx->phase = SHUTDOWN_PHASE_NOT_BEGIN;
cm_spin_unlock(&g_instance->kernel.db.lock);
rc_reform_trigger_enable();
return OG_ERROR;
}
return OG_SUCCESS;
}
status_t srv_shutdown(session_t *session, shutdown_mode_t mode)
{
shutdown_context_t *ogx = &g_instance->shutdown_ctx;
bool32 need_ckpt;
SYNC_POINT(session, "SP_B1_SHUTDOWN");
if (session->knl_session.rm->txn != NULL) {
if (session->knl_session.rm->txn->status != (uint8)XACT_END) {
OG_THROW_ERROR(ERR_SHUTDOWN_IN_TRANS);
return OG_ERROR;
}
}
if (DB_IS_CLUSTER(&(session->knl_session)) && !DB_CLUSTER_NO_CMS) {
if (!dls_latch_timed_x(&(session->knl_session), &(ogx->shutdown_latch), 1, NULL, OG_INVALID_ID32)) {
OG_LOG_RUN_WAR("sql get shutdown lock failed, other node in shutdown progress");
OG_THROW_ERROR(ERR_SHUTDOWN_IN_PROGRESS, session->knl_session.id);
return OG_ERROR;
}
OG_LOG_RUN_INF("sql get shutdown lock, execute shutdown");
}
if (srv_shutdown_wait(session, mode, ogx) != OG_SUCCESS) {
if (DB_IS_CLUSTER(&(session->knl_session))) {
dls_unlatch(&(session->knl_session), &(ogx->shutdown_latch), NULL);
}
return OG_ERROR;
}
need_ckpt = ogx->mode > SHUTDOWN_MODE_SIGNAL ? OG_FALSE : OG_TRUE;
OG_LOG_RUN_INF("begin to stop kernel");
knl_shutdown(NULL, &g_instance->kernel, need_ckpt);
CM_MFENCE;
ogx->phase = SHUTDOWN_PHASE_DONE;
OG_LOG_RUN_INF("end of stop kernel");
return OG_SUCCESS;
}
void srv_instance_abort()
{
shutdown_context_t *ogx = &g_instance->shutdown_ctx;
if (!cm_spin_try_lock(&g_instance->kernel.db.lock)) {
return;
}
if (g_instance->shutdown_ctx.phase != SHUTDOWN_PHASE_NOT_BEGIN) {
return;
}
ogx->session = NULL;
ogx->phase = SHUTDOWN_PHASE_INPROGRESS;
ogx->mode = SHUTDOWN_MODE_SIGNAL;
(void)srv_wait_agents_done();
srv_close_threads(OG_TRUE);
srv_deinit_resource();
ogx->phase = SHUTDOWN_PHASE_DONE;
}
bool32 srv_is_kernel_reserve_session(session_type_e type)
{
if (type == SESSION_TYPE_KERNEL_RESERVE) {
return OG_TRUE;
}
return OG_FALSE;
}
#ifdef __cplusplus
}
#endif