* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* cm_dbs_pgpool.c
*
*
* IDENTIFICATION
* src/common/cm_dbs_pgpool.c
*
* -------------------------------------------------------------------------
*/
#include "cm_dbs_module.h"
#include "cm_dbs_pgpool.h"
#include <stdint.h>
#include <fcntl.h>
#include <semaphore.h>
#include "cm_spinlock.h"
#include "cm_log.h"
#include "cm_error.h"
#include "cm_dbs_map.h"
#include "cm_dbs_ctrl.h"
#include "cm_dbs_intf.h"
#include "cm_dbstor.h"
#define DBS_DEFAULT_POOL_SIZE (1 << 20)
#define DBS_PAGE_POOL_PART_SIZE SIZE_M(64)
int64 cm_dbs_pg_seek(int32 handle, int64 offset, int32 origin)
{
if (origin != SEEK_END) {
OG_LOG_RUN_ERR("Unsupported seek type(%d).", origin);
return -1;
}
PagePoolAttr attr = { 0 };
cm_dbs_map_item_s obj = { 0 };
if (cm_dbs_map_get(handle, &obj) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to find page pool by index %d", handle);
return -1;
}
int32 ret = strcpy_sp(attr.nsName, sizeof(attr.nsName), obj.ns_name);
if (ret != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to copy ns name");
return OG_ERROR;
}
uint64_t capacity = 0;
ret = dbs_global_handle()->get_pagepool_logic_capacity(&obj.obj_id, &attr, &capacity);
if (ret != 0) {
OG_LOG_RUN_ERR("Failed(%d) to get pagepool logic capacity.", ret);
return -1;
}
return (int64)capacity;
}
status_t cm_dbs_pg_create(const char *name, int64 size, uint32 flags, int32 *handle)
{
int32 ret;
uint32 pageSize;
uint64 pgPoolSize = (uint64)size;
if (pgPoolSize == 0) {
pgPoolSize = DBS_DEFAULT_POOL_SIZE;
}
cm_dbs_map_item_s obj = { 0 };
PagePoolAttr attr = { 0 };
cm_dbs_cfg_s *cfg = cm_dbs_get_cfg();
if (flags == 0xFFFFFFFF) {
pageSize = cfg->ctrlFilePgSize;
} else {
pageSize = cfg->dataFilePgSize;
}
attr.pagePoolPartNum = cfg->partition_num;
attr.pageSize = pageSize;
attr.initSize = pgPoolSize;
char *nsName = NULL;
ret = cm_dbs_get_ns_name(DEV_TYPE_PGPOOL, &nsName);
if (ret != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to get ns id");
return OG_ERROR;
}
ret = strcpy_sp(attr.nsName, sizeof(attr.nsName), nsName);
if (SECUREC_UNLIKELY(ret != EOK)) {
OG_THROW_ERROR(ERR_SYSTEM_CALL, ret);
return OG_ERROR;
}
if (ret != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to copy ns name");
return OG_ERROR;
}
ret = dbs_global_handle()->create_pagepool((char *)name, &attr, &obj.obj_id);
if (ret != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to create page pool %s", name);
return OG_ERROR;
}
ret = dbs_global_handle()->open_pagepool((char *)name, &attr, &obj.obj_id);
if (ret != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to open page pool %s", name);
return OG_ERROR;
}
obj.pagepool.page_size = pageSize;
obj.ns_name = nsName;
ret = cm_dbs_map_set(name, &obj, handle, DEV_TYPE_PGPOOL);
if (ret != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to insert page pool to map");
return OG_ERROR;
}
OG_LOG_DEBUG_INF("Create page pool %s, page size %u, pool size %llu success", name, pageSize, pgPoolSize);
return OG_SUCCESS;
}
status_t cm_dbs_pg_destroy(const char *name)
{
PagePoolAttr attr = { 0 };
char *nsName = NULL;
int32 ret = cm_dbs_get_ns_name(DEV_TYPE_PGPOOL, &nsName);
if (ret != 0) {
OG_LOG_RUN_ERR("Failed to get ns id");
return OG_ERROR;
}
ret = strcpy_sp(attr.nsName, sizeof(attr.nsName), nsName);
if (SECUREC_UNLIKELY(ret != EOK)) {
OG_THROW_ERROR(ERR_SYSTEM_CALL, ret);
return OG_ERROR;
}
if (ret != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to copy ns name");
return OG_ERROR;
}
ret = dbs_global_handle()->destroy_pagepool((char *)name, &attr);
if (ret != 0) {
OG_LOG_RUN_ERR("Failed to detroy page pool");
return OG_ERROR;
}
return OG_SUCCESS;
}
status_t cm_dbs_pg_open(const char *name, int32 *handle)
{
int32 ret;
cm_dbs_map_item_s obj = { 0 };
PagePoolAttr attr = { 0 };
char *nsName = NULL;
ret = cm_dbs_get_ns_name(DEV_TYPE_PGPOOL, &nsName);
if (ret == OG_ERROR) {
OG_LOG_DEBUG_ERR("Failed to Get ns id");
return OG_ERROR;
}
ret = strcpy_sp(attr.nsName, sizeof(attr.nsName), nsName);
if (SECUREC_UNLIKELY(ret != EOK)) {
OG_THROW_ERROR(ERR_SYSTEM_CALL, ret);
return OG_ERROR;
}
if (ret != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to copy ns name");
return OG_ERROR;
}
ret = dbs_global_handle()->open_pagepool((char *)name, &attr, &obj.obj_id);
if (ret != 0) {
OG_LOG_DEBUG_ERR("Failed to open page pool name=%s", name);
return OG_ERROR;
}
obj.pagepool.page_size = attr.pageSize;
obj.ns_name = nsName;
status_t stat = cm_dbs_map_set(name, &obj, handle, DEV_TYPE_PGPOOL);
if (stat != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to insert page pool to map");
return OG_ERROR;
}
OG_LOG_DEBUG_INF("Open page pool(%s, %u) at handle(%d) successfully.", name, attr.pageSize, *handle);
return OG_SUCCESS;
}
void cm_dbs_pg_close(int32 handle)
{
cm_dbs_map_item_s obj = { 0 };
if (handle == -1) {
return;
}
status_t stat = cm_dbs_map_get(handle, &obj);
if (stat != OG_SUCCESS) {
OG_LOG_RUN_WAR("Failed to get pagepool object from cache by handle(%d).", handle);
return;
}
cm_dbs_map_remove(handle);
int32 ret = dbs_global_handle()->close_pagepool(&obj.obj_id);
if (ret != 0) {
OG_LOG_RUN_WAR("Failed(%d) to close pagepool from dbstor.", ret);
return;
}
OG_LOG_DEBUG_INF("Close pagepool at handle(%d) successfully.", handle);
}
static void cm_dbs_pg_init_opt(DbsPageOption *pgOpt, int32 pageSize, uint32_t opcode)
{
pgOpt->priority = 0;
pgOpt->opcode = opcode;
pgOpt->offset = 0;
pgOpt->length = pageSize;
pgOpt->lsn = 1;
pgOpt->callBack.cb = NULL;
pgOpt->callBack.ogx = NULL;
(void)memset_s(&pgOpt->session, sizeof(SessionId), 0, sizeof(SessionId));
return;
}
status_t cm_dbs_pg_read(int32 handle, int64 offset, void *buf, int32 size, int32 *read_size)
{
int32 ret;
int32 pageSize;
cm_dbs_map_item_s obj = { 0 };
DbsPageOption pgOpt;
PageValue pgValue;
DbsPageId startPageId;
uint32_t num;
if (cm_dbs_map_get(handle, &obj) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to find page pool by index %d", handle);
return OG_ERROR;
}
pageSize = obj.pagepool.page_size;
if ((offset % pageSize) != 0 || (size % pageSize) != 0) {
OG_LOG_RUN_ERR("Param is exception, offset %llu size %u", offset, size);
return OG_ERROR;
}
pgValue.buf.len = size;
pgValue.type = DBS_DATA_FORMAT_BUFFER;
cm_dbs_pg_init_opt(&pgOpt, pageSize, CS_PAGE_POOL_READ);
startPageId = offset / pageSize;
num = (size + pageSize - 1) / pageSize;
pgValue.buf.buf = (char *)buf;
ret = dbs_global_handle()->dbs_mget_page(&obj.obj_id, startPageId, num, &pgOpt, &pgValue);
if (ret != 0) {
char name[CSS_MAX_NAME_LEN + 1];
cm_dbs_map_get_name(handle, name, sizeof(name));
OG_LOG_RUN_ERR("Read page %s fail, offset:0x%llx", name, offset);
return OG_ERROR;
}
*read_size = size;
return OG_SUCCESS;
}
static void cm_dbs_print_err_info(int32 handle, uint32_t opcode, int64 offset, DbsPageId pgId, int32 size, int32 ret)
{
char name[CSS_MAX_NAME_LEN + 1] = { 0 };
cm_dbs_map_get_name(handle, name, sizeof(name));
if (opcode == CS_PAGE_POOL_WRITE) {
OG_LOG_RUN_ERR("Mput page %s fail, offset:0x%llx, pgId %lu, size %d, ret %d", name, offset, pgId, size, ret);
} else {
OG_LOG_RUN_ERR("Write page %s fail, offset:0x%llx, pgId %lu, size %d, ret %d", name, offset, pgId, size, ret);
}
}
static status_t cm_dbs_pg_comm_write(int32 handle, int64 offset, const void *buf, int32 size, uint32_t opcode,
uint32 *partid)
{
int32 ret;
uint32 pageSize;
cm_dbs_map_item_s obj = { 0 };
DbsPageOption pgOpt;
PageValue pgValue;
DbsPageId pgId;
int32 x = 0;
if (cm_dbs_map_get(handle, &obj) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to find page pool by index %d", handle);
return OG_ERROR;
}
if (obj.pagepool.page_size == 0) {
OG_LOG_RUN_ERR("Page size is invalid, page pool by index %d", handle);
return OG_ERROR;
}
pageSize = obj.pagepool.page_size;
if (((offset % pageSize) != 0) || ((size % pageSize) != 0)) {
OG_LOG_RUN_ERR("Offset %lld or size %d is invalid, page pool by index %d", offset, size, handle);
return OG_ERROR;
}
pgValue.type = DBS_DATA_FORMAT_BUFFER;
if (opcode == CS_PAGE_POOL_WRITE) {
pgValue.buf.len = size;
pgValue.buf.buf = (char *)buf;
cm_dbs_pg_init_opt(&pgOpt, size, opcode);
pgId = offset / pageSize;
ret = dbs_global_handle()->dbs_mput_continue_pages(&obj.obj_id, pgId, (size / pageSize), &pgOpt, &pgValue);
} else if (opcode == CS_PAGE_POOL_ASYNC_WRITE) {
pgValue.buf.len = pageSize;
cm_dbs_pg_init_opt(&pgOpt, pageSize, opcode);
for (x = 0; x < size; x += pageSize) {
pgId = (offset + x) / pageSize;
pgValue.buf.buf = (char *)buf + x;
ret = dbs_global_handle()->dbs_put_page_async(&obj.obj_id, pgId, &pgOpt, &pgValue, *partid);
if (ret != 0) {
break;
}
}
} else {
OG_LOG_RUN_ERR("Opcode %u is invalid", opcode);
return OG_ERROR;
}
if (ret != 0) {
cm_dbs_print_err_info(handle, opcode, offset, ((offset + x) / pageSize), size, ret);
return OG_ERROR;
}
return OG_SUCCESS;
}
status_t cm_dbs_pg_cal_part_id(uint64 pgid, uint32 pageSize, uint32 *partid)
{
cm_dbs_cfg_s *cfg = cm_dbs_get_cfg();
if (cfg->partition_num <= 0) {
OG_LOG_RUN_ERR("The part capacity(%u) should not smaller than 1.", cfg->partition_num);
return OG_ERROR;
}
*partid = ((pgid * pageSize) / DBS_PAGE_POOL_PART_SIZE) % cfg->partition_num;
return OG_SUCCESS;
}
status_t cm_dbs_pg_write(int32 handle, int64 offset, const void *buf, int32 size)
{
return cm_dbs_pg_comm_write(handle, offset, buf, size, CS_PAGE_POOL_WRITE, NULL);
}
status_t cm_dbs_pg_asyn_write(int32 handle, int64 offset, const void *buf, int32 size, uint32 partid)
{
return cm_dbs_pg_comm_write(handle, offset, buf, size, CS_PAGE_POOL_ASYNC_WRITE, &partid);
}
status_t cm_dbs_sync_page(int32 handle, uint32 partid)
{
cm_dbs_map_item_s obj = { 0 };
if (cm_dbs_map_get(handle, &obj) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to find pgpool obj by handle(%d).", handle);
return OG_ERROR;
}
int32 ret = dbs_global_handle()->sync_page_by_part_index(&obj.obj_id, partid);
if (ret != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed(%d) to sync page, partid %u.", ret, partid);
return OG_ERROR;
}
return OG_SUCCESS;
}
status_t cm_dbs_pg_extend(int32 handle, int64 offset, int64 size)
{
cm_dbs_map_item_s obj = { 0 };
if (cm_dbs_map_get(handle, &obj) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to find pgpool obj by handle(%d).", handle);
return OG_ERROR;
}
PagePoolAttr attr = { 0 };
int32 ret = strcpy_sp(attr.nsName, sizeof(attr.nsName), obj.ns_name);
if (ret != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to copy ns name");
return OG_ERROR;
}
ret = dbs_global_handle()->expand_pagepool_logic_capacity(&obj.obj_id, &attr, (uint64)offset, (uint64)size);
if (ret != 0) {
OG_LOG_RUN_ERR("Failed(%d) to extend pgpool(%lld) size(%lld).", ret, offset, size);
return OG_ERROR;
}
return OG_SUCCESS;
}
status_t cm_dbs_pg_truncate(int32 handle, int64 keep_size)
{
if (keep_size < 0) {
OG_LOG_RUN_ERR("Failed to truncate pagepool because of invalid size(%lld).", keep_size);
return OG_ERROR;
}
cm_dbs_map_item_s obj = { 0 };
if (cm_dbs_map_get(handle, &obj) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to find pgpool obj by handle(%d).", handle);
return OG_ERROR;
}
PagePoolAttr attr = { 0 };
int32 ret = strcpy_sp(attr.nsName, sizeof(attr.nsName), obj.ns_name);
if (ret != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to copy ns name");
return OG_ERROR;
}
uint64_t capacity = 0;
ret = dbs_global_handle()->get_pagepool_logic_capacity(&obj.obj_id, &attr, &capacity);
if (ret != 0) {
OG_LOG_RUN_ERR("Failed(%d) to get pagepool logic capacity.", ret);
return OG_ERROR;
}
uint64_t new_size = (uint64_t)keep_size;
if (capacity == new_size) {
return OG_SUCCESS;
}
if (capacity < new_size) {
ret = dbs_global_handle()->expand_pagepool_logic_capacity(&obj.obj_id, &attr, capacity, new_size - capacity);
} else {
ret = dbs_global_handle()->expand_pagepool_logic_capacity(&obj.obj_id, &attr, new_size, 0);
}
if (ret != 0) {
OG_LOG_RUN_ERR("Failed(%d) to truncate PagePool size from %lu to %lu.", ret, capacity, new_size);
return OG_ERROR;
}
return OG_SUCCESS;
}
status_t cm_dbs_pg_rename(const char *src_name, const char *dst_name)
{
if (src_name == NULL || strlen(src_name) == 0) {
OG_LOG_RUN_ERR("The src name is invalid.");
return OG_ERROR;
}
if (dst_name == NULL || strlen(dst_name) == 0) {
OG_LOG_RUN_ERR("The dst name is invalid.");
return OG_ERROR;
}
if (strcmp(src_name, dst_name) == 0) {
OG_LOG_RUN_WAR("The dst name is same as the src(%s).", src_name);
return OG_SUCCESS;
}
PagePoolAttr attr = { 0 };
char *nsName = NULL;
if (cm_dbs_get_ns_name(DEV_TYPE_PGPOOL, &nsName) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to get namespace id for pagepool.");
return OG_ERROR;
}
int32_t ret = strcpy_sp(attr.nsName, sizeof(attr.nsName), nsName);
if (ret != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to copy ns name");
return OG_ERROR;
}
if (cm_dbs_pg_exist(dst_name)) {
OG_LOG_DEBUG_INF("target pagepool %s already exists, try to remove before rmname %s", dst_name, src_name);
if (cm_dbs_pg_destroy(dst_name) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to remove the existing target file %s before renaming %s", dst_name, src_name);
return OG_ERROR;
}
}
ret = dbs_global_handle()->rename_pagepool((char *)src_name, (char *)dst_name, &attr);
if (ret != 0) {
OG_LOG_RUN_ERR("Failed(%d) to rename pagepool from %s to %s.", ret, src_name, dst_name);
return OG_ERROR;
}
OG_LOG_DEBUG_INF("Rename pagepool from %s to %s successfully.", src_name, dst_name);
return OG_SUCCESS;
}
bool32 cm_dbs_pg_exist(const char *name)
{
PagePoolAttr attr = { 0 };
char *nsName = NULL;
int32 ret = cm_dbs_get_ns_name(DEV_TYPE_PGPOOL, &nsName);
if (ret != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed(%d) to get namespace id for pagepool.", ret);
return OG_FALSE;
}
ret = strcpy_sp(attr.nsName, sizeof(attr.nsName), nsName);
if (SECUREC_UNLIKELY(ret != EOK)) {
OG_THROW_ERROR(ERR_SYSTEM_CALL, ret);
return OG_ERROR;
}
if (ret != OG_SUCCESS) {
OG_LOG_RUN_ERR("Failed to copy ns name");
return OG_ERROR;
}
PagePoolId id = { 0 };
ret = dbs_global_handle()->open_pagepool((char *)name, &attr, &id);
if (ret == -ENOENT) {
OG_LOG_DEBUG_INF("The pagepool(%s) does not exist.", name);
return OG_FALSE;
} else if (ret != 0) {
OG_LOG_RUN_ERR("Failed(%d) to open pagepool(%s)", ret, name);
return OG_FALSE;
}
(void)dbs_global_handle()->close_pagepool(&id);
OG_LOG_DEBUG_INF("The pagepool named %s exists.", name);
return OG_TRUE;
}