* Copyright (c) 2022 Huawei Technologies Co.,Ltd.
*
* DMS is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* drc_tran.c
*
*
* IDENTIFICATION
* src/drc/drc_tran.c
*
* -------------------------------------------------------------------------
*/
#include "drc_tran.h"
#include "drc.h"
#include "cm_thread.h"
#include "dms_error.h"
#include "dms_reform.h"
#include "dms_process.h"
#include "dcs_tran.h"
#define TX_WAIT_INTERVAL 5
static drc_res_map_t *get_txn_res_map(dms_res_type_e res_type)
{
drc_res_ctx_t *ctx = DRC_RES_CTX;
if (res_type == DMS_RES_TYPE_IS_TXN) {
return &ctx->txn_res_map;
} else if (res_type == DMS_RES_TYPE_IS_LOCAL_TXN) {
return &ctx->local_txn_map;
}
cm_panic(0);
return NULL;
}
static void init_txn_res(drc_txn_res_t *txn_res, const uint64 *xid, dms_res_type_e res_type)
{
txn_res->res_type = res_type;
txn_res->res_id = *xid;
txn_res->inst_map = 0;
txn_res->ref_count = 0;
if (!txn_res->is_cond_inited && (res_type == DMS_RES_TYPE_IS_LOCAL_TXN)) {
cm_init_cond(&txn_res->cond);
txn_res->is_cond_inited = CM_TRUE;
}
}
drc_txn_res_t *drc_create_txn_res(uint64 *xid, dms_res_type_e res_type)
{
drc_txn_res_t *txn_res = NULL;
drc_res_bucket_t *bucket = NULL;
drc_res_map_t *res_map = NULL;
res_map = get_txn_res_map(res_type);
CM_ASSERT(res_map != NULL);
txn_res = (drc_txn_res_t *)drc_res_pool_alloc_item(&res_map->res_pool);
if (txn_res == NULL) {
return NULL;
}
init_txn_res(txn_res, xid, res_type);
bucket = drc_res_map_get_bucket(res_map, (char *)xid, sizeof(uint64));
drc_res_map_add_res(bucket, (char *)txn_res);
return txn_res;
}
void drc_release_txn_res(drc_txn_res_t *txn_res)
{
drc_res_map_t *res_map = get_txn_res_map(txn_res->res_type);
CM_ASSERT(res_map != NULL);
drc_res_pool_free_item(&res_map->res_pool, (char *)txn_res);
}
int32 drc_enqueue_txn(void *db_handle, uint64 *xid, uint8 inst_id, dms_txn_info_t *txn_info)
{
drc_res_ctx_t *ctx = DRC_RES_CTX;
drc_res_map_t *res_map = &ctx->txn_res_map;
drc_res_bucket_t *bucket = NULL;
drc_txn_res_t *txn_res = NULL;
bucket = drc_res_map_get_bucket(res_map, (char *)xid, sizeof(uint64));
cm_spin_lock(&bucket->lock, NULL);
int32 ret = g_dms.callback.get_txn_info(db_handle, *xid, CM_FALSE, txn_info);
if (ret != DMS_SUCCESS) {
cm_spin_unlock(&bucket->lock);
return ret;
}
if (txn_info->status == DMS_XACT_END) {
cm_spin_unlock(&bucket->lock);
return DMS_SUCCESS;
}
txn_res = (drc_txn_res_t *)drc_res_map_lookup(res_map, bucket, (char *)xid, sizeof(uint64));
if (txn_res == NULL) {
txn_res = drc_create_txn_res(xid, DMS_RES_TYPE_IS_TXN);
if (txn_res == NULL) {
cm_spin_unlock(&bucket->lock);
return DMS_SUCCESS;
}
}
bitmap64_set(&txn_res->inst_map, inst_id);
cm_spin_unlock(&bucket->lock);
return DMS_SUCCESS;
}
bool8 drc_local_txn_wait(uint64 *xid)
{
drc_res_ctx_t *ctx = DRC_RES_CTX;
drc_res_map_t *res_map = &ctx->local_txn_map;
drc_res_bucket_t *bucket = NULL;
drc_txn_res_t *txn_res = NULL;
bucket = drc_res_map_get_bucket(res_map, (char *)xid, sizeof(uint64));
cm_spin_lock(&bucket->lock, NULL);
txn_res = (drc_txn_res_t *)drc_res_map_lookup(res_map, bucket, (char *)xid, sizeof(uint64));
if (txn_res == NULL) {
txn_res = drc_create_txn_res(xid, DMS_RES_TYPE_IS_LOCAL_TXN);
if (txn_res == NULL) {
cm_spin_unlock(&bucket->lock);
return CM_FALSE;
}
}
(void)cm_atomic32_inc(&txn_res->ref_count);
cm_spin_unlock(&bucket->lock);
bool8 ret = (bool8)cm_wait_cond(&txn_res->cond, TX_WAIT_INTERVAL);
if (ret != CM_TRUE) {
return CM_FALSE;
}
return CM_TRUE;
}
void drc_local_txn_recycle(uint64 *xid)
{
drc_res_ctx_t *ctx = DRC_RES_CTX;
drc_res_map_t *res_map = &ctx->local_txn_map;
drc_res_bucket_t *bucket = NULL;
drc_txn_res_t *txn_res = NULL;
bucket = drc_res_map_get_bucket(res_map, (char *)xid, sizeof(uint64));
cm_spin_lock(&bucket->lock, NULL);
txn_res = (drc_txn_res_t *)drc_res_map_lookup(res_map, bucket, (char *)xid, sizeof(uint64));
if (txn_res == NULL) {
cm_spin_unlock(&bucket->lock);
return;
}
cm_panic_log(txn_res->ref_count > 0, "xid(%llu) ref_count(%d) is invalid", *xid, txn_res->ref_count);
(void)cm_atomic32_inc(&txn_res->ref_count);
if (txn_res->ref_count == 0) {
drc_res_map_del_res(res_map, bucket, (char *)xid, sizeof(uint64));
drc_release_txn_res(txn_res);
}
cm_spin_unlock(&bucket->lock);
}
void drc_local_txn_awake(uint64 *xid)
{
drc_res_ctx_t *ctx = DRC_RES_CTX;
drc_res_map_t *res_map = &ctx->local_txn_map;
drc_res_bucket_t *bucket = NULL;
drc_txn_res_t *txn_res = NULL;
bucket = drc_res_map_get_bucket(res_map, (char *)xid, sizeof(uint64));
cm_spin_lock(&bucket->lock, NULL);
txn_res = (drc_txn_res_t *)drc_res_map_lookup(res_map, bucket, (char *)xid, sizeof(uint64));
if (txn_res == NULL) {
cm_spin_unlock(&bucket->lock);
return;
}
cm_release_cond(&txn_res->cond);
cm_spin_unlock(&bucket->lock);
}
static uint16 drc_get_global_xid_partid(drc_global_xid_t *global_xid)
{
uint8 bytes[sizeof(uint64) + DMS_MAX_XA_BASE16_GTRID_LEN + DMS_MAX_XA_BASE16_BQUAL_LEN] = { 0 };
*(uint64 *)bytes = global_xid->fmt_id;
errno_t ret = memcpy_sp(bytes + sizeof(uint64), DMS_MAX_XA_BASE16_GTRID_LEN, global_xid->gtrid,
global_xid->gtrid_len);
DMS_SECUREC_CHECK(ret);
if (global_xid->bqual_len > 0) {
ret = memcpy_sp(bytes + sizeof(uint64) + global_xid->gtrid_len, DMS_MAX_XA_BASE16_BQUAL_LEN, global_xid->bqual,
global_xid->bqual_len);
DMS_SECUREC_CHECK(ret);
}
uint32 bytes_size = sizeof(uint64) + global_xid->gtrid_len + global_xid->bqual_len;
uint32 part_id = cm_hash_bytes(bytes, bytes_size, DRC_MAX_PART_NUM);
return part_id;
}
int32 drc_get_xa_master_id(drc_global_xid_t *xid, uint8 *master_id)
{
uint16 part_id = drc_get_global_xid_partid(xid);
uint8 instance_id = DRC_PART_MASTER_ID(part_id);
if (instance_id == CM_INVALID_ID8) {
DMS_THROW_ERROR(ERRNO_DMS_DRC_XA_MASTER_NOT_FOUND, cm_display_resid((char *)xid, DRC_RES_GLOBAL_XA_TYPE));
return ERRNO_DMS_DRC_XA_MASTER_NOT_FOUND;
}
*master_id = instance_id;
return CM_SUCCESS;
}
int32 drc_get_xa_remaster_id(drc_global_xid_t *xid, uint8 *master_id)
{
uint16 part_id = drc_get_global_xid_partid(xid);
uint8 instance_id = DRC_PART_REMASTER_ID(part_id);
if (instance_id == CM_INVALID_ID8) {
DMS_THROW_ERROR(ERRNO_DMS_DRC_XA_MASTER_NOT_FOUND, cm_display_resid((char *)xid, DRC_RES_GLOBAL_XA_TYPE));
return ERRNO_DMS_DRC_XA_MASTER_NOT_FOUND;
}
*master_id = instance_id;
return CM_SUCCESS;
}
static void drc_add_xa_part_list(drc_global_xa_res_t *xa_res)
{
drc_res_ctx_t *ctx = DRC_RES_CTX;
xa_res->part_id = drc_get_global_xid_partid(&xa_res->xid);
drc_part_list_t *part = &ctx->global_xa_res.res_parts[xa_res->part_id];
cm_spin_lock(&part->lock, NULL);
cm_bilist_add_head(&xa_res->part_node, &part->list);
cm_spin_unlock(&part->lock);
}
static void drc_del_xa_part_list(drc_global_xa_res_t *xa_res)
{
drc_res_ctx_t *ctx = DRC_RES_CTX;
drc_part_list_t *part = &ctx->global_xa_res.res_parts[xa_res->part_id];
cm_spin_lock(&part->lock, NULL);
cm_bilist_del(&xa_res->part_node, &part->list);
cm_spin_unlock(&part->lock);
}
static int32 drc_new_xa_res(drc_global_res_map_t *xa_res_map, drc_global_xid_t *global_xid, uint8 owner_id,
uint8 undo_set_id)
{
drc_res_map_t *res_map = &xa_res_map->res_map;
drc_res_bucket_t *bucket = drc_res_map_get_bucket(res_map, (char *)global_xid, sizeof(drc_global_xid_t));
drc_global_xa_res_t *xa_res = (drc_global_xa_res_t *)drc_res_pool_alloc_item(&xa_res_map->res_map.res_pool);
if (xa_res == NULL) {
LOG_DEBUG_ERR("[%s][drc_new_xa_res] drc global xa pool is full",
cm_display_resid((char *)global_xid, DRC_RES_GLOBAL_XA_TYPE));
DMS_THROW_ERROR(ERRNO_DMS_DRC_XA_POOL_CAPACITY_NOT_ENOUGH);
return ERRNO_DMS_DRC_XA_POOL_CAPACITY_NOT_ENOUGH;
}
LOG_DEBUG_INF("[DRC][%s] global xa res created", cm_display_resid((char *)global_xid, DRC_RES_GLOBAL_XA_TYPE));
xa_res->node.next = NULL;
xa_res->node.prev = NULL;
xa_res->owner_id = owner_id;
xa_res->undo_set_id = undo_set_id;
xa_res->in_recovery = CM_FALSE;
xa_res->part_id = CM_INVALID_ID8;
xa_res->part_node.next = NULL;
xa_res->part_node.prev = NULL;
errno_t ret = memcpy_sp(&xa_res->xid, sizeof(drc_global_xid_t), global_xid, sizeof(drc_global_xid_t));
if (ret != EOK) {
DMS_THROW_ERROR(ERR_SYSTEM_CALL, ret);
return ret;
}
cm_bilist_add_head(&xa_res->node, &bucket->bucket_list);
drc_add_xa_part_list(xa_res);
return DMS_SUCCESS;
}
static int32 drc_enter_xa_res_precheck(drc_global_xid_t *global_xid, drc_global_res_map_t *xa_res_map,
bool32 check_xa_drc)
{
if (check_xa_drc && xa_res_map->drc_accessible_stage == DRC_ACCESS_STAGE_ALL_INACCESS) {
LOG_DEBUG_ERR("[%s][drc_enter_xa_res_precheck] XA drc is inaccessiable", cm_display_resid((char *)global_xid,
DRC_RES_GLOBAL_XA_TYPE));
DMS_THROW_ERROR(ERRNO_DMS_REFORM_IN_PROCESS);
return ERRNO_DMS_REFORM_IN_PROCESS;
}
uint8 master_id = 0XFF;
int32 ret = drc_get_master_id((char *)global_xid, DRC_RES_GLOBAL_XA_TYPE, &master_id);
if (ret != DMS_SUCCESS) {
LOG_DEBUG_ERR("[%s][drc_enter_xa_res_precheck] get xa mater failed", cm_display_resid((char *)global_xid,
DRC_RES_GLOBAL_XA_TYPE));
return ret;
}
if (check_xa_drc && !dms_dst_id_is_self(master_id)) {
LOG_DEBUG_ERR("[%s][drc_enter_xa_res_precheck] master is changed to %u", cm_display_resid((char *)global_xid,
DRC_RES_GLOBAL_XA_TYPE), master_id);
DMS_THROW_ERROR(ERRNO_DMS_DRC_INVALID, cm_display_resid((char *)global_xid, DRC_RES_GLOBAL_XA_TYPE));
return ERRNO_DMS_DRC_INVALID;
}
return DMS_SUCCESS;
}
int32 drc_enter_xa_res(drc_global_xid_t *global_xid, drc_global_xa_res_t **xa_res, bool32 check_xa_drc)
{
drc_global_res_map_t *xa_res_map = drc_get_global_res_map(DRC_RES_GLOBAL_XA_TYPE);
if (!cm_latch_timed_s(&xa_res_map->res_latch, 1, CM_FALSE, NULL)) {
LOG_DEBUG_ERR("[%s][drc_enter_xa_res] failed to latch xa res map in shared mode",
cm_display_resid((char *)global_xid, DRC_RES_GLOBAL_XA_TYPE));
DMS_THROW_ERROR(ERRNO_DMS_REFORM_IN_PROCESS);
return ERRNO_DMS_REFORM_IN_PROCESS;
}
int32 ret = drc_enter_xa_res_precheck(global_xid, xa_res_map, check_xa_drc);
if (ret != DMS_SUCCESS) {
cm_unlatch(&xa_res_map->res_latch, NULL);
return ret;
}
drc_res_map_t *res_map = &xa_res_map->res_map;
drc_res_bucket_t *bucket = drc_res_map_get_bucket(res_map, (char *)global_xid, sizeof(drc_global_xid_t));
cm_spin_lock(&bucket->lock, NULL);
drc_global_xa_res_t *xa = (drc_global_xa_res_t *)drc_res_map_lookup(res_map, bucket, (char *)global_xid,
sizeof(drc_global_xid_t));
*xa_res = xa;
return DMS_SUCCESS;
}
void drc_leave_xa_res(drc_global_res_map_t *xa_res_map, drc_res_bucket_t *bucket)
{
cm_spin_unlock(&bucket->lock);
cm_unlatch(&xa_res_map->res_latch, NULL);
}
int32 drc_create_xa_res(void *db_handle, uint32 session_id, drc_global_xid_t *global_xid, uint8 owner_id,
uint8 undo_set_id, bool32 check_xa_drc)
{
drc_global_xa_res_t *xa_res = NULL;
drc_global_res_map_t *xa_res_map = drc_get_global_res_map(DRC_RES_GLOBAL_XA_TYPE);
drc_res_map_t *res_map = &xa_res_map->res_map;
drc_res_bucket_t *bucket = drc_res_map_get_bucket(res_map, (char *)global_xid, sizeof(drc_global_xid_t));
int32 ret = drc_enter_xa_res(global_xid, &xa_res, check_xa_drc);
if (ret != DMS_SUCCESS) {
return ret;
}
if (xa_res == NULL || xa_res->owner_id == CM_INVALID_ID8) {
ret = drc_new_xa_res(xa_res_map, global_xid, owner_id, undo_set_id);
if (ret != DMS_SUCCESS) {
drc_leave_xa_res(xa_res_map, bucket);
return ret;
}
LOG_DEBUG_INF("[DRC][%s][drc_create_xa_res]: create xa res success", cm_display_resid((char *)global_xid,
DRC_RES_GLOBAL_XA_TYPE));
drc_leave_xa_res(xa_res_map, bucket);
return DMS_SUCCESS;
}
dms_context_t dms_ctx;
ret = memset_sp(&dms_ctx, sizeof(dms_context_t), 0, sizeof(dms_context_t));
if (ret != EOK) {
drc_leave_xa_res(xa_res_map, bucket);
DMS_THROW_ERROR(ERR_SYSTEM_CALL, ret);
return ret;
}
dms_ctx.inst_id = g_dms.inst_id;
dms_ctx.sess_id = session_id;
dms_ctx.db_handle = db_handle;
dms_ctx.sess_type = dms_is_recovery_session(session_id);
ret = memcpy_sp(&dms_ctx.global_xid, sizeof(drc_global_xid_t), global_xid, sizeof(drc_global_xid_t));
if (ret != EOK) {
drc_leave_xa_res(xa_res_map, bucket);
DMS_THROW_ERROR(ERR_SYSTEM_CALL, ret);
return ret;
}
bool8 inuse = CM_FALSE;
ret = dms_request_xa_inuse(&dms_ctx, xa_res->owner_id, &inuse);
if (ret != DMS_SUCCESS) {
LOG_DEBUG_ERR("[%s][drc_create_xa_res] ask xa in use or not failed", cm_display_resid((char *)global_xid,
DRC_RES_GLOBAL_XA_TYPE));
drc_leave_xa_res(xa_res_map, bucket);
return ret;
}
if (inuse) {
LOG_DEBUG_ERR("[%s][drc_create_xa_res] xa res already exists", cm_display_resid((char *)global_xid,
DRC_RES_GLOBAL_XA_TYPE));
DMS_THROW_ERROR(ERRNO_DMS_DRC_XA_RES_ALREADY_EXISTS, cm_display_resid((char *)global_xid,
DRC_RES_GLOBAL_XA_TYPE));
drc_leave_xa_res(xa_res_map, bucket);
return ERRNO_DMS_DRC_XA_RES_ALREADY_EXISTS;
}
xa_res->owner_id = owner_id;
xa_res->undo_set_id = undo_set_id;
drc_leave_xa_res(xa_res_map, bucket);
return DMS_SUCCESS;
}
static void drc_release_xa_res_map(drc_res_map_t *res_map, drc_global_xid_t *global_xid, drc_global_xa_res_t *xa_res)
{
drc_res_bucket_t *bucket = drc_res_map_get_bucket(res_map, (char *)global_xid, sizeof(drc_global_xid_t));
drc_del_xa_part_list(xa_res);
cm_bilist_del(&xa_res->node, &bucket->bucket_list);
drc_res_pool_free_item(&res_map->res_pool, (char *)xa_res);
}
int32 drc_delete_xa_res(drc_global_xid_t *global_xid, bool32 check_xa_drc)
{
drc_global_xa_res_t *xa_res = NULL;
drc_global_res_map_t *xa_res_map = drc_get_global_res_map(DRC_RES_GLOBAL_XA_TYPE);
drc_res_map_t *res_map = &xa_res_map->res_map;
drc_res_bucket_t *bucket = drc_res_map_get_bucket(res_map, (char *)global_xid, sizeof(drc_global_xid_t));
int32 ret = drc_enter_xa_res(global_xid, &xa_res, check_xa_drc);
if (ret != DMS_SUCCESS) {
return ret;
}
if (xa_res == NULL) {
LOG_DEBUG_ERR("[%s][drc_delete_xa_res] xa res does not exists", cm_display_resid((char *)global_xid,
DRC_RES_GLOBAL_XA_TYPE));
DMS_THROW_ERROR(ERRNO_DMS_DRC_XA_RES_NOT_EXISTS, cm_display_resid((char *)global_xid, DRC_RES_GLOBAL_XA_TYPE));
drc_leave_xa_res(xa_res_map, bucket);
return ERRNO_DMS_DRC_XA_RES_NOT_EXISTS;
}
xa_res->owner_id = CM_INVALID_ID8;
drc_release_xa_res_map(&xa_res_map->res_map, global_xid, xa_res);
drc_leave_xa_res(xa_res_map, bucket);
LOG_DEBUG_ERR("[DRC][%s][drc_delete_xa_res]: delete xa res success", cm_display_resid((char *)global_xid,
DRC_RES_GLOBAL_XA_TYPE));
return DMS_SUCCESS;
}
void drc_release_xa_by_part(drc_part_list_t *part)
{
drc_global_res_map_t *global_res_map = drc_get_global_res_map(DRC_RES_GLOBAL_XA_TYPE);
drc_res_map_t *res_map = &global_res_map->res_map;
bilist_node_t *node = cm_bilist_head(&part->list);
drc_res_bucket_t *bucket = NULL;
drc_global_xa_res_t *xa_res = NULL;
while (node != NULL) {
xa_res = DRC_RES_NODE_OF(drc_global_xa_res_t, node, part_node);
node = BINODE_NEXT(node);
bucket = drc_res_map_get_bucket(res_map, (char *)&xa_res->xid, sizeof(drc_global_xid_t));
cm_spin_lock(&bucket->lock, NULL);
drc_release_xa_res_map(res_map, &xa_res->xid, xa_res);
cm_spin_unlock(&bucket->lock);
}
}