* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* dc_tenant.c
*
*
* IDENTIFICATION
* src/kernel/catalog/dc_tenant.c
*
* -------------------------------------------------------------------------
*/
#include "knl_dc_module.h"
#include "dc_tenant.h"
#include "cm_log.h"
#include "knl_context.h"
#include "dc_util.h"
#include "knl_database.h"
status_t dc_init_tenant(dc_context_t *ogx, dc_tenant_t **tenant_out)
{
dc_tenant_t *tenant = NULL;
tenant = (dc_tenant_t *)dc_list_remove(&ogx->free_tenants);
if (tenant == NULL) {
if (dc_alloc_mem(ogx, ogx->memory, sizeof(dc_tenant_t), (void **)&tenant) != OG_SUCCESS) {
return OG_ERROR;
}
}
errno_t ret = memset_s(tenant, sizeof(dc_tenant_t), 0, sizeof(dc_tenant_t));
knl_securec_check(ret);
CM_MAGIC_SET(tenant, dc_tenant_t);
CM_MAGIC_SET(&tenant->desc, knl_tenant_desc_t);
*tenant_out = tenant;
return OG_SUCCESS;
}
void dc_convert_tenant_desc(knl_cursor_t *cursor, knl_tenant_desc_t *desc)
{
text_t text;
CM_MAGIC_CHECK(desc, knl_tenant_desc_t);
desc->id = *(uint32 *)CURSOR_COLUMN_DATA(cursor, SYS_TENANTS_COL_ID);
text.str = CURSOR_COLUMN_DATA(cursor, SYS_TENANTS_COL_NAME);
text.len = CURSOR_COLUMN_SIZE(cursor, SYS_TENANTS_COL_NAME);
(void)cm_text2str(&text, desc->name, OG_TENANT_BUFFER_SIZE);
desc->ts_id = *(uint32 *)CURSOR_COLUMN_DATA(cursor, SYS_TENANTS_COL_TABLESPACE_ID);
desc->ts_num = *(uint32 *)CURSOR_COLUMN_DATA(cursor, SYS_TENANTS_COL_TABLESPACES_NUM);
text.str = CURSOR_COLUMN_DATA(cursor, SYS_TENANTS_COL_TABLESPACES_BITMAP);
text.len = CURSOR_COLUMN_SIZE(cursor, SYS_TENANTS_COL_TABLESPACES_BITMAP);
errno_t err = memcpy_s(desc->ts_bitmap, OG_SPACES_BITMAP_SIZE, text.str, text.len);
knl_securec_check(err);
desc->ctime = *(date_t *)CURSOR_COLUMN_DATA(cursor, SYS_TENANTS_COL_CTIME);
}
status_t dc_init_tenants(knl_session_t *session, dc_context_t *ogx)
{
uint32 tid;
dc_tenant_t *tenant = NULL;
CM_SAVE_STACK(session->stack);
knl_cursor_t *cursor = knl_push_cursor(session);
knl_open_sys_cursor(session, cursor, CURSOR_ACTION_SELECT, SYS_TENANTS_ID, OG_INVALID_ID32);
if (knl_fetch(session, cursor) != OG_SUCCESS) {
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
while (!cursor->eof) {
tid = *(uint32 *)CURSOR_COLUMN_DATA(cursor, SYS_TENANTS_COL_ID);
if (tid >= OG_MAX_TENANTS) {
CM_NEVER;
break;
}
if (tid == SYS_TENANTROOT_ID) {
tenant = ogx->tenants[tid];
} else {
if (dc_init_tenant(ogx, &tenant) != OG_SUCCESS) {
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
ogx->tenants[tid] = tenant;
}
dc_convert_tenant_desc(cursor, &tenant->desc);
if (knl_fetch(session, cursor) != OG_SUCCESS) {
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
}
CM_RESTORE_STACK(session->stack);
return OG_SUCCESS;
}
status_t dc_add_tenant(dc_context_t *ogx, knl_tenant_desc_t *desc)
{
dc_tenant_t *tenant = NULL;
CM_MAGIC_CHECK(desc, knl_tenant_desc_t);
if (dc_init_tenant(ogx, &tenant) != OG_SUCCESS) {
return OG_ERROR;
}
tenant->desc = *desc;
ogx->tenants[desc->id] = tenant;
return OG_SUCCESS;
}
void dc_drop_tenant(knl_session_t *session, uint32 tid)
{
dc_tenant_t *tenant = NULL;
dc_context_t *ogx = &session->kernel->dc_ctx;
CM_ASSERT(tid < OG_MAX_TENANTS);
tenant = ogx->tenants[tid];
ogx->tenants[tid] = NULL;
CM_ASSERT(ogx->tenant_buckets[tid].first == OG_INVALID_ID32);
dc_list_add(&ogx->free_tenants, (dc_list_node_t*)tenant);
}
static void dc_fill_tenant(knl_cursor_t *cursor, dc_tenant_t *tenant, uint32 tid)
{
text_t text;
CM_MAGIC_CHECK(tenant, dc_tenant_t);
tenant->desc.id = tid;
text.str = CURSOR_COLUMN_DATA(cursor, SYS_TENANTS_COL_NAME);
text.len = CURSOR_COLUMN_SIZE(cursor, SYS_TENANTS_COL_NAME);
(void)cm_text2str(&text, tenant->desc.name, OG_TENANT_BUFFER_SIZE);
tenant->desc.ts_id = *(uint32 *)CURSOR_COLUMN_DATA(cursor, SYS_TENANTS_COL_TABLESPACE_ID);
tenant->desc.ts_num = *(uint32 *)CURSOR_COLUMN_DATA(cursor, SYS_TENANTS_COL_TABLESPACES_NUM);
text.str = CURSOR_COLUMN_DATA(cursor, SYS_TENANTS_COL_TABLESPACES_BITMAP);
text.len = CURSOR_COLUMN_SIZE(cursor, SYS_TENANTS_COL_TABLESPACES_BITMAP);
errno_t err = memcpy_s(tenant->desc.ts_bitmap, OG_SPACES_BITMAP_SIZE, text.str, text.len);
knl_securec_check(err);
tenant->desc.ctime = *(date_t *)CURSOR_COLUMN_DATA(cursor, SYS_TENANTS_COL_CTIME);
}
status_t dc_try_create_tenant(knl_session_t *session, uint32 id, const char *tenant_name)
{
dc_context_t *ogx = &session->kernel->dc_ctx;
knl_cursor_t *cursor = NULL;
uint32 tid;
dc_tenant_t *tenant = NULL;
CM_SAVE_STACK(session->stack);
cursor = knl_push_cursor(session);
knl_open_sys_cursor(session, cursor, CURSOR_ACTION_SELECT, SYS_TENANTS_ID, IX_SYS_TENANTS_001_ID);
knl_init_index_scan(cursor, OG_TRUE);
knl_set_scan_key(INDEX_DESC(cursor->index), &cursor->scan_range.l_key, OG_TYPE_INTEGER, (void *)&id,
sizeof(uint32), IX_COL_SYS_TENANTS_001_ID);
if (knl_fetch(session, cursor) != OG_SUCCESS) {
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
if (!cursor->eof) {
tid = *(uint32 *)CURSOR_COLUMN_DATA(cursor, SYS_TENANTS_COL_ID);
cm_latch_x(&ogx->tenant_latch, session->id, NULL);
if (ogx->tenants[tid] == NULL) {
if (dc_init_tenant(ogx, &tenant) != OG_SUCCESS) {
cm_unlatch(&ogx->tenant_latch, NULL);
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
ogx->tenants[tid] = tenant;
} else {
tenant = ogx->tenants[tid];
}
dc_fill_tenant(cursor, tenant, tid);
cm_unlatch(&ogx->tenant_latch, NULL);
}
CM_RESTORE_STACK(session->stack);
return OG_SUCCESS;
}
status_t dc_open_tenant_by_id(knl_session_t *session, uint32 tid, dc_tenant_t **tenant)
{
dc_context_t *ogx = &session->kernel->dc_ctx;
CM_ASSERT(tid < OG_MAX_TENANTS);
cm_latch_s(&ogx->tenant_latch, session->id, OG_FALSE, NULL);
if (ogx->tenants[tid] == NULL) {
cm_unlatch(&ogx->tenant_latch, NULL);
OG_THROW_ERROR(ERR_OBJECT_ID_NOT_EXIST, "tenant", tid);
return OG_ERROR;
}
if (tid != SYS_TENANTROOT_ID) {
cm_spin_lock(&ogx->tenants[tid]->lock, NULL);
ogx->tenants[tid]->ref_cnt++;
cm_spin_unlock(&ogx->tenants[tid]->lock);
}
*tenant = ogx->tenants[tid];
cm_unlatch(&ogx->tenant_latch, NULL);
return OG_SUCCESS;
}
void dc_set_tenant_tablespace_bitmap(knl_tenant_desc_t* desc, uint32 ts_id)
{
uint32 bit;
uint32 map;
CM_ASSERT(ts_id <= OG_MAX_SPACES);
CM_MAGIC_CHECK(desc, knl_tenant_desc_t);
bit = ts_id / UINT8_BITS;
map = ts_id % UINT8_BITS;
desc->ts_bitmap[bit] |= (1 << map);
}
bool32 dc_get_tenant_tablespace_bitmap(knl_tenant_desc_t* desc, uint32 ts_id)
{
uint32 bit;
uint32 map;
CM_ASSERT(ts_id <= OG_MAX_SPACES);
CM_MAGIC_CHECK(desc, knl_tenant_desc_t);
bit = ts_id / UINT8_BITS;
map = ts_id % UINT8_BITS;
if ((desc->ts_bitmap[bit] & (1 << map))) {
return OG_TRUE;
} else {
return OG_FALSE;
}
}
status_t dc_open_tenant_core(knl_session_t *session, const text_t *tenantname, dc_tenant_t **tenant_out)
{
uint32 i;
dc_context_t *ogx = &session->kernel->dc_ctx;
dc_tenant_t *tenant = NULL;
for (i = 0; i < OG_MAX_TENANTS; i++) {
tenant = ogx->tenants[i];
if (tenant == NULL || cm_text_str_equal_ins(tenantname, tenant->desc.name) == OG_FALSE) {
continue;
}
*tenant_out = tenant;
return OG_SUCCESS;
}
OG_THROW_ERROR(ERR_TENANT_NOT_EXIST, T2S(tenantname));
return OG_ERROR;
}
status_t dc_open_tenant(knl_session_t *session, const text_t *tenantname, dc_tenant_t **tenant_out)
{
dc_context_t *ogx = &session->kernel->dc_ctx;
cm_latch_s(&ogx->tenant_latch, session->id, OG_FALSE, NULL);
if (dc_open_tenant_core(session, tenantname, tenant_out) != OG_SUCCESS) {
cm_unlatch(&ogx->tenant_latch, NULL);
return OG_ERROR;
}
if ((*tenant_out)->desc.id != SYS_TENANTROOT_ID) {
cm_spin_lock(&(*tenant_out)->lock, NULL);
(*tenant_out)->ref_cnt++;
cm_spin_unlock(&(*tenant_out)->lock);
}
cm_unlatch(&ogx->tenant_latch, NULL);
return OG_SUCCESS;
}
void dc_close_tenant(knl_session_t *session, uint32 tenant_id)
{
dc_context_t *ogx = &session->kernel->dc_ctx;
dc_tenant_t *tenant = NULL;
CM_ASSERT(tenant_id < OG_MAX_TENANTS);
tenant = ogx->tenants[tenant_id];
CM_MAGIC_CHECK(tenant, dc_tenant_t);
if (tenant_id != SYS_TENANTROOT_ID) {
cm_spin_lock(&tenant->lock, NULL);
CM_ASSERT(tenant->ref_cnt > 0);
tenant->ref_cnt--;
cm_spin_unlock(&tenant->lock);
}
}
status_t dc_update_tenant(knl_session_t *session, const char *tenant_name, bool32 *is_found)
{
dc_context_t *ogx = &session->kernel->dc_ctx;
knl_cursor_t *cursor = NULL;
uint32 tid;
dc_tenant_t *tenant = NULL;
*is_found = OG_FALSE;
CM_SAVE_STACK(session->stack);
cursor = knl_push_cursor(session);
knl_open_sys_cursor(session, cursor, CURSOR_ACTION_SELECT, SYS_TENANTS_ID, IX_SYS_TENANTS_002_ID);
knl_init_index_scan(cursor, OG_TRUE);
knl_set_scan_key(INDEX_DESC(cursor->index), &cursor->scan_range.l_key, OG_TYPE_STRING, tenant_name,
(uint16)strlen(tenant_name), IX_COL_SYS_TENANTS_002_NAME);
if (knl_fetch(session, cursor) != OG_SUCCESS) {
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
if (!cursor->eof) {
tid = *(uint32 *)CURSOR_COLUMN_DATA(cursor, SYS_TENANTS_COL_ID);
cm_latch_x(&ogx->tenant_latch, session->id, NULL);
tenant = ogx->tenants[tid];
if (tenant == NULL) {
cm_unlatch(&ogx->tenant_latch, NULL);
CM_RESTORE_STACK(session->stack);
OG_LOG_RUN_ERR("[DC] failed to load tid:%u", tid);
return OG_ERROR;
}
dc_fill_tenant(cursor, tenant, tid);
*is_found = OG_TRUE;
cm_unlatch(&ogx->tenant_latch, NULL);
CM_RESTORE_STACK(session->stack);
return OG_SUCCESS;
}
*is_found = OG_FALSE;
CM_RESTORE_STACK(session->stack);
return OG_SUCCESS;
}
status_t dc_lock_tenant(knl_session_t *session, knl_drop_tenant_t *def, uint32 *tid)
{
dc_context_t *ogx = &session->kernel->dc_ctx;
dc_tenant_t *tenant = NULL;
dc_bucket_t *bucket = NULL;
status_t status = OG_SUCCESS;
if (dc_open_tenant_core(session, &def->name, &tenant) != OG_SUCCESS) {
return OG_ERROR;
}
cm_spin_lock(&tenant->lock, NULL);
int32 ref_cnt = tenant->ref_cnt;
cm_spin_unlock(&tenant->lock);
if (ref_cnt > 0) {
OG_THROW_ERROR(ERR_TENANT_IS_REFERENCED, T2S(&def->name), "can not drop");
return OG_ERROR;
}
*tid = tenant->desc.id;
bucket = &ogx->tenant_buckets[*tid];
cm_spin_lock(&bucket->lock, NULL);
if (bucket->first != OG_INVALID_ID32) {
if (!(def->options & DROP_CASCADE_CONS)) {
OG_THROW_ERROR(ERR_TENANT_IS_REFERENCED, T2S(&def->name), "can not drop");
status = OG_ERROR;
}
}
cm_spin_unlock(&bucket->lock);
return status;
}
status_t dc_init_root_tenant(knl_handle_t session, dc_context_t *ogx)
{
dc_tenant_t *tenant = NULL;
if (ogx->tenants[SYS_TENANTROOT_ID] != NULL) {
return OG_SUCCESS;
}
if (dc_init_tenant(ogx, &tenant) != OG_SUCCESS) {
return OG_ERROR;
}
tenant->desc.ctime = cm_now();
tenant->desc.id = SYS_TENANTROOT_ID;
tenant->desc.ts_id = FIXED_USER_SPACE_ID;
tenant->desc.ts_num = 0;
if (cm_text2str(&g_tenantroot, tenant->desc.name, OG_TENANT_NAME_LEN) != OG_SUCCESS) {
return OG_ERROR;
}
errno_t ret = memset_s(tenant->desc.ts_bitmap, OG_SPACES_BITMAP_SIZE, -1, OG_SPACES_BITMAP_SIZE);
knl_securec_check(ret);
ogx->tenants[SYS_TENANTROOT_ID] = tenant;
return OG_SUCCESS;
}
status_t dc_get_tenant_id(knl_session_t *session, const text_t *name, uint32 *tenant_id)
{
dc_tenant_t *tenant = NULL;
if (CM_IS_EMPTY(name) || cm_text_equal_ins(name, &g_tenantroot)) {
*tenant_id = 0;
return OG_SUCCESS;
}
if (dc_open_tenant(session, name, &tenant) != OG_SUCCESS) {
return OG_ERROR;
}
CM_MAGIC_CHECK(tenant, dc_tenant_t);
*tenant_id = tenant->desc.id;
dc_close_tenant(session, tenant->desc.id);
return OG_SUCCESS;
}