* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* knl_tenant.c
*
*
* IDENTIFICATION
* src/kernel/catalog/knl_tenant.c
*
* -------------------------------------------------------------------------
*/
#include "knl_db_module.h"
#include "knl_tenant.h"
#include "knl_context.h"
#include "dc_tenant.h"
#include "knl_table.h"
#include "dc_user.h"
#include "knl_interface.h"
#include "knl_spm.h"
#include "dtc_dls.h"
#ifdef __cplusplus
extern "C" {
#endif
static status_t tenant_check_name_valid(knl_session_t *session, knl_tenant_def_t *def, knl_tenant_desc_t *desc)
{
dc_tenant_t *tenant = NULL;
dc_context_t *ogx = &session->kernel->dc_ctx;
uint32 i;
CM_MAGIC_CHECK(def, knl_tenant_def_t);
CM_MAGIC_CHECK(desc, knl_tenant_desc_t);
desc->id = OG_INVALID_ID32;
for (i = 0; i < OG_MAX_TENANTS; i++) {
tenant = ogx->tenants[i];
if (tenant == NULL) {
if (desc->id == OG_INVALID_ID32) {
desc->id = i;
}
continue;
}
if (cm_str_equal(tenant->desc.name, def->name)) {
OG_THROW_ERROR(ERR_OBJECT_EXISTS, "tenant", tenant->desc.name);
return OG_ERROR;
}
}
if (desc->id == OG_INVALID_ID32) {
OG_THROW_ERROR(ERR_MAX_ROLE_COUNT, "tenants", OG_MAX_TENANTS);
return OG_ERROR;
}
return OG_SUCCESS;
}
static status_t tenant_prepare_desc(knl_session_t *session, knl_tenant_def_t *def, knl_tenant_desc_t *desc)
{
uint32 i;
text_t* space_name = NULL;
uint32 space_id;
status_t status;
space_t* space = NULL;
text_t name;
CM_MAGIC_CHECK(def, knl_tenant_def_t);
CM_MAGIC_CHECK(desc, knl_tenant_desc_t);
if (tenant_check_name_valid(session, def, desc) != OG_SUCCESS) {
return OG_ERROR;
}
for (i = 0; i < def->space_lst.count; i++) {
space_name = (text_t*)cm_galist_get(&def->space_lst, i);
status = spc_get_space_id(session, space_name, OG_FALSE, &space_id);
if (status != OG_SUCCESS) {
return OG_ERROR;
}
CM_ASSERT(space_id < OG_MAX_SPACES);
space = SPACE_GET(session, space_id);
if (!IS_USER_SPACE(space)) {
OG_THROW_ERROR(ERR_SPACE_INVALID, space->ctrl->name);
return OG_ERROR;
}
dc_set_tenant_tablespace_bitmap(desc, space_id);
}
desc->ts_num = def->space_lst.count;
errno_t ret = strcpy_s(desc->name, OG_TENANT_BUFFER_SIZE, def->name);
knl_securec_check(ret);
cm_str2text(def->default_tablespace, &name);
if (spc_get_space_id(session, &name, OG_FALSE, &space_id) != OG_SUCCESS) {
return OG_ERROR;
}
desc->ts_id = space_id;
desc->ctime = cm_now();
return OG_SUCCESS;
}
status_t tenant_create(knl_session_t *session, knl_tenant_def_t *def)
{
knl_cursor_t *cursor = NULL;
knl_tenant_desc_t desc = { 0 };
errno_t err;
rd_tenant_t redo;
status_t status;
dc_context_t *ogx = &session->kernel->dc_ctx;
CM_MAGIC_CHECK(def, knl_tenant_def_t);
dls_spin_lock(session, &ogx->paral_lock, NULL);
cm_latch_x(&ogx->tenant_latch, session->id, NULL);
CM_MAGIC_SET(&desc, knl_tenant_desc_t);
status = tenant_prepare_desc(session, def, &desc);
if (status != OG_SUCCESS) {
cm_unlatch(&ogx->tenant_latch, NULL);
dls_spin_unlock(session, &ogx->paral_lock);
return OG_ERROR;
}
CM_SAVE_STACK(session->stack);
cursor = knl_push_cursor(session);
knl_open_sys_cursor(session, cursor, CURSOR_ACTION_INSERT, SYS_TENANTS_ID, OG_INVALID_ID32);
status = db_insert_sys_tenants(session, cursor, &desc);
if (status != OG_SUCCESS) {
CM_RESTORE_STACK(session->stack);
cm_unlatch(&ogx->tenant_latch, NULL);
dls_spin_unlock(session, &ogx->paral_lock);
return OG_ERROR;
}
if (dc_add_tenant(ogx, &desc) != OG_SUCCESS) {
CM_RESTORE_STACK(session->stack);
cm_unlatch(&ogx->tenant_latch, NULL);
dls_spin_unlock(session, &ogx->paral_lock);
return OG_ERROR;
}
CM_MAGIC_SET(&redo, rd_tenant_t);
redo.op_type = RD_CREATE_TENANT;
redo.tid = desc.id;
err = strcpy_s(redo.name, OG_TENANT_BUFFER_SIZE, desc.name);
knl_securec_check(err);
log_put(session, RD_LOGIC_OPERATION, &redo, sizeof(rd_tenant_t), LOG_ENTRY_FLAG_NONE);
knl_commit(session);
CM_RESTORE_STACK(session->stack);
cm_unlatch(&ogx->tenant_latch, NULL);
dls_spin_unlock(session, &ogx->paral_lock);
return OG_SUCCESS;
}
static status_t tenant_prepare_alter_add_space(knl_session_t *session, knl_tenant_def_t *def, knl_tenant_desc_t *desc)
{
text_t* space_name = NULL;
uint32 i;
uint32 space_id;
space_t* space = NULL;
CM_MAGIC_CHECK(def, knl_tenant_def_t);
CM_MAGIC_CHECK(desc, knl_tenant_desc_t);
for (i = 0; i < def->space_lst.count; i++) {
space_name = (text_t*)cm_galist_get(&def->space_lst, i);
if (spc_get_space_id(session, space_name, OG_FALSE, &space_id) != OG_SUCCESS) {
return OG_ERROR;
}
CM_ASSERT(space_id < OG_MAX_SPACES);
space = SPACE_GET(session, space_id);
if (!IS_USER_SPACE(space)) {
OG_THROW_ERROR(ERR_SPACE_INVALID, space->ctrl->name);
return OG_ERROR;
}
if (dc_get_tenant_tablespace_bitmap(desc, space_id)) {
OG_THROW_ERROR(ERR_SPACE_ALREADY_USABLE, space->ctrl->name);
return OG_ERROR;
}
}
for (i = 0; i < def->space_lst.count; i++) {
space_name = (text_t*)cm_galist_get(&def->space_lst, i);
if (spc_get_space_id(session, space_name, OG_FALSE, &space_id) != OG_SUCCESS) {
return OG_ERROR;
}
dc_set_tenant_tablespace_bitmap(desc, space_id);
}
desc->ts_num += def->space_lst.count;
return OG_SUCCESS;
}
static status_t tenant_prepare_alter_modify_default(knl_session_t *session, knl_tenant_def_t *def, knl_tenant_desc_t *desc)
{
uint32 space_id;
space_t* space = NULL;
text_t name;
CM_MAGIC_CHECK(def, knl_tenant_def_t);
CM_MAGIC_CHECK(desc, knl_tenant_desc_t);
CM_ASSERT(!CM_IS_EMPTY_STR(def->default_tablespace));
cm_str2text(def->default_tablespace, &name);
if (spc_get_space_id(session, &name, OG_FALSE, &space_id) != OG_SUCCESS) {
return OG_ERROR;
}
CM_ASSERT(space_id < OG_MAX_SPACES);
if (space_id == desc->ts_id) {
return OG_SUCCESS;
}
space = SPACE_GET(session, space_id);
if (!IS_USER_SPACE(space)) {
OG_THROW_ERROR(ERR_SPACE_INVALID, space->ctrl->name);
return OG_ERROR;
}
if (!dc_get_tenant_tablespace_bitmap(desc, space_id)) {
OG_THROW_ERROR(ERR_SPACE_DISABLED, space->ctrl->name);
return OG_ERROR;
}
desc->ts_id = space_id;
return OG_SUCCESS;
}
static status_t tenant_prepare_alter(knl_session_t *session, knl_tenant_def_t *def, knl_tenant_desc_t *desc)
{
CM_MAGIC_CHECK(def, knl_tenant_def_t);
CM_MAGIC_CHECK(desc, knl_tenant_desc_t);
if (def->sub_type == ALTER_TENANT_TYPE_ADD_SPACE) {
return tenant_prepare_alter_add_space(session, def, desc);
} else {
CM_ASSERT(def->sub_type == ALTER_TENANT_TYPE_MODEIFY_DEFAULT);
return tenant_prepare_alter_modify_default(session, def, desc);
}
}
static status_t tenant_alter_core(knl_session_t *session, knl_tenant_def_t *def)
{
knl_tenant_desc_t *desc = NULL;
knl_tenant_desc_t save_desc;
text_t tenant_name;
dc_tenant_t* tenant = NULL;
rd_tenant_t redo;
CM_MAGIC_CHECK(def, knl_tenant_def_t);
cm_str2text(def->name, &tenant_name);
if (dc_open_tenant_core(session, &tenant_name, &tenant) != OG_SUCCESS) {
return OG_ERROR;
}
cm_spin_lock(&tenant->lock, NULL);
int32 ref_cnt = tenant->ref_cnt;
cm_spin_unlock(&tenant->lock);
if (ref_cnt > 0) {
OG_THROW_ERROR(ERR_TENANT_IS_REFERENCED, T2S(&tenant_name), "can not alter");
return OG_ERROR;
}
desc = &tenant->desc;
save_desc = *desc;
if (tenant_prepare_alter(session, def, desc) != OG_SUCCESS) {
*desc = save_desc;
return OG_ERROR;
}
CM_SAVE_STACK(session->stack);
status_t status = db_alter_tenant_field(session, desc);
CM_RESTORE_STACK(session->stack);
if (status != OG_SUCCESS) {
*desc = save_desc;
} else {
CM_MAGIC_SET(&redo, rd_tenant_t);
redo.op_type = RD_ALTER_TENANT;
errno_t err = strcpy_sp(redo.name, OG_TENANT_BUFFER_SIZE, def->name);
knl_securec_check(err);
log_put(session, RD_LOGIC_OPERATION, &redo, sizeof(rd_tenant_t), LOG_ENTRY_FLAG_NONE);
knl_commit(session);
}
return status;
}
status_t tenant_alter(knl_session_t *session, knl_tenant_def_t *def)
{
dc_context_t *ogx = &session->kernel->dc_ctx;
dls_spin_lock(session, &ogx->paral_lock, NULL);
cm_latch_x(&ogx->tenant_latch, session->id, NULL);
status_t status = tenant_alter_core(session, def);
cm_unlatch(&ogx->tenant_latch, NULL);
dls_spin_unlock(session, &ogx->paral_lock);
return status;
}
static status_t tenant_drop_user(knl_session_t *session, uint32 tid)
{
dc_context_t *ogx = &session->kernel->dc_ctx;
dc_bucket_t *bucket = NULL;
dc_user_t *user = NULL;
text_t username;
uint32 uid;
bucket = &ogx->tenant_buckets[tid];
while (bucket->first != OG_INVALID_ID32) {
if (dc_open_user_by_id(session, bucket->first, &user) != OG_SUCCESS) {
return OG_ERROR;
}
cm_str2text(user->desc.name, &username);
if (g_knl_callback.whether_login_with_user(&username)) {
OG_THROW_ERROR(ERR_USER_HAS_LOGIN, user->desc.name);
return OG_ERROR;
}
uid = user->next1;
if (user_drop_core(session, user, OG_TRUE) != OG_SUCCESS) {
session->drop_uid = OG_INVALID_ID32;
return OG_ERROR;
}
if (knl_clean_sys_spm_schmpcr(session, &username) != OG_SUCCESS) {
session->drop_uid = OG_INVALID_ID32;
return OG_ERROR;
}
CM_ASSERT(uid == bucket->first);
}
session->drop_uid = OG_INVALID_ID32;
return OG_SUCCESS;
}
status_t tenant_drop(knl_session_t *session, knl_drop_tenant_t *def)
{
uint32 tid = OG_INVALID_ID32;
dc_context_t *ogx = &session->kernel->dc_ctx;
rd_tenant_t redo;
CM_MAGIC_CHECK(def, knl_drop_tenant_t);
dls_spin_lock(session, &ogx->paral_lock, NULL);
cm_latch_x(&ogx->tenant_latch, session->id, NULL);
knl_set_session_scn(session, OG_INVALID_ID64);
if (dc_lock_tenant(session, def, &tid) != OG_SUCCESS) {
cm_unlatch(&ogx->tenant_latch, NULL);
dls_spin_unlock(session, &ogx->paral_lock);
if (def->options & DROP_IF_EXISTS) {
int32 code = cm_get_error_code();
if (code == ERR_TENANT_NOT_EXIST) {
cm_reset_error();
return OG_SUCCESS;
}
}
return OG_ERROR;
}
if (tenant_drop_user(session, tid) != OG_SUCCESS) {
cm_unlatch(&ogx->tenant_latch, NULL);
dls_spin_unlock(session, &ogx->paral_lock);
return OG_ERROR;
}
status_t status = db_delete_from_sys_tenant(session, tid);
if (status == OG_SUCCESS) {
dc_drop_tenant(session, tid);
CM_MAGIC_SET(&redo, rd_tenant_t);
redo.op_type = RD_DROP_TENANT;
redo.tid = tid;
errno_t err = strcpy_s(redo.name, OG_TENANT_BUFFER_SIZE, T2S(&def->name));
knl_securec_check(err);
log_put(session, RD_LOGIC_OPERATION, &redo, sizeof(rd_tenant_t), LOG_ENTRY_FLAG_NONE);
knl_commit(session);
}
cm_unlatch(&ogx->tenant_latch, NULL);
dls_spin_unlock(session, &ogx->paral_lock);
return status;
}
status_t knl_get_tenant_id(knl_handle_t session, text_t *name, uint32 *tid)
{
return dc_get_tenant_id((knl_session_t *)session, name, tid);
}
#ifdef __cplusplus
}
#endif