* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* cm_dbs_ulog.c
*
*
* IDENTIFICATION
* src/common/cm_dbs_ulog.c
*
* -------------------------------------------------------------------------
*/
#include "cm_dbs_module.h"
#include "cm_dbs_ulog.h"
#include <stdint.h>
#include <stdlib.h>
#include "cm_error.h"
#include "cm_dbs_map.h"
#include "cm_dbs_ctrl.h"
#include "cm_debug.h"
#include "cm_io_record.h"
#include "cm_dbstor.h"
#define CM_DBS_ULOG_FRAGMENT_RESERVATION SIZE_M(256)
#define CM_DBS_ULOG_UNSUPPORTED(op) OG_LOG_RUN_WAR("Unsupported operation(%s) for dbstor object(ulog).", op)
#define CM_DBS_ULOG_HEAD_SIZE 512
bool32 g_ulog_recycled = OG_FALSE;
uint64 g_recycle_retry_num = 0;
#define RECYCLE_FORCE_RETRY_MAX_NUM 10
int64 cm_dbs_ulog_seek(int32 handle, int64 offset, int32 origin)
{
CM_DBS_ULOG_UNSUPPORTED("seek");
return -1;
}
status_t cm_dbs_ulog_create(const char *name, int64 size, uint32 flags, int32 *handle)
{
int32 ret;
cm_dbs_map_item_s obj = { 0 };
UlogAttr attr = { 0 };
(void)flags;
ret = cm_dbs_get_ns_name(DEV_TYPE_ULOG, &attr.nsName);
if (ret == OG_ERROR) {
OG_LOG_RUN_ERR("Failed to get ulog ns id");
return OG_ERROR;
}
attr.appMode = ULOG_APP_WITH_LSN_MODE;
ret = dbs_global_handle()->create_ulog((char *)name, &attr, &obj.obj_id);
if (ret != 0) {
OG_LOG_RUN_ERR("Failed to create ulog %s", name);
return OG_ERROR;
}
obj.ns_name = attr.nsName;
obj.ulog.curr_lsn = 0;
obj.ulog.trun_lsn = 0;
status_t stat = cm_dbs_map_set(name, &obj, handle, DEV_TYPE_ULOG);
if (stat != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to insert ulog into map");
return OG_ERROR;
}
OG_LOG_DEBUG_INF("Create ulog(%s) success(%d).", name, *handle);
return OG_SUCCESS;
}
status_t cm_dbs_ulog_destroy(const char *name)
{
int32 ret;
UlogAttr attr = { 0 };
ret = cm_dbs_get_ns_name(DEV_TYPE_ULOG, &attr.nsName);
if (ret == OG_ERROR) {
OG_LOG_RUN_ERR("Failed to get ulog ns id");
return OG_ERROR;
}
ret = dbs_global_handle()->destroy_ulog((char *)name, &attr);
if (ret != 0) {
OG_LOG_RUN_ERR("Failed to destroy ulog %s", name);
return OG_ERROR;
}
return OG_SUCCESS;
}
status_t cm_dbs_ulog_open(const char *name, int32 *handle, uint8 is_retry)
{
int32 ret;
cm_dbs_map_item_s obj = { 0 };
UlogAttr attr = { 0 };
ret = cm_dbs_get_ns_name(DEV_TYPE_ULOG, &attr.nsName);
if (ret == OG_ERROR) {
OG_LOG_RUN_ERR("Failed to get ulog ns id");
return OG_ERROR;
}
attr.appMode = ULOG_APP_WITH_LSN_MODE;
attr.isRetry = is_retry;
ret = dbs_global_handle()->open_ulog((char *)name, &attr, &obj.obj_id);
if (ret != 0) {
OG_LOG_RUN_ERR("Failed to open ulog");
return OG_ERROR;
}
obj.ns_name = attr.nsName;
obj.ulog.curr_lsn = attr.meta.ulogLsn.serverLsn;
obj.ulog.trun_lsn = attr.meta.ulogLsn.truncateLsn;
status_t stat = cm_dbs_map_set(name, &obj, handle, DEV_TYPE_ULOG);
if (stat != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to insert ulog into map");
return OG_ERROR;
}
OG_LOG_DEBUG_INF("Open ulog(%s, %lu, %lu) success(%d).", name, obj.ulog.trun_lsn, obj.ulog.curr_lsn, *handle);
return OG_SUCCESS;
}
void cm_dbs_ulog_close(int32 handle)
{
if (handle == -1) {
return;
}
cm_dbs_map_remove(handle);
OG_LOG_DEBUG_INF("Close ulog at handle(%d) successfully.", handle);
}
static void cm_dbs_ulog_readpartopt_init(ReadBatchLogOption *option, char* nsName, uint32_t partId,
uint64 startLsn, uint64 endLsn, uint32_t buffer_size)
{
LogLsn lsn = {0};
lsn.startLsn = startLsn;
lsn.endLsn = endLsn;
option->session.nsName = nsName;
option->opcode = ULOG_OP_READ_WITH_LSN;
option->view = ULOG_VIEW_ONLINE;
option->partId = partId;
option->callBack.ogx = NULL;
option->callBack.callback = NULL;
option->length = buffer_size;
option->lsn = lsn;
return;
}
status_t cm_dbs_get_used_cap(int32 handle, uint64_t startLsn, uint32_t *sizeKb, uint8 is_retry)
{
status_t ret;
cm_dbs_map_item_s obj = { 0 };
UlogAttr attr = { 0 };
LogLsn lsn = { 0 };
if (cm_dbs_map_get(handle, &obj) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to find ulog by handle(%d).", handle);
return OG_ERROR;
}
ret = cm_dbs_get_ns_name(DEV_TYPE_ULOG, &attr.nsName);
if (ret == OG_ERROR) {
OG_LOG_RUN_ERR("Failed to get ulog ns id");
return OG_ERROR;
}
attr.appMode = ULOG_APP_WITH_LSN_MODE;
attr.isRetry = is_retry;
lsn.startLsn = startLsn;
ret = dbs_global_handle()->get_ulog_used_cap(&obj.obj_id, &attr, lsn, ULOG_VIEW_ONLINE, sizeKb);
if (ret != 0) {
OG_LOG_RUN_ERR("Failed to get GetUlogUsedCap");
return OG_ERROR;
}
return OG_SUCCESS;
}
status_t cm_dbs_ulog_capacity(int64 *capacity)
{
uint64_t ulog_cap = 0;
int32_t ret = dbs_global_handle()->get_ulog_init_capacity(&ulog_cap);
if (ret != 0) {
OG_LOG_RUN_ERR("Failed(%d) to get ulog init capacity from dbstor.", ret);
return OG_ERROR;
}
if (ulog_cap <= CM_DBS_ULOG_FRAGMENT_RESERVATION) {
OG_LOG_RUN_ERR("The capacity(%lu) of ULOG defined by DBStor is too small.", ulog_cap);
return OG_ERROR;
}
*capacity = (int64)(ulog_cap - CM_DBS_ULOG_FRAGMENT_RESERVATION);
return OG_SUCCESS;
}
int32 cm_dbs_ulog_align_size(int32 space_size)
{
uint32 tmp_size = space_size + CM_DBS_ULOG_HEAD_SIZE;
const uint32 round_size = SIZE_K(8);
return (tmp_size + (round_size - 1)) & (~(round_size - 1));
}
static inline status_t cms_check_part_list(LogPartitionList *partList, uint32_t num)
{
if (partList->num > num) {
return OG_ERROR;
}
return OG_SUCCESS;
}
status_t cm_dbs_ulog_batch_read(int32 handle, uint64 startLsn, uint64 endLsn, void *buf,
int32 size, int32 *r_size, uint64 *outLsn)
{
uint32_t partId;
cm_dbs_map_item_s obj;
ReadBatchLogOption option = {0};
LogRecord logRecord;
ReadResult result;
char* nsName = NULL;
LogRecordList recordList = {0};
*r_size = 0;
if (cm_dbs_map_get(handle, &obj) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to find ulog by handle(%d).", handle);
return OG_ERROR;
}
int32 ret = cm_dbs_get_ns_name(DEV_TYPE_ULOG, &nsName);
if (ret == OG_ERROR) {
OG_LOG_RUN_ERR("Failed to get ulog ns id");
return OG_ERROR;
}
OG_LOG_DEBUG_INF("read ulog in start %llu in end %llu ", startLsn, endLsn);
partId = OG_INVALID_ID32;
cm_dbs_ulog_readpartopt_init(&option, obj.ns_name, partId, startLsn, endLsn, size);
logRecord.type = DBS_DATA_FORMAT_BUFFER;
logRecord.buf.buf = (char*)buf;
logRecord.buf.len = size;
logRecord.next = NULL;
recordList.cnt = 1;
recordList.recordList = &logRecord;
ret = dbs_global_handle()->read_ulog_record_list(&obj.obj_id, &option, &recordList, &result);
if (ret != 0 || result.result != 0) {
if (result.result == ULOG_READ_RETURN_REACH_MAX_BUF_LEN) {
*r_size = result.outLen;
if (result.outLen == 0) {
OG_LOG_RUN_ERR("[DBS] read ulog fail, need extend large.");
return OG_ERROR;
}
*outLsn = result.endLsn;
return OG_SUCCESS;
}
if (ret == ULOG_READ_RETURN_LSN_NOT_EXIST || result.result == ULOG_READ_RETURN_LSN_NOT_EXIST) {
OG_LOG_DEBUG_INF("read ulog fail, lsn not exist.");
*r_size = 0;
return OG_SUCCESS;
}
OG_LOG_RUN_ERR("[DBS] read ulog fail.");
return OG_ERROR;
}
*r_size = result.outLen;
*outLsn = result.endLsn;
return OG_SUCCESS;
}
static void cm_dbs_set_log_recycled()
{
g_ulog_recycled = OG_TRUE;
OG_LOG_RUN_ERR("ulog has been recycled, wait for sync log");
for (;;) {
cm_sleep(OG_INVALID_ID32);
}
return;
}
bool32 cm_dbs_log_recycled()
{
return g_ulog_recycled;
}
status_t cm_dbs_ulog_read(int32 handle, int64 startLsn, void *buf, int32 size, int32 *r_size)
{
int32 ret;
uint64_t tv_begin;
cm_dbs_map_item_s obj = { 0 };
ReadBatchLogOption option;
LogRecord logRecord;
ReadResult result = {0};
uint32_t partId;
LogRecordList recordList = {0};
if (cm_dbs_map_get(handle, &obj) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to find ulog by handle(%d).", handle);
return OG_ERROR;
}
partId = OG_INVALID_ID32;
cm_dbs_ulog_readpartopt_init(&option, obj.ns_name, partId, startLsn, OG_INVALID_ID64, size);
logRecord.type = DBS_DATA_FORMAT_BUFFER;
logRecord.buf.buf = (char*)buf;
logRecord.buf.len = size;
logRecord.next = NULL;
recordList.cnt = 1;
recordList.recordList = &logRecord;
oGRAC_record_io_stat_begin(IO_RECORD_EVENT_NS_BATCH_READ_ULOG, &tv_begin);
ret = dbs_global_handle()->read_ulog_record_list(&obj.obj_id, &option, &recordList, &result);
if (ret != OG_SUCCESS || result.result != OG_SUCCESS) {
if (ret == ULOG_READ_RETURN_LSN_NOT_EXIST) {
OG_LOG_DEBUG_WAR("LSN(%llu) not found.", startLsn);
ret = OG_SUCCESS;
} else if (ret == ULOG_READ_RETURN_REACH_MAX_BUF_LEN) {
OG_LOG_DEBUG_WAR("The buffer capacity is insufficient for LSN(%llu).", startLsn);
ret = OG_SUCCESS;
} else if (ret == ULOG_READ_RETURN_LSN_NOT_EXIST_SMALL) {
OG_LOG_RUN_ERR("LSN(%llu) not found, redo logs have been recycled", startLsn);
cm_dbs_set_log_recycled();
ret = OG_ERROR;
} else {
OG_LOG_RUN_ERR("Failed to read ulog ret:%u", ret);
ret = OG_ERROR;
}
}
oGRAC_record_io_stat_end(IO_RECORD_EVENT_NS_BATCH_READ_ULOG, &tv_begin);
*r_size = result.outLen;
return ret;
}
static void cm_dbs_ulog_writeopt_init(AppendOption *option, char* nsName, uint64 preLsn, uint64 startLsn)
{
option->opcode = ULOG_OP_APPEND_WITH_LSN;
option->lsn.preLsn = preLsn;
option->lsn.startLsn = startLsn;
option->lsn.endLsn = startLsn;
option->callBack.ogx = NULL;
option->callBack.callback = NULL;
MEMS_RETVOID_IFERR(memset_s(&option->session, sizeof(SessionId), 0, sizeof(SessionId)));
option->session.nsName = nsName;
return;
}
status_t cm_dbs_ulog_write(int32 handle, int64 lsn, const void *buf, int32 size, uint64 *free_size)
{
int32 ret;
cm_dbs_map_item_s obj = { 0 };
AppendOption option = { 0 };
LogRecord logRecord = { 0 };
AppendResult result = { 0 };
if (cm_dbs_map_get(handle, &obj) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to find ulog by handle(%d).", handle);
return OG_ERROR;
}
cm_dbs_ulog_writeopt_init(&option, obj.ns_name, obj.ulog.curr_lsn, (uint64)lsn);
logRecord.type = DBS_DATA_FORMAT_BUFFER;
logRecord.buf.buf = (char *)buf;
logRecord.buf.len = size;
ret = dbs_global_handle()->append_ulog_record(&obj.obj_id, &option, &logRecord, &result);
if (ret != 0 || result.result != 0) {
OG_LOG_RUN_ERR("Failed(%d,%d) to write ulog(%d) with lsn(%lld).", ret, result.result, handle, lsn);
return OG_ERROR;
}
CM_ABORT(lsn == result.serverLsn, "The expected LSN(%lld) is not equal to the actual LSN(%lu).", lsn,
result.serverLsn);
obj.ulog.curr_lsn = result.serverLsn;
cm_dbs_map_update(handle, &obj);
if (free_size != NULL) {
*free_size = result.freeSize;
}
OG_LOG_DEBUG_INF("[CMDBS]Append ulog(%d) size(%d,%lu) with lsn(%lld) successfully.", handle, size, result.freeSize,
lsn);
return OG_SUCCESS;
}
uint64 cm_dbs_ulog_recycle(int32 handle, uint64 lsn)
{
cm_dbs_map_item_s obj = { 0 };
if (cm_dbs_map_get(handle, &obj) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to find ulog by handle(%d).", handle);
return 0;
}
if (obj.ulog.trun_lsn >= lsn) {
g_recycle_retry_num++;
if (g_recycle_retry_num < RECYCLE_FORCE_RETRY_MAX_NUM) {
return 0;
}
}
g_recycle_retry_num = 0;
TruncLogOption option = { 0 };
TruncResult result = { 0 };
option.opcode = ULOG_OP_TRUNCATE_WITH_LSN;
option.lsn = lsn;
option.session.nsName = obj.ns_name;
int32 ret = dbs_global_handle()->truncate_ulog(&obj.obj_id, &option, &result);
if (ret != 0) {
OG_LOG_RUN_ERR_LIMIT(LOG_PRINT_INTERVAL_SECOND_20, "Failed(%d) to truncate ulog by lsn(%llu).", ret, lsn);
return 0;
}
obj.ulog.trun_lsn = lsn;
cm_dbs_map_update(handle, &obj);
OG_LOG_DEBUG_INF("[CMDBS]Recycle ulog(%d) size(%lu) with lsn(%llu) successfully.", handle, result.freeSize, lsn);
return result.freeSize;
}
status_t cm_dbs_ulog_get_maxLsn(const char *name, uint64 *lsn)
{
int32 ret;
UlogId ulog_id;
UlogAttr attr = { 0 };
ret = cm_dbs_get_ns_name(DEV_TYPE_ULOG, &attr.nsName);
if (ret == OG_ERROR) {
OG_LOG_RUN_ERR("Failed to get ulog ns id");
return OG_ERROR;
}
attr.appMode = ULOG_APP_WITH_LSN_MODE;
ret = dbs_global_handle()->open_ulog((char *)name, &attr, &ulog_id);
if (ret != 0) {
OG_LOG_RUN_ERR("Failed to open ulog");
return OG_ERROR;
}
*lsn = attr.meta.ulogLsn.serverLsn;
return OG_SUCCESS;
}
bool32 cm_dbs_ulog_is_lsn_valid(int32 handle, uint64 lsn)
{
cm_dbs_map_item_s obj = { 0 };
if (cm_dbs_map_get(handle, &obj) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to find ulog by handle(%d).", handle);
return OG_FALSE;
}
return (lsn > obj.ulog.trun_lsn && lsn <= obj.ulog.curr_lsn) ? OG_TRUE : OG_FALSE;
}