* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* knl_sequence.c
*
*
* IDENTIFICATION
* src/kernel/sequence/knl_sequence.c
*
* -------------------------------------------------------------------------
*/
#include "knl_common_module.h"
#include "knl_sequence.h"
#include "dc_seq.h"
#include "dc_log.h"
#include "knl_context.h"
#include "dtc_dls.h"
#define SEQ_MINVAL_COLUMN_ID 3
#define SEQ_MAXVAL_COLUMN_ID 4
#define SEQ_STEP_COLUMN_ID 5
#define SEQ_CACHESIZE_COLUMN_ID 6
#define SEQ_CYCLE_FLAG_COLUMN_ID 7
#define SEQ_ORDER_FALG_COLUMN_ID 8
#define SEQ_ORG_SCN_COLUMN_ID 9
#define SEQ_CHG_SCN_COLUMN_ID 10
#define SEQ_LASTVAL_COLUMN_ID 11
#define SEQ_COLUMN_COUNTS 12
* create sequence
* @param[in] session - knl_session_t
* @param[in] def - pointer to the definition of sequence
* @return
* - OG_SUCCESS
* - OG_ERROR
* @note
*
*/
status_t db_create_sequence(knl_session_t *session, knl_handle_t stmt, knl_sequence_def_t *def)
{
uint32 max_size;
row_assist_t ra;
knl_cursor_t *cursor = NULL;
text_t txt_seq;
sequence_desc_t desc;
dc_user_t *user = NULL;
rd_seq_t redo;
errno_t ret;
if (!dc_get_user_id(session, &def->user, &desc.uid)) {
OG_THROW_ERROR(ERR_USER_NOT_EXIST, T2S(&def->user));
return OG_ERROR;
}
txt_seq = def->name;
(void)cm_text2str(&txt_seq, desc.name, OG_MAX_NAME_LEN + 1);
desc.is_cache = def->nocache ? 0 : 1;
desc.is_cyclable = def->is_cycle;
desc.is_order = def->is_order;
desc.step = def->step;
desc.org_scn = db_inc_scn(session);
desc.chg_scn = desc.org_scn;
desc.minval = def->min_value;
desc.maxval = def->max_value;
desc.cache = (uint64)(desc.is_cache ? def->cache : 0);
#ifdef OG_RAC_ING
desc.dist_data = def->dist_data;
#endif
if (dc_alloc_seq_entry(session, &desc) != OG_SUCCESS) {
return OG_ERROR;
}
CM_SAVE_STACK(session->stack);
cursor = knl_push_cursor(session);
max_size = session->kernel->attr.max_row_size;
row_init(&ra, cursor->buf, max_size, SYS_SEQUENCE_COLUMN_COUNT);
(void)row_put_int32(&ra, desc.uid);
(void)row_put_int32(&ra, desc.id);
(void)row_put_str(&ra, desc.name);
(void)row_put_int64(&ra, desc.minval);
(void)row_put_int64(&ra, desc.maxval);
(void)row_put_int64(&ra, desc.step);
(void)row_put_int64(&ra, desc.cache);
(void)row_put_int32(&ra, desc.is_cyclable);
(void)row_put_int32(&ra, desc.is_order);
(void)row_put_int64(&ra, desc.org_scn);
(void)row_put_int64(&ra, desc.chg_scn);
(void)row_put_int64(&ra, def->start);
#ifdef OG_RAC_ING
(void)row_put_bin(&ra, &desc.dist_data);
#endif
knl_open_sys_cursor(session, cursor, CURSOR_ACTION_INSERT, SYS_SEQ_ID, SYS_SEQ001_ID);
log_add_lrep_ddl_begin(session);
if (knl_internal_insert(session, cursor) != OG_SUCCESS) {
log_add_lrep_ddl_end(session);
if (dc_open_user_by_id(session, desc.uid, &user) != OG_SUCCESS) {
knl_panic_log(0, "[SEQUENCE] ABORT INFO: open user failed while drop sequence");
}
sequence_entry_t *entry = DC_GET_SEQ_ENTRY(user, desc.id);
dc_sequence_drop(session, entry);
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
redo.op_type = RD_CREATE_SEQUENCE;
redo.id = desc.id;
redo.uid = desc.uid;
ret = strcpy_sp(redo.seq_name, OG_NAME_BUFFER_SIZE, desc.name);
knl_securec_check(ret);
log_put(session, RD_LOGIC_OPERATION, &redo, sizeof(rd_seq_t), LOG_ENTRY_FLAG_NONE);
log_add_lrep_ddl_info(session, stmt, LOGIC_OP_SEQUNCE, RD_CREATE_SEQUENCE, NULL);
log_add_lrep_ddl_end(session);
CM_RESTORE_STACK(session->stack);
return OG_SUCCESS;
}
static status_t rd_dc_create_sequence(knl_session_t *session, dc_user_t *user, text_t *seq_name)
{
knl_cursor_t *cursor = NULL;
sequence_desc_t desc;
sequence_entry_t *entry = NULL;
errno_t err;
CM_SAVE_STACK(session->stack);
cursor = knl_push_cursor(session);
knl_open_sys_cursor(session, cursor, CURSOR_ACTION_SELECT, SYS_SEQ_ID, SYS_SEQ001_ID);
knl_init_index_scan(cursor, OG_TRUE);
knl_set_scan_key(INDEX_DESC(cursor->index), &cursor->scan_range.l_key, OG_TYPE_INTEGER, (void *)&user->desc.id,
sizeof(uint32), IX_COL_SYS_SEQ001_UID);
knl_set_scan_key(INDEX_DESC(cursor->index), &cursor->scan_range.l_key, OG_TYPE_STRING, (void *)seq_name->str,
seq_name->len, IX_COL_SYS_SEQ001_NAME);
if (knl_fetch(session, cursor) != OG_SUCCESS) {
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
if (cursor->eof) {
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
dc_convert_seq_desc(cursor, &desc);
dls_spin_lock(session, &user->lock, NULL);
if (dc_create_sequence_entry(session, user, desc.id, &entry) != OG_SUCCESS) {
dls_spin_unlock(session, &user->lock);
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
err = memcpy_sp(entry->name, OG_NAME_BUFFER_SIZE, desc.name, OG_MAX_NAME_LEN + 1);
knl_securec_check(err);
dc_insert_into_seqindex(user, entry);
if (desc.id >= user->sequence_set.sequence_hwm) {
user->sequence_set.sequence_hwm = desc.id + 1;
}
dls_spin_unlock(session, &user->lock);
CM_RESTORE_STACK(session->stack);
return OG_SUCCESS;
}
void rd_create_sequence(knl_session_t *session, log_entry_t *log)
{
if (log->size != CM_ALIGN4(sizeof(rd_seq_t)) + LOG_ENTRY_SIZE) {
OG_LOG_RUN_ERR("[SEQ] no need to replay create sequence, log size %u is wrong", log->size);
return;
}
rd_seq_t *rd = (rd_seq_t *)log->data;
rd->seq_name[OG_NAME_BUFFER_SIZE - 1] = 0;
rd->user_name[OG_NAME_BUFFER_SIZE - 1] = 0;
dc_user_t *user = NULL;
knl_dictionary_t dc;
text_t seq_name;
if (dc_open_user_by_id(session, rd->uid, &user) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[SEQ] failed to replay create sequence,user id %u doesn't exists", rd->uid);
rd_check_dc_replay_err(session);
return;
}
cm_str2text(rd->seq_name, &seq_name);
if (dc_init_sequence_set(session, user) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[SEQ] failed to replay create sequence");
rd_check_dc_replay_err(session);
return;
}
if (dc_seq_find(session, user, &seq_name, &dc)) {
OG_LOG_RUN_INF("[SEQ] no need to replay create sequence,sequence already exists");
return;
}
if (rd_dc_create_sequence(session, user, &seq_name) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[SEQ] failed to create sequence");
rd_check_dc_replay_err(session);
}
}
void print_create_sequence(log_entry_t *log)
{
rd_seq_t *rd = (rd_seq_t *)log->data;
printf("create sequence uid:%u,seq_name:%s\n", rd->uid, rd->seq_name);
}
status_t db_get_seq_dist_data(knl_session_t *session, text_t *user, text_t *name, binary_t **dist_data)
{
knl_dictionary_t dc;
dc_sequence_t *sequence = NULL;
if (OG_SUCCESS != dc_seq_open(session, user, name, &dc)) {
return OG_ERROR;
}
sequence = (dc_sequence_t *)dc.handle;
*dist_data = &sequence->dist_data;
dc_seq_close(&dc);
return OG_SUCCESS;
}
status_t db_get_sequence_id(knl_session_t *session, text_t *user, text_t *name, uint32 *id)
{
knl_dictionary_t dc;
dc_sequence_t *sequence = NULL;
if (OG_SUCCESS != dc_seq_open(session, user, name, &dc)) {
return OG_ERROR;
}
sequence = (dc_sequence_t *)dc.handle;
*id = sequence->id;
dc_seq_close(&dc);
return OG_SUCCESS;
}
status_t db_set_cn_seq_currval(knl_session_t *session, text_t *user, text_t *name, int64 nextval)
{
dc_sequence_t *sequence = NULL;
sequence_entry_t *entry = NULL;
dc_currval_t *currval = NULL;
knl_dictionary_t dc_seq;
if (OG_SUCCESS != dc_seq_open(session, user, name, &dc_seq)) {
return OG_ERROR;
}
sequence = (dc_sequence_t *)dc_seq.handle;
entry = sequence->entry;
dls_spin_lock(session, &entry->lock, NULL);
if (!sequence->valid || entry->org_scn > DB_CURR_SCN(session)) {
dls_spin_unlock(session, &entry->lock);
dc_seq_close(&dc_seq);
OG_THROW_ERROR(ERR_SEQ_NOT_EXIST, T2S(user), T2S_EX(name));
return OG_ERROR;
}
if (sequence->currvals[session->id / DC_GROUP_CURRVAL_COUNT] == NULL) {
if (dc_alloc_mem(&session->kernel->dc_ctx, entry->entity->memory, OG_SHARED_PAGE_SIZE,
(void **)&sequence->currvals[session->id / DC_GROUP_CURRVAL_COUNT]) != OG_SUCCESS) {
dls_spin_unlock(session, &entry->lock);
dc_seq_close(&dc_seq);
return OG_ERROR;
}
}
currval = DC_GET_SEQ_CURRVAL(sequence, session->id);
currval->data = nextval;
currval->serial_id = session->serial_id;
sequence->lastval = nextval;
sequence->rsv_nextval = nextval;
dls_spin_unlock(session, &entry->lock);
dc_seq_close(&dc_seq);
return OG_SUCCESS;
}
status_t db_current_seq_value(knl_session_t *session, text_t *user, text_t *name, int64 *currval)
{
knl_dictionary_t dc;
dc_sequence_t *sequence = NULL;
dc_currval_t *dc_currval = NULL;
if (OG_SUCCESS != dc_seq_open(session, user, name, &dc)) {
return OG_ERROR;
}
sequence = (dc_sequence_t *)dc.handle;
dls_spin_lock(session, &sequence->entry->lock, NULL);
if (!sequence->valid || sequence->entry->org_scn > DB_CURR_SCN(session)) {
dls_spin_unlock(session, &sequence->entry->lock);
dc_seq_close(&dc);
OG_THROW_ERROR(ERR_SEQ_NOT_EXIST, T2S(user), T2S_EX(name));
return OG_ERROR;
}
if (sequence->currvals[session->id / DC_GROUP_CURRVAL_COUNT] == NULL) {
OG_THROW_ERROR(ERR_SEQ_INVALID, "sequence CURRVAL is not defined in this session");
dls_spin_unlock(session, &sequence->entry->lock);
dc_seq_close(&dc);
return OG_ERROR;
}
dc_currval = DC_GET_SEQ_CURRVAL(sequence, session->id);
if (dc_currval->serial_id != session->serial_id) {
OG_THROW_ERROR(ERR_SEQ_INVALID, "sequence CURRVAL is not defined in this session");
dls_spin_unlock(session, &sequence->entry->lock);
dc_seq_close(&dc);
return OG_ERROR;
}
*currval = dc_currval->data;
dls_spin_unlock(session, &sequence->entry->lock);
dc_seq_close(&dc);
return OG_SUCCESS;
}
* generate the nextval of the sequence
* @param[in] sequence in dc
* @param[out] sequence in dc
* @return
* - OG_SUCCESS
* - OG_ERROR
*/
static status_t seq_get_next_value(dc_sequence_t *sequence)
{
int64 res;
if (sequence->lastval > sequence->maxval || sequence->lastval < sequence->minval) {
if (sequence->is_cyclable == OG_FALSE) {
if (sequence->step > 0) {
OG_THROW_ERROR(ERR_SEQ_INVALID, "sequence NEXTVAL exceeds MAXVALUE");
} else {
OG_THROW_ERROR(ERR_SEQ_INVALID, "sequence NEXTVAL goes below MINVALUE");
}
return OG_ERROR;
}
sequence->lastval = sequence->step > 0 ? sequence->minval : sequence->maxval;
}
if (opr_int64add_overflow(sequence->rsv_nextval, sequence->step, &res)) {
if (sequence->step > 0) {
OG_THROW_ERROR(ERR_SEQ_INVALID, "sequence NEXTVAL exceeds MAXVALUE");
} else {
OG_THROW_ERROR(ERR_SEQ_INVALID, "sequence NEXTVAL goes below MINVALUE");
}
return OG_ERROR;
}
sequence->rsv_nextval = sequence->lastval;
sequence->lastval += sequence->step;
return OG_SUCCESS;
}
static bool32 db_reach_sequence_boundary(dc_sequence_t *sequence, uint32 batch_count,
int64 *end_val, uint32 group_order, uint32 group_cnt)
{
int64 val;
val = sequence->lastval + batch_count * sequence->step;
if ((sequence->step > 0 && val < sequence->maxval) ||
(sequence->step < 0 && val > sequence->minval)) {
sequence->lastval = val;
*end_val = val - sequence->step;
return OG_FALSE;
}
*end_val = sequence->step > 0 ? sequence->maxval : sequence->minval;
if (sequence->is_cyclable) {
sequence->lastval = ((sequence->step > 0) ? sequence->minval : sequence->maxval);
return OG_TRUE;
}
if (sequence->step > 0) {
sequence->lastval += ((sequence->maxval - sequence->lastval) / sequence->step + 1) * sequence->step;
} else if (sequence->step < 0) {
sequence->lastval += ((sequence->minval - sequence->lastval) / sequence->step + 1) * sequence->step;
}
return OG_TRUE;
}
* generate value of last_number which will be recoreded in sequence$
* @param[in] sequence - dictionary of sequence
* @param[out] last_number - value of last_number
* @return void
*/
static void seq_generate_last_number(dc_sequence_t *sequence, int64 *last_number)
{
int64 val;
int64 cache_size;
cache_size = sequence->cache_size > 0 ? sequence->cache_size : 1;
val = *last_number + cache_size * sequence->step;
if ((sequence->step > 0 && val <= sequence->maxval) ||
(sequence->step < 0 && val >= sequence->minval)) {
*last_number = val;
return;
}
knl_panic(sequence->step != 0);
if (sequence->is_cyclable) {
val = sequence->step > 0 ? sequence->maxval : sequence->minval;
val = (cache_size - cm_abs64((val - *last_number) / sequence->step) - 1) * sequence->step;
*last_number = (sequence->step > 0 ? sequence->minval : sequence->maxval) + val;
return;
}
if (sequence->step > 0) {
*last_number += ((sequence->maxval - *last_number) / sequence->step + 1) * sequence->step;
} else if (sequence->step < 0) {
*last_number += ((sequence->minval - *last_number) / sequence->step + 1) * sequence->step;
}
return;
}
* fetch description of specified sequence from sequence$
* @param[in] session - kernel reserved session for handle sequences
* @param[in] cursor - kernel of the cursor
* @param[in] uid - user id
* @param[in] name - name of sequence
* @param[out] cursor
* @return
* - OG_SUCCESS
* - OG_ERROR
*/
static status_t db_fetch_seq_description(knl_session_t *session, knl_cursor_t *cursor, uint32 uid, text_t *name)
{
knl_open_sys_cursor(session, cursor, CURSOR_ACTION_UPDATE, SYS_SEQ_ID, 0);
knl_init_index_scan(cursor, OG_TRUE);
knl_set_scan_key(INDEX_DESC(cursor->index), &cursor->scan_range.l_key, OG_TYPE_INTEGER, &uid, sizeof(uint32), 0);
knl_set_scan_key(INDEX_DESC(cursor->index), &cursor->scan_range.l_key, OG_TYPE_STRING, name->str, name->len, 1);
if (OG_SUCCESS != knl_fetch(session, cursor)) {
return OG_ERROR;
}
if (cursor->eof) {
OG_THROW_ERROR(ERR_SEQ_INVALID, "sequence does not exist");
return OG_ERROR;
}
return OG_SUCCESS;
}
static status_t seq_generate_cache(knl_session_t *session, dc_sequence_t *sequence, text_t *name)
{
uint16 size;
int64 lastval;
row_assist_t ra;
knl_cursor_t *cursor = NULL;
if (++sequence->cache_pos >= sequence->cache_size) {
if (knl_begin_auton_rm(session) != OG_SUCCESS) {
return OG_ERROR;
}
CM_SAVE_STACK(session->stack);
cursor = knl_push_cursor(session);
if (OG_SUCCESS != db_fetch_seq_description(session, cursor, sequence->uid, name)) {
CM_RESTORE_STACK(session->stack);
knl_end_auton_rm(session, OG_ERROR);
return OG_ERROR;
}
lastval = *(int64 *)CURSOR_COLUMN_DATA(cursor, SEQ_LASTVAL_COLUMN_ID);
sequence->lastval = lastval;
(void)seq_generate_last_number(sequence, &lastval);
row_init(&ra, cursor->update_info.data, HEAP_MAX_ROW_SIZE(session), 1);
(void)row_put_int64(&ra, lastval);
cursor->update_info.count = 1;
cursor->update_info.columns[0] = SEQ_LASTVAL_COLUMN_ID;
cm_decode_row(cursor->update_info.data, cursor->update_info.offsets, cursor->update_info.lens, &size);
if (OG_SUCCESS != knl_internal_update(session, cursor)) {
CM_RESTORE_STACK(session->stack);
knl_end_auton_rm(session, OG_ERROR);
return OG_ERROR;
}
sequence->cache_pos = 0;
CM_RESTORE_STACK(session->stack);
knl_end_auton_rm(session, OG_SUCCESS);
}
return OG_SUCCESS;
}
status_t db_next_seq_value(knl_session_t *session, text_t *user, text_t *name, int64 *nextval)
{
dc_sequence_t *sequence = NULL;
sequence_entry_t *entry = NULL;
dc_currval_t *currval = NULL;
knl_dictionary_t dc_seq;
if (OG_SUCCESS != dc_seq_open(session, user, name, &dc_seq)) {
return OG_ERROR;
}
sequence = (dc_sequence_t *)dc_seq.handle;
entry = sequence->entry;
dls_spin_lock(session, &entry->lock, NULL);
if (!sequence->valid || entry->org_scn > DB_CURR_SCN(session)) {
dls_spin_unlock(session, &entry->lock);
dc_seq_close(&dc_seq);
OG_THROW_ERROR(ERR_SEQ_NOT_EXIST, T2S(user), T2S_EX(name));
return OG_ERROR;
}
if (seq_generate_cache(session, sequence, name) != OG_SUCCESS) {
dls_spin_unlock(session, &entry->lock);
dc_seq_close(&dc_seq);
return OG_ERROR;
}
if (OG_SUCCESS != seq_get_next_value(sequence)) {
dls_spin_unlock(session, &entry->lock);
dc_seq_close(&dc_seq);
return OG_ERROR;
}
*nextval = sequence->rsv_nextval;
if (sequence->currvals[session->id / DC_GROUP_CURRVAL_COUNT] == NULL) {
if (dc_alloc_mem(&session->kernel->dc_ctx, entry->entity->memory, OG_SHARED_PAGE_SIZE,
(void **)&sequence->currvals[session->id / DC_GROUP_CURRVAL_COUNT]) != OG_SUCCESS) {
dls_spin_unlock(session, &entry->lock);
dc_seq_close(&dc_seq);
return OG_ERROR;
}
}
currval = DC_GET_SEQ_CURRVAL(sequence, session->id);
currval->data = *nextval;
currval->serial_id = session->serial_id;
dls_spin_unlock(session, &entry->lock);
dc_seq_close(&dc_seq);
return OG_SUCCESS;
}
status_t db_get_nextval_for_cn(knl_session_t *session, text_t *user, text_t *name, int64 *value)
{
dc_sequence_t *sequence = NULL;
sequence_entry_t *entry = NULL;
knl_dictionary_t dc_seq;
if (OG_SUCCESS != dc_seq_open(session, user, name, &dc_seq)) {
return OG_ERROR;
}
sequence = (dc_sequence_t *)dc_seq.handle;
entry = sequence->entry;
dls_spin_lock(session, &entry->lock, NULL);
if (!sequence->valid || entry->org_scn > DB_CURR_SCN(session)) {
OG_THROW_ERROR(ERR_SEQ_NOT_EXIST, T2S(user), T2S_EX(name));
dls_spin_unlock(session, &entry->lock);
dc_seq_close(&dc_seq);
return OG_ERROR;
}
if (seq_generate_cache(session, sequence, name) != OG_SUCCESS) {
dls_spin_unlock(session, &entry->lock);
dc_seq_close(&dc_seq);
return OG_ERROR;
}
*value = sequence->lastval;
sequence->rsv_nextval = sequence->lastval;
dls_spin_unlock(session, &entry->lock);
dc_seq_close(&dc_seq);
return OG_SUCCESS;
}
status_t db_multi_seq_value(knl_session_t *session, knl_sequence_def_t *def,
uint32 group_order, uint32 group_cnt, uint32 count)
{
dc_sequence_t *sequence = NULL;
sequence_entry_t *entry = NULL;
knl_dictionary_t dc_seq;
uint32 remain_size = count;
uint32 batch_count;
bool32 is_first = OG_TRUE;
int64 start_val = 0;
int64 end_val = 0;
bool32 allow_cycle = OG_TRUE;
if (OG_SUCCESS != dc_seq_open(session, &def->user, &def->name, &dc_seq)) {
return OG_ERROR;
}
sequence = (dc_sequence_t *)dc_seq.handle;
entry = sequence->entry;
dls_spin_lock(session, &entry->lock, NULL);
if (!sequence->valid || entry->org_scn > DB_CURR_SCN(session)) {
dls_spin_unlock(session, &entry->lock);
dc_seq_close(&dc_seq);
OG_THROW_ERROR(ERR_SEQ_NOT_EXIST, T2S(&def->user), T2S_EX(&def->name));
return OG_ERROR;
}
while (remain_size) {
if (seq_generate_cache(session, sequence, &def->name) != OG_SUCCESS) {
dls_spin_unlock(session, &entry->lock);
dc_seq_close(&dc_seq);
return OG_ERROR;
}
batch_count = (uint32)(sequence->cache_size - sequence->cache_pos);
batch_count = (batch_count > 0) ? batch_count : 1;
batch_count = (remain_size > batch_count) ? batch_count : remain_size;
if (sequence->lastval > sequence->maxval || sequence->lastval < sequence->minval) {
if (sequence->is_cyclable == OG_FALSE) {
dls_spin_unlock(session, &entry->lock);
dc_seq_close(&dc_seq);
OG_THROW_ERROR(ERR_SEQ_INVALID, "sequence exceeds MINVALUE or MAXVALUE");
return OG_ERROR;
}
}
if (is_first) {
start_val = sequence->lastval;
is_first = OG_FALSE;
}
if (db_reach_sequence_boundary(sequence, batch_count, &end_val, group_order, group_cnt)) {
if (sequence->is_cyclable && allow_cycle) {
allow_cycle = OG_FALSE;
if ((sequence->step > 0 && start_val > end_val) || (sequence->step < 0 && start_val < end_val)) {
if (sequence->step > 0) {
start_val = sequence->minval;
} else {
start_val = sequence->maxval;
}
continue;
}
}
break;
}
sequence->cache_pos += batch_count - 1;
remain_size -= batch_count;
}
sequence->rsv_nextval = sequence->lastval;
def->step = sequence->step;
def->start = start_val;
def->max_value = end_val;
dls_spin_unlock(session, &entry->lock);
dc_seq_close(&dc_seq);
return OG_SUCCESS;
}
* implement of drop sequence
* @param[in] session - user session
* @param[in] user - username
* @param[in] name - sequence name
* @return
* - OG_SUCCESS
* - OG_ERROR
*/
status_t db_drop_sequence(knl_session_t *session, knl_handle_t stmt, knl_dictionary_t *dc, bool32 *exists)
{
knl_cursor_t *cursor = NULL;
text_t name;
dc_sequence_t *seq = (dc_sequence_t *)dc->handle;
rd_seq_t redo;
errno_t ret;
obj_info_t obj_addr;
cm_str2text(seq->name, &name);
knl_set_session_scn(session, OG_INVALID_ID64);
CM_SAVE_STACK(session->stack);
cursor = knl_push_cursor(session);
knl_open_sys_cursor(session, cursor, CURSOR_ACTION_DELETE, SYS_SEQ_ID, 0);
knl_init_index_scan(cursor, OG_TRUE);
knl_set_scan_key(INDEX_DESC(cursor->index), &cursor->scan_range.l_key, OG_TYPE_INTEGER, &seq->uid, sizeof(uint32),
0);
knl_set_scan_key(INDEX_DESC(cursor->index), &cursor->scan_range.l_key, OG_TYPE_STRING, name.str, name.len, 1);
if (OG_SUCCESS != knl_fetch(session, cursor)) {
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
if (cursor->eof) {
*exists = OG_FALSE;
CM_RESTORE_STACK(session->stack);
return OG_SUCCESS;
}
*exists = OG_TRUE;
log_add_lrep_ddl_begin(session);
if (OG_SUCCESS != knl_internal_delete(session, cursor)) {
log_add_lrep_ddl_end(session);
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
if (OG_SUCCESS != db_drop_object_privs(session, seq->uid, seq->name, OBJ_TYPE_SEQUENCE)) {
log_add_lrep_ddl_end(session);
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
redo.op_type = RD_DROP_SEQUENCE;
redo.id = seq->id;
redo.uid = seq->uid;
ret = strcpy_sp(redo.user_name, OG_NAME_BUFFER_SIZE, seq->entry->user->desc.name);
knl_securec_check(ret);
ret = strcpy_sp(redo.seq_name, OG_NAME_BUFFER_SIZE, seq->name);
knl_securec_check(ret);
log_put(session, RD_LOGIC_OPERATION, &redo, sizeof(rd_seq_t), LOG_ENTRY_FLAG_NONE);
obj_addr.oid = dc->oid;
obj_addr.uid = dc->uid;
obj_addr.tid = OBJ_TYPE_SEQUENCE;
if (g_knl_callback.update_depender(session, &obj_addr) != OG_SUCCESS) {
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
log_add_lrep_ddl_info(session, stmt, LOGIC_OP_SEQUNCE, RD_DROP_SEQUENCE, NULL);
log_add_lrep_ddl_end(session);
knl_commit(session);
dc_drop_object_privs(&session->kernel->dc_ctx, seq->uid, seq->name, OBJ_TYPE_SEQUENCE);
dc_sequence_drop(session, seq->entry);
CM_RESTORE_STACK(session->stack);
return OG_SUCCESS;
}
void print_drop_sequence(log_entry_t *log)
{
rd_seq_t *rd = (rd_seq_t *)log->data;
printf("drop sequence uid:%u,seq_name:%s\n", rd->uid, rd->seq_name);
}
static void db_alter_seq_update_row(knl_session_t *session, knl_sequence_def_t *def, knl_cursor_t *cursor,
dc_sequence_t *sequence)
{
row_assist_t ra;
bool32 lastnum_changed = OG_FALSE;
bool32 lastnum_set = OG_FALSE;
int64 old_step = sequence->step;
int64 new_lastval = sequence->lastval;
uint16 update_cols = 0;
uint16 size;
row_init(&ra, cursor->update_info.data, HEAP_MAX_ROW_SIZE(session), SEQ_COLUMN_COUNTS);
if (def->is_minval_set) {
cursor->update_info.columns[update_cols++] = SEQ_MINVAL_COLUMN_ID;
(void)row_put_int64(&ra, def->min_value);
sequence->minval = def->min_value;
}
if (def->is_maxval_set) {
cursor->update_info.columns[update_cols++] = SEQ_MAXVAL_COLUMN_ID;
(void)row_put_int64(&ra, def->max_value);
sequence->maxval = def->max_value;
}
if (def->is_step_set) {
sequence->step = def->step;
cursor->update_info.columns[update_cols++] = SEQ_STEP_COLUMN_ID;
(void)row_put_int64(&ra, def->step);
lastnum_changed = OG_TRUE;
}
if (def->is_cache_set) {
cursor->update_info.columns[update_cols++] = SEQ_CACHESIZE_COLUMN_ID;
(void)row_put_int64(&ra, def->cache);
sequence->cache_size = def->cache;
lastnum_changed = OG_TRUE;
}
if (def->is_cycle != sequence->is_cyclable) {
cursor->update_info.columns[update_cols++] = SEQ_CYCLE_FLAG_COLUMN_ID;
(void)row_put_int32(&ra, def->is_cycle);
sequence->is_cyclable = def->is_cycle;
}
if (def->is_order != sequence->is_order) {
cursor->update_info.columns[update_cols++] = SEQ_ORDER_FALG_COLUMN_ID;
(void)row_put_int32(&ra, def->is_order);
sequence->is_order = def->is_order;
}
cursor->update_info.columns[update_cols++] = SEQ_CHG_SCN_COLUMN_ID;
(void)row_put_int64(&ra, db_inc_scn(session));
if (lastnum_changed) {
new_lastval += def->step - old_step;
sequence->step = def->step;
lastnum_set = OG_TRUE;
sequence->cache_pos = sequence->cache_size;
}
if (def->is_restart_set) {
new_lastval = def->start;
sequence->rsv_nextval = def->start;
sequence->cache_pos = sequence->cache_size;
lastnum_set = OG_TRUE;
}
if (lastnum_set) {
cursor->update_info.columns[update_cols++] = SEQ_LASTVAL_COLUMN_ID;
(void)row_put_int64(&ra, new_lastval);
sequence->lastval = new_lastval;
}
cursor->update_info.count = update_cols;
ra.head->column_count = update_cols;
cm_decode_row(cursor->update_info.data, cursor->update_info.offsets, cursor->update_info.lens, &size);
}
status_t db_alter_sequence(knl_session_t *session, knl_handle_t stmt, knl_sequence_def_t *def)
{
knl_dictionary_t dc_seq;
knl_cursor_t *cursor = NULL;
dc_sequence_t *sequence = NULL;
sequence_entry_t *entry = NULL;
rd_seq_t redo;
if (!def->is_option_set) {
OG_THROW_ERROR(ERR_SEQ_INVALID, "no options specified for alter sequence");
return OG_ERROR;
}
if (OG_SUCCESS != dc_seq_open(session, &def->user, &def->name, &dc_seq)) {
return OG_ERROR;
}
sequence = (dc_sequence_t *)dc_seq.handle;
entry = sequence->entry;
dls_spin_lock(session, &entry->lock, NULL);
if (!sequence->valid || entry->org_scn > DB_CURR_SCN(session)) {
dls_spin_unlock(session, &entry->lock);
dc_seq_close(&dc_seq);
OG_THROW_ERROR(ERR_SEQ_NOT_EXIST, T2S(&def->user), T2S_EX(&def->name));
return OG_ERROR;
}
CM_SAVE_STACK(session->stack);
cursor = knl_push_cursor(session);
if (OG_SUCCESS != db_fetch_seq_description(session, cursor, sequence->uid, &def->name)) {
dls_spin_unlock(session, &entry->lock);
CM_RESTORE_STACK(session->stack);
dc_seq_close(&dc_seq);
return OG_ERROR;
}
db_alter_seq_update_row(session, def, cursor, sequence);
log_add_lrep_ddl_begin(session);
if (OG_SUCCESS != knl_internal_update(session, cursor)) {
log_add_lrep_ddl_end(session);
dls_spin_unlock(session, &entry->lock);
CM_RESTORE_STACK(session->stack);
dc_seq_close(&dc_seq);
return OG_ERROR;
}
redo.op_type = RD_ALTER_SEQUENCE;
redo.uid = dc_seq.uid;
redo.id = dc_seq.oid;
log_put(session, RD_LOGIC_OPERATION, &redo, sizeof(rd_seq_t), LOG_ENTRY_FLAG_NONE);
log_add_lrep_ddl_info(session, stmt, LOGIC_OP_SEQUNCE, RD_ALTER_SEQUENCE, NULL);
log_add_lrep_ddl_end(session);
knl_commit(session);
dls_spin_unlock(session, &entry->lock);
CM_RESTORE_STACK(session->stack);
dc_seq_close(&dc_seq);
return OG_SUCCESS;
}
static status_t seq_update_seq_nextval(knl_session_t *session, dc_sequence_t *sequence, text_t *name, int64 value)
{
uint16 size;
row_assist_t ra;
knl_cursor_t *cursor = NULL;
if (knl_begin_auton_rm(session) != OG_SUCCESS) {
return OG_ERROR;
}
CM_SAVE_STACK(session->stack);
cursor = knl_push_cursor(session);
if (OG_SUCCESS != db_fetch_seq_description(session, cursor, sequence->uid, name)) {
CM_RESTORE_STACK(session->stack);
knl_end_auton_rm(session, OG_ERROR);
return OG_ERROR;
}
row_init(&ra, cursor->update_info.data, HEAP_MAX_ROW_SIZE(session), 1);
(void)row_put_int64(&ra, value);
cursor->update_info.count = 1;
cursor->update_info.columns[0] = SEQ_LASTVAL_COLUMN_ID;
cm_decode_row(cursor->update_info.data, cursor->update_info.offsets, cursor->update_info.lens, &size);
if (OG_SUCCESS != knl_internal_update(session, cursor)) {
CM_RESTORE_STACK(session->stack);
knl_end_auton_rm(session, OG_ERROR);
return OG_ERROR;
}
sequence->lastval = value;
sequence->cache_pos = sequence->cache_size;
CM_RESTORE_STACK(session->stack);
knl_end_auton_rm(session, OG_SUCCESS);
return OG_SUCCESS;
}
status_t db_get_seq_def(knl_session_t *session, text_t *user, text_t *name, knl_sequence_def_t *def)
{
dc_sequence_t *sequence = NULL;
sequence_entry_t *entry = NULL;
knl_dictionary_t dc_seq;
if (OG_SUCCESS != dc_seq_open(session, user, name, &dc_seq)) {
return OG_ERROR;
}
sequence = (dc_sequence_t *)dc_seq.handle;
entry = sequence->entry;
dls_spin_lock(session, &entry->lock, NULL);
if (!sequence->valid || entry->org_scn > DB_CURR_SCN(session)) {
dls_spin_unlock(session, &entry->lock);
dc_seq_close(&dc_seq);
OG_THROW_ERROR(ERR_SEQ_NOT_EXIST, T2S(user), T2S_EX(name));
return OG_ERROR;
}
def->step = sequence->step;
def->max_value = sequence->maxval;
def->min_value = sequence->minval;
def->cache = sequence->cache_size;
def->is_cycle = sequence->is_cyclable;
def->start = sequence->lastval;
dls_spin_unlock(session, &entry->lock);
dc_seq_close(&dc_seq);
return OG_SUCCESS;
}
status_t db_alter_seq_nextval(knl_session_t *session, knl_sequence_def_t *def, int64 value)
{
dc_sequence_t *sequence = NULL;
sequence_entry_t *entry = NULL;
knl_dictionary_t dc_seq;
text_t *user = &def->user;
text_t *name = &def->name;
if (OG_SUCCESS != dc_seq_open(session, user, name, &dc_seq)) {
return OG_ERROR;
}
sequence = (dc_sequence_t *)dc_seq.handle;
entry = sequence->entry;
dls_spin_lock(session, &entry->lock, NULL);
if (!sequence->valid || entry->org_scn > DB_CURR_SCN(session)) {
dls_spin_unlock(session, &entry->lock);
dc_seq_close(&dc_seq);
OG_THROW_ERROR(ERR_SEQ_NOT_EXIST, T2S(user), T2S_EX(name));
return OG_ERROR;
}
if (seq_update_seq_nextval(session, sequence, name, value) != OG_SUCCESS) {
dls_spin_unlock(session, &entry->lock);
dc_seq_close(&dc_seq);
return OG_ERROR;
}
dls_spin_unlock(session, &entry->lock);
dc_seq_close(&dc_seq);
return OG_SUCCESS;
}
void print_alter_sequence(log_entry_t *log)
{
rd_seq_t *rd = (rd_seq_t *)log->data;
printf("alter sequence uid:%u,oid:%u\n", rd->uid, rd->id);
}
* implement of drop all sequences owned by user
* @param[in] session - user session
* @param[in] user - username
* @return
* - OG_SUCCESS
* - OG_ERROR
*/
static status_t db_fetch_seq_by_uid(knl_session_t *session, uint32 uid, sequence_desc_t *desc, bool32 *found)
{
knl_cursor_t *cursor = NULL;
text_t seq_name;
CM_SAVE_STACK(session->stack);
cursor = knl_push_cursor(session);
knl_open_sys_cursor(session, cursor, CURSOR_ACTION_SELECT, SYS_SEQ_ID, SYS_SEQ001_ID);
knl_init_index_scan(cursor, OG_FALSE);
knl_set_scan_key(INDEX_DESC(cursor->index), &cursor->scan_range.l_key, OG_TYPE_INTEGER, &uid, sizeof(uint32), 0);
knl_set_scan_key(INDEX_DESC(cursor->index), &cursor->scan_range.r_key, OG_TYPE_INTEGER, &uid, sizeof(uint32), 0);
knl_set_key_flag(&cursor->scan_range.l_key, SCAN_KEY_LEFT_INFINITE, 1);
knl_set_key_flag(&cursor->scan_range.r_key, SCAN_KEY_RIGHT_INFINITE, 1);
if (OG_SUCCESS != knl_fetch(session, cursor)) {
CM_RESTORE_STACK(session->stack);
*found = OG_FALSE;
return OG_SUCCESS;
}
if (!cursor->eof) {
seq_name.str = CURSOR_COLUMN_DATA(cursor, SYS_SEQUENCE_COL_NAME);
seq_name.len = CURSOR_COLUMN_SIZE(cursor, SYS_SEQUENCE_COL_NAME);
(void)cm_text2str(&seq_name, desc->name, OG_MAX_NAME_LEN + 1);
*found = OG_TRUE;
} else {
*found = OG_FALSE;
}
CM_RESTORE_STACK(session->stack);
return OG_SUCCESS;
}
* implement of drop all sequences owned by user
* @param[in] session - user session
* @param[in] user - username
* @return
* - OG_SUCCESS
* - OG_ERROR
*/
status_t db_drop_sequence_by_user(knl_session_t *session, text_t *user, uint32 uid)
{
bool32 found = OG_FALSE;
sequence_desc_t desc;
knl_dictionary_t dc;
text_t seq_name;
bool32 seq_exists = OG_FALSE;
if (db_fetch_seq_by_uid(session, uid, &desc, &found) != OG_SUCCESS) {
return OG_ERROR;
}
while (found) {
cm_str2text(desc.name, &seq_name);
if (OG_SUCCESS != dc_seq_open(session, user, &seq_name, &dc)) {
return OG_ERROR;
}
if (db_drop_sequence(session, NULL, &dc, &seq_exists) != OG_SUCCESS) {
dc_seq_close(&dc);
return OG_ERROR;
}
dc_seq_close(&dc);
knl_set_session_scn(session, OG_INVALID_ID64);
if (db_fetch_seq_by_uid(session, uid, &desc, &found) != OG_SUCCESS) {
return OG_ERROR;
}
}
return OG_SUCCESS;
}