/* -------------------------------------------------------------------------
 *  This file is part of the oGRAC project.
 * Copyright (c) 2024 Huawei Technologies Co.,Ltd.
 *
 * oGRAC is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * -------------------------------------------------------------------------
 *
 * ogsql_insert.c
 *
 *
 * IDENTIFICATION
 * src/ogsql/executor/ogsql_insert.c
 *
 * -------------------------------------------------------------------------
 */
#include "ogsql_insert.h"
#include "ogsql_update.h"
#include "ogsql_select.h"
#include "ogsql_proj.h"
#include "srv_instance.h"
#include "ogsql_scan.h"

status_t sql_insert_try_ignore(sql_insert_t *insert_ctx)
{
    int32 err_code;
    const char *err_msg = NULL;
    cm_get_error(&err_code, &err_msg, NULL);
    if ((insert_ctx->syntax_flag & INSERT_IS_IGNORE) && err_code == ERR_DUPLICATE_KEY) {
        cm_reset_error();
        return OG_SUCCESS;
    }
    return OG_ERROR;
}

status_t sql_open_insert_cursor(sql_stmt_t *stmt, sql_cursor_t *cursor, sql_insert_t *ogx)
{
    if (cursor->is_open) {
        sql_close_cursor(stmt, cursor);
    }
    OG_RETURN_IFERR(sql_alloc_table_cursors(cursor, 1));

    if (sql_open_cursor_for_update(stmt, ogx->table, &ogx->ssa, cursor, CURSOR_ACTION_INSERT) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (sql_alloc_knl_cursor(stmt, &cursor->exec_data.ext_knl_cur) != OG_SUCCESS) {
        return OG_ERROR;
    }
    cursor->exec_data.ext_knl_cur->scan_mode = SCAN_MODE_ROWID;

    cursor->cond = NULL;
    cursor->plan = ogx->plan;
    cursor->insert_ctx = ogx;
    return OG_SUCCESS;
}

status_t sql_exec_column_default(sql_stmt_t *stmt, knl_dictionary_t *dc, knl_column_t *column, variant_t *val)
{
    val->type = column->datatype;

    if (column->flags & KNL_COLUMN_FLAG_SERIAL) {
        return sql_get_serial_value(stmt, dc, val);
    }

    if (column->default_expr == NULL || KNL_COLUMN_IS_DELETED(column)) {
        val->type = OG_TYPE_STRING; // default datatype
        val->is_null = OG_TRUE;
        return OG_SUCCESS;
    }

    return sql_exec_expr(stmt, (expr_tree_t *)column->default_expr, val);
}

static status_t sql_insert_get_col_value(sql_stmt_t *stmt, knl_dictionary_t *dc, sql_insert_t *insert,
    sql_cursor_t *cur_select, knl_column_t *knl_col, variant_t *val)
{
    uint32 pair_id;
    column_value_pair_t *value_pair = NULL;
    expr_tree_t *expr = NULL;
    rs_column_t *rs_column = NULL;

    pair_id = insert->col_map[knl_col->id];

    /* insert/replace into noselect */
    if (cur_select == NULL) {
        /* exec replace value */
        if (SECUREC_UNLIKELY(stmt->default_info.default_on == OG_TRUE)) {
            sql_get_default_value(stmt, knl_col->id, val);
            return OG_SUCCESS;
        }

        /* exec insert value */
        value_pair = (column_value_pair_t *)((pair_id == OG_INVALID_ID32) ? NULL : cm_galist_get(insert->pairs,
            pair_id));
        if (value_pair == NULL) {
            return sql_exec_column_default(stmt, dc, knl_col, val);
        }
        stmt->default_column = knl_col;
        expr = (expr_tree_t *)cm_galist_get(value_pair->exprs, stmt->pairs_pos);
        OG_RETURN_IFERR(sql_exec_expr(stmt, expr, val));
    } else {
        /* sql: insert into t1(f1, f2) select f1 from t2;
        f2 is clob or blob, has default value
        */
        if (pair_id == OG_INVALID_ID32) {
            return sql_exec_column_default(stmt, dc, knl_col, val);
        }

        // simply check val_from of value from sub_select
        rs_column = cm_galist_get(insert->select_ctx->first_query->rs_columns, pair_id);
        if (rs_column->type == RS_COL_CALC) {
            OG_RETURN_IFERR(sql_get_rs_value(stmt, cur_select, pair_id, val));
        } else {
            OG_RETURN_IFERR(sql_get_col_rs_value(stmt, cur_select, pair_id, &rs_column->v_col, val));
        }
    }

    if (knl_col->flags & KNL_COLUMN_FLAG_SERIAL) {
        if (val->is_null || (knl_col->datatype == OG_TYPE_INTEGER && val->v_int == 0) ||
            (knl_col->datatype == OG_TYPE_BIGINT && val->v_bigint == 0) ||
            (knl_col->datatype == OG_TYPE_UINT32 && val->v_uint32 == 0)) {
            return sql_get_serial_value(stmt, dc, val);
        }
    }

    return OG_SUCCESS;
}

static status_t sql_calc_part_for_insert(sql_stmt_t *stmt, part_key_t *part_key, uint32 *part_id,
    insert_assist_t *assist)
{
    uint16 col_id;
    uint16 part_keys;
    variant_t value;
    knl_column_t *knl_column = NULL;
    knl_dictionary_t *dc = &assist->insert_ctx->table->entry->dc;

    part_keys = knl_part_key_count(dc->handle);
    part_key_init(part_key, part_keys);

    for (uint16 i = 0; i < part_keys; i++) {
        col_id = knl_part_key_column_id(dc->handle, i);
        knl_column = knl_get_column(dc->handle, col_id);

        OG_RETURN_IFERR(sql_insert_get_col_value(stmt, dc, assist->insert_ctx, assist->cur_select, knl_column, &value));

        OG_RETURN_IFERR(sql_part_put_key(stmt, &value, knl_column->datatype, knl_column->size,
            KNL_COLUMN_IS_CHARACTER(knl_column), knl_column->precision, knl_column->scale, part_key));
    }

    *part_id = knl_locate_part_key(dc->handle, part_key);
    return OG_SUCCESS;
}

static status_t sql_calc_subpart_for_insert(sql_stmt_t *stmt, part_key_t *part_key, uint32 compart_no,
    uint32 *subpart_id, insert_assist_t *assist)
{
    uint16 col_id;
    variant_t value;
    knl_column_t *knl_column = NULL;
    knl_dictionary_t *dc = &assist->insert_ctx->table->entry->dc;

    uint16 part_keys = knl_subpart_key_count(dc->handle);
    part_key_init(part_key, part_keys);

    for (uint16 i = 0; i < part_keys; i++) {
        col_id = knl_subpart_key_column_id(dc->handle, i);
        knl_column = knl_get_column(dc->handle, col_id);

        OG_RETURN_IFERR(sql_insert_get_col_value(stmt, dc, assist->insert_ctx, assist->cur_select, knl_column, &value));

        OG_RETURN_IFERR(sql_part_put_key(stmt, &value, knl_column->datatype, knl_column->size,
            KNL_COLUMN_IS_CHARACTER(knl_column), knl_column->precision, knl_column->scale, part_key));
    }

    *subpart_id = knl_locate_subpart_key(dc->handle, compart_no, part_key);
    return OG_SUCCESS;
}

static void sql_set_partition_nologging_insert(sql_stmt_t *stmt, knl_handle_t dc_entity, knl_cursor_t *knl_cur,
    knl_part_locate_t part_loc)
{
    if (knl_part_nologging_enabled(dc_entity, part_loc) || stmt->session->nologging_enable) {
        if (!DB_IS_SINGLE(&stmt->session->knl_session) ||
            (DB_IS_RCY_CHECK_PCN(&stmt->session->knl_session) && stmt->session->nologging_enable)) {
            OG_LOG_DEBUG_WAR("forbid to nologging load when database in HA mode or \
                when _RCY_CHECK_PCN is TRUE on session_level nologging insert");
            knl_cur->logging = OG_TRUE;
            stmt->session->knl_session.rm->logging = OG_TRUE;
            knl_cur->nologging_type = LOGGING_LEVEL;
            stmt->session->knl_session.rm->nolog_type = LOGGING_LEVEL;
        } else {
            knl_cur->logging = OG_FALSE;
            stmt->session->knl_session.rm->logging = OG_FALSE;
            knl_cur->nologging_type = knl_part_nologging_enabled(dc_entity, part_loc) ? TABLE_LEVEL : SESSION_LEVEL;
            stmt->session->knl_session.rm->nolog_type = knl_cur->nologging_type;
        }
    } else {
        knl_cur->logging = OG_TRUE;
        stmt->session->knl_session.rm->logging = OG_TRUE;
        knl_cur->nologging_type = LOGGING_LEVEL;
        stmt->session->knl_session.rm->nolog_type = LOGGING_LEVEL;
    }
}

status_t sql_route_part_table(sql_stmt_t *stmt, knl_cursor_t *knl_cur, part_key_t *part_key, insert_assist_t *assist)
{
    knl_part_locate_t part_loc = {
        .part_no = OG_INVALID_ID32,
        .subpart_no = OG_INVALID_ID32
    };
    knl_dictionary_t *dc = &assist->insert_ctx->table->entry->dc;

    if (sql_calc_part_for_insert(stmt, part_key, &part_loc.part_no, assist) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (part_loc.part_no == OG_INVALID_ID32) {
        OG_THROW_ERROR(ERR_INVALID_PART_KEY, "inserted partition key does not map to any partition");
        return OG_ERROR;
    }

    if (knl_verify_interval_part(dc->handle, part_loc.part_no) &&
        knl_create_interval_part(&stmt->session->knl_session, dc, part_loc.part_no, part_key) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (knl_is_parent_part(dc->handle, part_loc.part_no)) {
        part_key_t *subpart_key = NULL;
        OGSQL_SAVE_STACK(stmt);
        if (sql_push(stmt, OG_MAX_COLUMN_SIZE, (void **)&subpart_key) != OG_SUCCESS) {
            OGSQL_RESTORE_STACK(stmt);
            stmt->default_info.default_on = OG_FALSE;
            return OG_ERROR;
        }

        if (sql_calc_subpart_for_insert(stmt, subpart_key, part_loc.part_no, &part_loc.subpart_no, assist) !=
            OG_SUCCESS) {
            OGSQL_RESTORE_STACK(stmt);
            return OG_ERROR;
        }

        if (part_loc.subpart_no == OG_INVALID_ID32) {
            OG_THROW_ERROR(ERR_INVALID_PART_KEY, "inserted partition key does not map to any subpartition");
            OGSQL_RESTORE_STACK(stmt);
            return OG_ERROR;
        }
        OGSQL_RESTORE_STACK(stmt);
    }

    knl_set_table_part(knl_cur, part_loc);
    sql_set_partition_nologging_insert(stmt, dc->handle, knl_cur, part_loc);
    return OG_SUCCESS;
}

static inline bool32 sql_try_get_part_key_value(knl_part_key_t *decode_key, sql_insert_t *insert, knl_column_t *knl_col,
    variant_t *value)
{
    char *ptr = NULL;
    uint16 len;
    uint16 id;

    OG_RETVALUE_IFTRUE(!knl_is_part_table(insert->table->entry->dc.handle), OG_FALSE);

    id = insert->part_key_map[knl_col->id];
    if (id == OG_INVALID_ID16) {
        return OG_FALSE;
    }
    value->is_null = OG_FALSE;
    value->type = knl_col->datatype;

    len = decode_key->decoder.lens[id];
    if (len == PART_KEY_NULL_LEN) {
        value->is_null = OG_TRUE;
        return OG_TRUE;
    }

    ptr = decode_key->decoder.buf + decode_key->decoder.offsets[id];

    switch (knl_col->datatype) {
        case OG_TYPE_UINT32:
            VALUE(uint32, value) = *(uint32 *)ptr;
            break;
        case OG_TYPE_INTEGER:
            VALUE(int32, value) = *(int32 *)ptr;
            break;

        case OG_TYPE_BOOLEAN:
            VALUE(bool32, value) = *(bool32 *)ptr;
            break;

        case OG_TYPE_BIGINT:
        case OG_TYPE_DATE:
        case OG_TYPE_TIMESTAMP:
        case OG_TYPE_TIMESTAMP_TZ_FAKE:
        case OG_TYPE_TIMESTAMP_LTZ:
            VALUE(int64, value) = *(int64 *)ptr;
            break;

        case OG_TYPE_TIMESTAMP_TZ:
            VALUE(timestamp_tz_t, value) = *(timestamp_tz_t *)ptr;
            break;

        case OG_TYPE_INTERVAL_DS:
            VALUE(interval_ds_t, value) = *(interval_ds_t *)ptr;
            break;

        case OG_TYPE_INTERVAL_YM:
            VALUE(interval_ym_t, value) = *(interval_ym_t *)ptr;
            break;

        case OG_TYPE_REAL:
            VALUE(double, value) = *(double *)ptr;
            break;

        case OG_TYPE_NUMBER:
        case OG_TYPE_DECIMAL:
            (void)cm_dec_4_to_8(VALUE_PTR(dec8_t, value), (dec4_t *)ptr, len);
            break;
        case OG_TYPE_NUMBER2:
            (void)cm_dec_2_to_8(VALUE_PTR(dec8_t, value), (const payload_t *)ptr, len);
            break;

        case OG_TYPE_STRING:
        case OG_TYPE_CHAR:
        case OG_TYPE_VARCHAR:
            VALUE_PTR(text_t, value)->str = ptr;
            VALUE_PTR(text_t, value)->len = len;
            break;

        case OG_TYPE_CLOB:
        case OG_TYPE_BLOB:
        case OG_TYPE_IMAGE:
            VALUE_PTR(var_lob_t, value)->type = OG_LOB_FROM_KERNEL;
            VALUE_PTR(var_lob_t, value)->knl_lob.bytes = (uint8 *)ptr;
            VALUE_PTR(var_lob_t, value)->knl_lob.size = len;
            VALUE_PTR(var_lob_t, value)->knl_lob.is_hex_const = OG_FALSE;
            break;

        default:
            VALUE_PTR(binary_t, value)->bytes = (uint8 *)ptr;
            VALUE_PTR(binary_t, value)->size = len;
            VALUE_PTR(binary_t, value)->is_hex_const = OG_FALSE;
            break;
    }
    return OG_TRUE;
}

status_t sql_try_construct_insert_data(sql_stmt_t *stmt, knl_cursor_t *knl_cur, knl_part_key_t *decode_key,
    insert_assist_t *ass)
{
    knl_column_t *knl_column = NULL;
    knl_dictionary_t *dc = &ass->insert_ctx->table->entry->dc;

    knl_column = knl_get_column(dc->handle, ass->col_id);
    if (KNL_COLUMN_IS_DELETED(knl_column)) {
        ass->value.is_null = OG_TRUE;
        if (sql_set_table_value(stmt, knl_cur, &ass->ra, knl_column, &ass->value) != OG_SUCCESS) {
            return OG_ERROR;
        }
        return OG_SUCCESS;
    }

    if (!sql_try_get_part_key_value(decode_key, ass->insert_ctx, knl_column, &ass->value)) {
        if (sql_insert_get_col_value(stmt, dc, ass->insert_ctx, ass->cur_select, knl_column, &ass->value) !=
            OG_SUCCESS) {
            return OG_ERROR;
        }
    }

    if (sql_set_table_value(stmt, knl_cur, &ass->ra, knl_column, &ass->value) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (knl_column->flags & KNL_COLUMN_FLAG_SERIAL) {
        ass->has_serial = OG_TRUE;
        if (ass->value.type == OG_TYPE_BIGINT) {
            ass->max_serial_val = MAX(ass->max_serial_val, ass->value.v_bigint);
            ass->serial_val = ass->value.v_bigint;
        } else if (ass->value.type == OG_TYPE_UINT32) {
            ass->max_serial_val = MAX(ass->max_serial_val, ass->value.v_uint32);
            ass->serial_val = ass->value.v_uint32;
        } else {
            ass->max_serial_val = MAX(ass->max_serial_val, ass->value.v_int);
            ass->serial_val = ass->value.v_int;
        }
    }

    return OG_SUCCESS;
}

void sql_get_default_value(sql_stmt_t *stmt, uint32 col_id, variant_t *res)
{
    *res = stmt->default_info.default_values[col_id];
}

status_t sql_update_default_values(sql_stmt_t *stmt, uint32 col_id, variant_t *val)
{
    variant_t *set_cols_data = &stmt->default_info.default_values[col_id];
    *set_cols_data = *val;

    if (val->is_null) {
        return OG_SUCCESS;
    }

    /* deep copy string buffer */
    if (!sql_var_cankeep(stmt, val)) {
        return var_deep_copy(val, set_cols_data, (var_malloc_t)cm_push, (var_malloc_handle_t *)stmt->session->stack);
    }
    sql_keep_stack_variant(stmt, val);
    return OG_SUCCESS;
}

static status_t sql_init_default_values(sql_stmt_t *stmt, knl_dictionary_t *dc, uint32 column_count)
{
    knl_column_t *knl_column = NULL;
    variant_t value;
    uint32 i;

    OG_RETURN_IFERR(sql_push(stmt, column_count * sizeof(variant_t), (void **)&stmt->default_info.default_values));

    for (i = 0; i < column_count; i++) {
        knl_column = knl_get_column(dc->handle, i);
        OG_RETURN_IFERR(sql_exec_column_default(stmt, dc, knl_column, &value));
        OG_RETURN_IFERR(sql_update_default_values(stmt, i, &value));
    }

    return OG_SUCCESS;
}

static status_t sql_gen_insert_default_values(sql_stmt_t *stmt, sql_insert_t *insert)
{
    uint32 i;
    knl_dictionary_t *dc = &insert->table->entry->dc;
    uint32 col_count = knl_get_column_count(dc->handle);
    uint32 val_count = insert->pairs->count;
    column_value_pair_t *value_pair = NULL;
    expr_tree_t *expr = NULL;
    variant_t value;

    OG_RETURN_IFERR(sql_init_default_values(stmt, dc, col_count));

    for (i = 0; i < val_count; i++) {
        value_pair = (column_value_pair_t *)cm_galist_get(insert->pairs, i);
        stmt->default_column = value_pair->column;
        expr = (expr_tree_t *)cm_galist_get(value_pair->exprs, stmt->pairs_pos);
        OG_RETURN_IFERR(sql_exec_expr(stmt, expr, &value));

        if (value_pair->column->flags & KNL_COLUMN_FLAG_SERIAL) {
            if (value.is_null || (value_pair->column->datatype == OG_TYPE_INTEGER && value.v_int == 0) ||
                (value_pair->column->datatype == OG_TYPE_BIGINT && value.v_bigint == 0) ||
                (value_pair->column->datatype == OG_TYPE_UINT32 && value.v_uint32 == 0)) {
                OG_RETURN_IFERR(sql_get_serial_value(stmt, dc, &value));
            }
        }

        OG_RETURN_IFERR(sql_update_default_values(stmt, value_pair->column->id, &value));
    }

    return OG_SUCCESS;
}

static status_t sql_init_and_gen_default_values(sql_stmt_t *stmt, insert_assist_t *assist, knl_cursor_t *knl_cursor)
{
    stmt->serial_value = 0;
    knl_cursor->vnc_column = NULL;
    knl_cursor->lob_inline_num = 0;

    if (assist->insert_ctx->ret_columns != NULL ||
        (stmt->context->type == OGSQL_TYPE_REPLACE && assist->cur_select == NULL)) {
        stmt->default_info.default_on = OG_TRUE;
        if (sql_gen_insert_default_values(stmt, assist->insert_ctx) != OG_SUCCESS) {
            stmt->default_info.default_on = OG_FALSE;
            return OG_ERROR;
        }
    }

    return OG_SUCCESS;
}

static status_t sql_handle_part_key(sql_stmt_t *stmt, insert_assist_t *assist, knl_cursor_t *knl_cur,
    knl_part_key_t *decode_key)
{
    part_key_t *part_key = NULL;
    knl_dictionary_t *dc = &assist->insert_ctx->table->entry->dc;
    uint32 col_count = knl_get_column_count(dc->handle);

    if (knl_is_part_table(dc->handle)) {
        if (sql_push(stmt, OG_MAX_COLUMN_SIZE, (void **)&part_key) != OG_SUCCESS) {
            stmt->default_info.default_on = OG_FALSE;
            return OG_ERROR;
        }
        if (sql_route_part_table(stmt, knl_cur, part_key, assist) != OG_SUCCESS) {
            stmt->default_info.default_on = OG_FALSE;
            return OG_ERROR;
        }
        knl_decode_part_key(part_key, decode_key);
    }

    bool32 is_csf = knl_is_table_csf(dc->handle, knl_cur->part_loc);
    uint32 max_row_len = knl_table_max_row_len(dc->handle, g_instance->kernel.attr.max_row_size, knl_cur->part_loc);
    cm_row_init(&assist->ra, (char *)knl_cur->row, max_row_len, col_count, is_csf);

    return OG_SUCCESS;
}

status_t sql_generate_insert_data(sql_stmt_t *stmt, knl_cursor_t *knl_cursor, insert_assist_t *assist)
{
    knl_part_key_t decode_key;
    knl_dictionary_t *dc = &assist->insert_ctx->table->entry->dc;
    uint32 col_count = knl_get_column_count(dc->handle);
    uint32 scan_id = 0;
    int32 code;
    const char *msg = NULL;

    /* generate default values by sql order */
    OG_RETURN_IFERR(sql_init_and_gen_default_values(stmt, assist, knl_cursor));

    OGSQL_SAVE_STACK(stmt);
    if (sql_handle_part_key(stmt, assist, knl_cursor, &decode_key) != OG_SUCCESS) {
        OGSQL_RESTORE_STACK(stmt);
        return OG_ERROR;
    }

    while (assist->col_id < col_count) {
        OGSQL_SAVE_STACK(stmt);
        if (sql_try_construct_insert_data(stmt, knl_cursor, &decode_key, assist) != OG_SUCCESS) {
            // all insert information should be putted to row buffer.
            OGSQL_RESTORE_STACK(stmt);

            cm_get_error(&code, &msg, NULL);

            if (code != ERR_ROW_SIZE_TOO_LARGE) {
                OGSQL_RESTORE_STACK(stmt);
                stmt->default_info.default_on = OG_FALSE;
                return OG_ERROR;
            }

            if (knl_is_lob_table(dc) && knl_cursor->lob_inline_num > 0) {
                cm_decode_row((char *)knl_cursor->row, knl_cursor->offsets, knl_cursor->lens, NULL);
                if (knl_reconstruct_lob_row(&stmt->session->knl_session, dc->handle, knl_cursor, &scan_id,
                    assist->col_id) != OG_SUCCESS) {
                    OGSQL_RESTORE_STACK(stmt);
                    stmt->default_info.default_on = OG_FALSE;
                    return OG_ERROR;
                }

                cm_reset_error();
                continue;
            } else {
                OGSQL_RESTORE_STACK(stmt);
                stmt->default_info.default_on = OG_FALSE;
                return OG_ERROR;
            }
        }
        // all insert information should be putted to row buffer.
        OGSQL_RESTORE_STACK(stmt);

        assist->col_id++;
    }

    row_end(&assist->ra);
    cm_decode_row((char *)knl_cursor->row, knl_cursor->offsets, knl_cursor->lens, NULL);
    OGSQL_RESTORE_STACK(stmt);
    stmt->default_info.default_on = OG_FALSE;

    if (assist->has_serial && stmt->pairs_pos == 0) {
        stmt->session->last_insert_id = assist->serial_val;
    }

    return OG_SUCCESS;
}

status_t sql_execute_insert_trigs(sql_stmt_t *stmt, trig_set_t *set, uint32 type, void *knl_cur, void *insert_data)
{
    trig_item_t *trig_item = NULL;
    pl_dc_t pl_dc;

    OGSQL_SAVE_STACK(stmt);
    for (uint32 i = 0; i < set->trig_count; ++i) {
        trig_item = &set->items[i];
        if (!trig_item->trig_enable) {
            continue;
        }

        if ((uint32)trig_item->trig_type != type || (trig_item->trig_event & TRIG_EVENT_INSERT) == 0) {
            continue;
        }

        if (pl_dc_open_trig_by_entry(stmt, &pl_dc, trig_item) != OG_SUCCESS) {
            OGSQL_RESTORE_STACK(stmt);
            return OG_ERROR;
        }

        if (ple_exec_trigger(stmt, pl_dc.entity, TRIG_EVENT_INSERT, knl_cur, insert_data) != OG_SUCCESS) {
            ple_check_exec_trigger_error(stmt, pl_dc.entity);
            pl_dc_close(&pl_dc);
            OGSQL_RESTORE_STACK(stmt);
            return OG_ERROR;
        }
        pl_dc_close(&pl_dc);
    }

    OGSQL_RESTORE_STACK(stmt);
    return OG_SUCCESS;
}


static status_t sql_fetch_insert_update(sql_stmt_t *stmt, sql_cursor_t *cursor, sql_insert_t *insert_ctx,
    knl_cursor_t *knl_cur, bool32 *is_found)
{
    upd_object_t *upd_obj = NULL;

    // may call sql_match_cond in knl_match_cond, need used current cursor in sql_match_cond
    OG_RETURN_IFERR(SQL_CURSOR_PUSH(stmt, cursor));
    OG_RETURN_IFERR(knl_fetch_by_rowid(KNL_SESSION(stmt), knl_cur, is_found));

    if (!(*is_found)) {
        SQL_CURSOR_POP(stmt);
        return OG_SUCCESS;
    }

    upd_obj = (upd_object_t *)cm_galist_get(insert_ctx->update_ctx->objects, 0);
    OG_RETURN_IFERR(sql_execute_update_table(stmt, cursor, knl_cur, upd_obj));

    cursor->total_rows++;
    SQL_CURSOR_POP(stmt);
    return OG_SUCCESS;
}

status_t sql_execute_insert_update(sql_stmt_t *stmt, sql_cursor_t *cursor, sql_insert_t *insert_ctx,
    knl_dictionary_t *dc, bool32 *is_found)
{
    knl_cursor_t *insert_knl_cur = cursor->tables[0].knl_cur;
    knl_cursor_t *update_knl_cur = cursor->exec_data.ext_knl_cur;
    status_t status;
    errno_t ret;

    update_knl_cur->action = CURSOR_ACTION_UPDATE;
    OG_RETURN_IFERR(knl_open_cursor(KNL_SESSION(stmt), update_knl_cur, dc));
    OG_RETURN_IFERR(sql_push(stmt, OG_MAX_ROW_SIZE, (void **)&update_knl_cur->row));
    ret = memset_sp(update_knl_cur->row, OG_MAX_ROW_SIZE, 0, OG_MAX_ROW_SIZE);
    if (ret != EOK) {
        OG_THROW_ERROR(ERR_SYSTEM_CALL, ret);
        OGSQL_POP(stmt);
        return OG_ERROR;
    }

    // set statement ssn when insert on duplicate key and before do insert update
    sql_set_ssn(stmt);

    // insert update only set ssn
    if (dc->type == DICT_TYPE_TEMP_TABLE_SESSION || dc->type == DICT_TYPE_TEMP_TABLE_TRANS) {
        update_knl_cur->ssn = stmt->ssn;
    } else {
        update_knl_cur->ssn = stmt->xact_ssn;
    }
    update_knl_cur->query_scn = insert_knl_cur->query_scn;

    ROWID_COPY(update_knl_cur->rowid, insert_knl_cur->conflict_rid);
    update_knl_cur->insert_info = insert_knl_cur->insert_info;
    cursor->tables[0].knl_cur = update_knl_cur;
    status = sql_fetch_insert_update(stmt, cursor, insert_ctx, update_knl_cur, is_found);
    cursor->tables[0].knl_cur = insert_knl_cur;

    OGSQL_POP(stmt);
    return status;
}


static inline status_t sql_keep_cursor_insert_info(sql_stmt_t *stmt, knl_cursor_t *knl_cursor, sql_insert_t *insert_ctx)
{
    if (SECUREC_LIKELY(insert_ctx->update_ctx == NULL)) {
        return OG_SUCCESS;
    }

    uint32 max_column_count = stmt->session->knl_session.kernel->attr.max_column_count;
    uint32 size = OG_MAX_ROW_SIZE + max_column_count * sizeof(uint16) * 2;
    char *ptr = NULL;

    OG_RETURN_IFERR(sql_push(stmt, size, (void **)&ptr));

    knl_cursor->insert_info.data = ptr;
    MEMS_RETURN_IFERR(memcpy_sp(knl_cursor->insert_info.data, OG_MAX_ROW_SIZE, knl_cursor->row, knl_cursor->row->size));

    knl_cursor->insert_info.lens = (uint16 *)(ptr + OG_MAX_ROW_SIZE);
    MEMS_RETURN_IFERR(memcpy_sp(knl_cursor->insert_info.lens, max_column_count * sizeof(uint16), knl_cursor->lens,
        ROW_COLUMN_COUNT(knl_cursor->row) * sizeof(uint16)));

    knl_cursor->insert_info.offsets = (uint16 *)(ptr + OG_MAX_ROW_SIZE + max_column_count * sizeof(uint16));
    MEMS_RETURN_IFERR(memcpy_sp(knl_cursor->insert_info.offsets, max_column_count * sizeof(uint16), knl_cursor->offsets,
        ROW_COLUMN_COUNT(knl_cursor->row) * sizeof(uint16)));

    return OG_SUCCESS;
}

static inline void sql_reset_cursor_insert_info(sql_stmt_t *stmt, knl_cursor_t *knl_cursor, sql_insert_t *insert_ctx)
{
    if (SECUREC_LIKELY(insert_ctx->update_ctx == NULL)) {
        return;
    }

    knl_cursor->insert_info.data = NULL;
    knl_cursor->insert_info.lens = NULL;
    knl_cursor->insert_info.offsets = NULL;

    return;
}

static status_t sql_send_insert_return_row(sql_stmt_t *stmt, galist_t *ret_columns)
{
    status_t status;

    stmt->default_info.default_on = OG_TRUE;
    status = sql_send_return_row(stmt, ret_columns, OG_FALSE);
    stmt->default_info.default_on = OG_FALSE;

    return status;
}

status_t sql_store_row_if_trigger_modify(insert_data_t *insert_data, knl_cursor_t *knl_cur, char *buf)
{
    if ((insert_data->row_modify) && (knl_cur->row->size != 0)) {
        errno_t errcode = memcpy_s(buf, g_instance->kernel.attr.max_row_size, knl_cur->row, knl_cur->row->size);
        if (errcode != EOK) {
            OG_THROW_ERROR(ERR_SYSTEM_CALL, errcode);
            return OG_ERROR;
        }
    }
    return OG_SUCCESS;
}

status_t sql_restore_row_if_trigger_modify(insert_data_t *insert_data, knl_cursor_t *knl_cur, const char *buf,
    sql_stmt_t *stmt, insert_assist_t *assist)
{
    if ((insert_data->row_modify) && (((const row_head_t *)buf)->size != 0)) {
        errno_t errcode = memcpy_s(knl_cur->row, OG_MAX_ROW_SIZE, buf, ((const row_head_t *)buf)->size);
        if (errcode != EOK) {
            OG_THROW_ERROR(ERR_SYSTEM_CALL, errcode);
            return OG_ERROR;
        }
    } else {
        sql_reset_insert_assist(assist);
        OG_RETURN_IFERR(sql_generate_insert_data(stmt, knl_cur, assist));
    }
    return OG_SUCCESS;
}

status_t sql_insert_inner(sql_stmt_t *stmt, sql_cursor_t *cursor, knl_cursor_t *knl_cur, insert_assist_t *assist,
    status_t *status)
{
    // may call sql_match_cond in knl_insrt(check, knl_verify_check_cons),
    // need used current cursor in sql_match_cond
    OG_RETURN_IFERR(SQL_CURSOR_PUSH(stmt, cursor) != OG_SUCCESS);
    *status = knl_insert(&stmt->session->knl_session, knl_cur);
    SQL_CURSOR_POP(stmt);

    if (*status == OG_SUCCESS) {
        OG_RETURN_IFERR(
            sql_execute_insert_triggers(stmt, assist->insert_ctx->table, TRIG_AFTER_EACH_ROW, knl_cur, NULL));
        OG_RETURN_IFERR(knl_verify_ref_integrities(&stmt->session->knl_session, knl_cur));

        if (assist->has_serial) {
            OG_RETURN_IFERR(
                knl_update_serial_value(&stmt->session->knl_session, knl_cur->dc_entity,
                                        assist->max_serial_val, OG_FALSE));
            OG_RETURN_IFERR(sql_send_generated_key_row(stmt, &assist->serial_val));
        }
    }

    return OG_SUCCESS;
}

static status_t sql_insert_single_row_core(sql_stmt_t *stmt, sql_cursor_t *cursor, knl_dictionary_t *dc,
    knl_cursor_t *knl_cur, insert_assist_t *assist)
{
    char *buf = NULL;
    status_t status = OG_ERROR;
    bool32 is_found = OG_FALSE;
    insert_data_t insert_data = {
        .cur_select = assist->cur_select,
        .row_modify = OG_FALSE
    };

    OG_RETURN_IFERR(sql_execute_insert_triggers(stmt, assist->insert_ctx->table, TRIG_BEFORE_EACH_ROW, knl_cur,
        (void *)&insert_data));

    OG_RETURN_IFERR(sql_push(stmt, g_instance->kernel.attr.max_row_size, (void **)&buf));

    /* execute insert */
    do {
        OG_BREAK_IF_ERROR(sql_insert_inner(stmt, cursor, knl_cur, assist, &status));

        // knl_insert return success
        if (status == OG_SUCCESS) {
            OGSQL_POP(stmt);
            return OG_SUCCESS;
        }

        // for on duplicate key update
        OG_BREAK_IF_TRUE(OG_ERRNO != ERR_DUPLICATE_KEY || assist->insert_ctx->update_ctx == NULL);

        // to release lob insert page when  insert failed result from primary key
        // or unique key  violation using sql "on duplicate key"
        OG_BREAK_IF_ERROR(knl_recycle_lob_insert_pages(&stmt->session->knl_session, knl_cur));

        // row has been modified by trigger, store it
        OG_BREAK_IF_ERROR(sql_store_row_if_trigger_modify(&insert_data, knl_cur, buf));

        // execute insert update
        cm_reset_error();

        OG_BREAK_IF_ERROR(sql_keep_cursor_insert_info(stmt, knl_cur, assist->insert_ctx));
        status = sql_execute_insert_update(stmt, cursor, assist->insert_ctx, dc, &is_found);
        sql_reset_cursor_insert_info(stmt, knl_cur, assist->insert_ctx);
        OG_BREAK_IF_ERROR(status);
        if (is_found) {
            OGSQL_POP(stmt);
            return OG_SUCCESS;
        }

        // row has been modified by trigger, restore it
        OG_BREAK_IF_ERROR(sql_restore_row_if_trigger_modify(&insert_data, knl_cur, buf, stmt, assist));

        SQL_CHECK_SESSION_VALID_FOR_RETURN(stmt);
    } while (OG_TRUE);

    OGSQL_POP(stmt);
    return OG_ERROR;
}

static status_t sql_insert_single_row(sql_stmt_t *stmt, sql_cursor_t *cursor, sql_insert_t *insert_ctx,
    knl_dictionary_t *dc, knl_cursor_t *knl_cur, sql_cursor_t *cur_select)
{
    status_t status = OG_ERROR;
    insert_data_t insert_data = {
        .cur_select = cur_select,
        .row_modify = OG_FALSE
    };
    insert_assist_t assist;

    sql_init_insert_assist(&assist, &insert_data, insert_ctx, cur_select);

    /* generate insert values by dc order */
    OG_RETURN_IFERR(sql_generate_insert_data(stmt, knl_cur, &assist));

    if (insert_ctx->table->type == VIEW_AS_TABLE) {
        status = sql_insteadof_triggers(stmt, insert_ctx->table, knl_cur, &insert_data, TRIG_EVENT_INSERT);
    } else {
        status = sql_insert_single_row_core(stmt, cursor, dc, knl_cur, &assist);
    }
    OG_RETURN_IFERR(status);

    /* gen return values if has return columns */
    if (insert_ctx->ret_columns != NULL) {
        status = sql_send_insert_return_row(stmt, insert_ctx->ret_columns);
    }

    return status;
}

bool32 sql_batch_insert_enable(sql_stmt_t *stmt, sql_insert_t *insert_ctx)
{
    if (IS_COORDINATOR && IS_APP_CONN(stmt->session)) {
        return OG_FALSE;
    }

    if (stmt->allowed_batch_errs > 0) {
        return OG_FALSE;
    }

    if (insert_ctx->update_ctx != NULL || (insert_ctx->syntax_flag & INSERT_IS_IGNORE)) {
        return OG_FALSE;
    }

    if (stmt->return_generated_key) {
        return OG_FALSE;
    }

    return knl_batch_insert_enabled(stmt->session, &insert_ctx->table->entry->dc, stmt->session->triggers_disable);
}

static status_t sql_batch_insert_rows(sql_stmt_t *stmt, sql_cursor_t *cursor, knl_cursor_t *knl_cursor)
{
    if (knl_cursor->rowid_count == 0) {
        return OG_SUCCESS;
    }

    if (SQL_CURSOR_PUSH(stmt, cursor) != OG_SUCCESS) {
        return OG_ERROR;
    }

    uint32 row_count = knl_cursor->rowid_count;
    status_t status = knl_insert(&stmt->session->knl_session, knl_cursor);
    SQL_CURSOR_POP(stmt);
    if (status == OG_SUCCESS) {
        cursor->total_rows += row_count;
    } else {
        cursor->total_rows += knl_cursor->rowid_count;
        knl_cursor->rowid_count = 0;
    }

    SQL_CHECK_SESSION_VALID_FOR_RETURN(stmt);

    return status;
}

static status_t sql_try_batch_insert(sql_stmt_t *stmt, sql_cursor_t *cursor, knl_cursor_t *knl_cursor,
    knl_part_locate_t org_part_loc, row_head_t *curr_row)
{
    if (curr_row->size < KNL_MIN_ROW_SIZE) {
        curr_row->size = KNL_MIN_ROW_SIZE;
    }

    if (knl_cursor->rowid_count > 0) {
        if (knl_is_part_table(knl_cursor->dc_entity) && (knl_cursor->part_loc.part_no != org_part_loc.part_no ||
            knl_cursor->part_loc.subpart_no != org_part_loc.subpart_no)) {
            knl_part_locate_t curr_part_loc = knl_cursor->part_loc;
            knl_set_table_part(knl_cursor, org_part_loc);
            sql_set_partition_nologging_insert(stmt, knl_cursor->dc_entity, knl_cursor, org_part_loc);
            if (sql_batch_insert_rows(stmt, cursor, knl_cursor) != OG_SUCCESS) {
                return OG_ERROR;
            }

            knl_set_table_part(knl_cursor, curr_part_loc);
            sql_set_partition_nologging_insert(stmt, knl_cursor->dc_entity, knl_cursor, curr_part_loc);
        } else if ((uint32)(curr_row->size + knl_cursor->row_offset) > OG_MAX_ROW_SIZE ||
            knl_cursor->rowid_count == KNL_ROWID_ARRAY_SIZE) {
            if (sql_batch_insert_rows(stmt, cursor, knl_cursor) != OG_SUCCESS) {
                return OG_ERROR;
            }
        }
    }

    errno_t errcode = memcpy_s((char *)knl_cursor->row + knl_cursor->row_offset, OG_MAX_ROW_SIZE -
        knl_cursor->row_offset,
        curr_row, curr_row->size);
    if (errcode != EOK) {
        OG_THROW_ERROR(ERR_SYSTEM_CALL, errcode);
        return OG_ERROR;
    }

    knl_cursor->row_offset += curr_row->size;
    knl_cursor->rowid_count++;
    return OG_SUCCESS;
}

static status_t sql_prepare_batch_insert(sql_stmt_t *stmt, sql_cursor_t *cursor, sql_insert_t *insert_ctx,
    knl_cursor_t *knl_cursor, sql_cursor_t *cur_select)
{
    row_head_t *org_row = knl_cursor->row;
    row_head_t *curr_row = NULL;
    knl_part_locate_t part_loc = knl_cursor->part_loc;
    insert_data_t insert_data = {
        .cur_select = cur_select,
        .row_modify = OG_FALSE
    };
    insert_assist_t assist;

    OGSQL_SAVE_STACK(stmt);
    if (sql_push(stmt, OG_MAX_ROW_SIZE, (void **)&curr_row) != OG_SUCCESS) {
        return OG_ERROR;
    }
    knl_cursor->row = curr_row;

    sql_init_insert_assist(&assist, &insert_data, insert_ctx, cur_select);
    if (sql_generate_insert_data(stmt, knl_cursor, &assist) != OG_SUCCESS) {
        knl_cursor->row = org_row;
        OGSQL_RESTORE_STACK(stmt);
        return OG_ERROR;
    }
    knl_cursor->row = org_row;
    if (knl_cursor->vnc_column != NULL) {
        OG_THROW_ERROR(ERR_COLUMN_NOT_NULL, knl_cursor->vnc_column);
        OGSQL_RESTORE_STACK(stmt);
        return OG_ERROR;
    }
    if (assist.has_serial) {
        if (knl_update_serial_value(&stmt->session->knl_session, knl_cursor->dc_entity, assist.max_serial_val,
            OG_FALSE) !=
            OG_SUCCESS) {
            OGSQL_RESTORE_STACK(stmt);
            return OG_ERROR;
        }
    }

    if (sql_try_batch_insert(stmt, cursor, knl_cursor, part_loc, curr_row) != OG_SUCCESS) {
        OGSQL_RESTORE_STACK(stmt);
        return OG_ERROR;
    }

    OGSQL_RESTORE_STACK(stmt);

    SQL_CHECK_SESSION_VALID_FOR_RETURN(stmt);
    return OG_SUCCESS;
}

static void subcursor_init(sql_cursor_t *sub_cur, plan_node_t *plan, sql_insert_t *insert_ctx)
{
    sub_cur->plan = plan;
    sub_cur->select_ctx = insert_ctx->select_ctx;
    sub_cur->scn = OG_INVALID_ID64;
}

static status_t sql_execute_insert_all_pairs(sql_stmt_t *stmt, sql_cursor_t *cursor, sql_insert_t *insert_ctx,
    knl_dictionary_t *dc, knl_cursor_t *knl_cur, bool32 is_batch)
{
    status_t status = OG_SUCCESS;

    for (uint32 i = 0; i < insert_ctx->pairs_count; i++) {
        stmt->pairs_pos = i;

        OGSQL_SAVE_STACK(stmt);
        status = is_batch ? sql_prepare_batch_insert(stmt, cursor, insert_ctx, knl_cur, NULL) :
                            sql_insert_single_row(stmt, cursor, insert_ctx, dc, knl_cur, NULL);
        OGSQL_RESTORE_STACK(stmt);

        if (status != OG_SUCCESS) {
            break;
        }

        if (!is_batch) {
            cursor->total_rows++;
        }
    }

    return status;
}

static status_t sql_execute_insert_all(sql_stmt_t *stmt, sql_cursor_t *cursor, sql_insert_t *insert_ctx,
    knl_dictionary_t *dc, knl_cursor_t *knl_cur)
{
    sql_cursor_t *sub_cursor = NULL;
    bool32 eof = OG_FALSE;
    plan_node_t *plan = insert_ctx->select_ctx->plan;
    bool32 is_batch = sql_batch_insert_enable(stmt, insert_ctx);
    status_t status = OG_SUCCESS;

    if (sql_alloc_cursor(stmt, &sub_cursor) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (sql_execute_select_plan(stmt, sub_cursor, plan->select_p.next) != OG_SUCCESS) {
        sql_free_cursor(stmt, sub_cursor);
        return OG_ERROR;
    }

    if (SQL_CURSOR_PUSH(stmt, sub_cursor) != OG_SUCCESS) {
        sql_free_cursor(stmt, sub_cursor);
        return OG_ERROR;
    }

    for (;;) {
        OGSQL_SAVE_STACK(stmt);
        if (sql_fetch_cursor(stmt, sub_cursor, plan->select_p.next, &eof) != OG_SUCCESS) {
            OGSQL_RESTORE_STACK(stmt);
            status = OG_ERROR;
            break;
        }

        if (eof) {
            if (is_batch) {
                status = sql_batch_insert_rows(stmt, cursor, knl_cur);
            }
            OGSQL_RESTORE_STACK(stmt);
            break;
        }

        // subselect may apear in values(),like values(subselect,1,2)
        // so must push the insert cursor for later use
        if (SQL_CURSOR_PUSH(stmt, cursor) != OG_SUCCESS) {
            OGSQL_RESTORE_STACK(stmt);
            status = OG_ERROR;
            break;
        }
        status = sql_execute_insert_all_pairs(stmt, cursor, insert_ctx, dc, knl_cur, is_batch);
        SQL_CURSOR_POP(stmt);

        if (status != OG_SUCCESS) {
            OGSQL_RESTORE_STACK(stmt);
            break;
        }
        OGSQL_RESTORE_STACK(stmt);
    }

    SQL_CURSOR_POP(stmt);
    sql_free_cursor(stmt, sub_cursor);
    return status;
}

static status_t sql_execute_insert_select_plan(sql_stmt_t *stmt, sql_cursor_t *cursor, sql_insert_t *insert_ctx,
    knl_dictionary_t *dc, knl_cursor_t *knl_cur)
{
    sql_cursor_t *sub_cursor = NULL;
    bool32 eof = OG_FALSE;
    plan_node_t *plan = insert_ctx->select_ctx->plan;
    status_t status = OG_SUCCESS;
    bool32 is_batch = sql_batch_insert_enable(stmt, insert_ctx);

    if (sql_alloc_cursor(stmt, &sub_cursor) != OG_SUCCESS) {
        return OG_ERROR;
    }
    subcursor_init(sub_cursor, plan, insert_ctx);

    if (sql_execute_select_plan(stmt, sub_cursor, sub_cursor->plan->select_p.next) != OG_SUCCESS) {
        sql_free_cursor(stmt, sub_cursor);
        return OG_ERROR;
    }

    OG_RETURN_IFERR(SQL_CURSOR_PUSH(stmt, sub_cursor));

    for (;;) {
        OGSQL_SAVE_STACK(stmt);
        if (sql_fetch_cursor(stmt, sub_cursor, sub_cursor->plan->select_p.next, &eof) != OG_SUCCESS) {
            OGSQL_RESTORE_STACK(stmt);
            status = OG_ERROR;
            break;
        }

        if (eof) {
            if (is_batch) {
                status = sql_batch_insert_rows(stmt, cursor, knl_cur);
            }
            OGSQL_RESTORE_STACK(stmt);
            break;
        }

        status = is_batch ? sql_prepare_batch_insert(stmt, cursor, insert_ctx, knl_cur, sub_cursor) :
                            sql_insert_single_row(stmt, cursor, insert_ctx, dc, knl_cur, sub_cursor);
        if (status != OG_SUCCESS) {
            OGSQL_RESTORE_STACK(stmt);
            status = sql_insert_try_ignore(insert_ctx);
            if (status == OG_SUCCESS) {
                continue;
            }
            break;
        }
        OGSQL_RESTORE_STACK(stmt);
        if (!is_batch) {
            cursor->total_rows++;
        }
    }

    SQL_CURSOR_POP(stmt);
    sql_free_cursor(stmt, sub_cursor);
    return status;
}

static status_t sql_handle_batch_error(sql_stmt_t *stmt, uint16 id)
{
    OG_RETURN_IFERR(sql_try_put_dml_batch_error(stmt, id, g_tls_error.code, g_tls_error.message));
    stmt->actual_batch_errs++;
    stmt->param_info.paramset_offset++;
    return OG_SUCCESS;
}

static status_t sql_execute_batch_insert(sql_stmt_t *stmt, sql_cursor_t *cursor, sql_insert_t *insert_ctx,
    knl_cursor_t *knl_cur)
{
    status_t status = OG_SUCCESS;

    for (uint16 i = stmt->param_info.paramset_offset; i < stmt->param_info.paramset_size; i++) {
        // do read params from req packet
        status = sql_read_params(stmt);
        if (status != OG_SUCCESS) {
            // try allowed batch errors if execute error
            OG_LOG_DEBUG_ERR("failed to read param when issue dml, paramset index: %u", i);
            if (stmt->allowed_batch_errs > 0) {
                status = OG_SUCCESS;
                if (++stmt->actual_batch_errs <= stmt->allowed_batch_errs) {
                    OG_RETURN_IFERR(sql_handle_batch_error(stmt, i));
                    continue;
                }
            }
            (void)sql_batch_insert_rows(stmt, cursor, knl_cur);
            break;
        }

        // need clean value with the previous parameters
        sql_reset_first_exec_vars(stmt);
        sql_reset_sequence(stmt);

        for (uint32 j = 0; j < insert_ctx->pairs_count; j++) {
            stmt->pairs_pos = j;
            status = sql_prepare_batch_insert(stmt, cursor, insert_ctx, knl_cur, NULL);
            if (status != OG_SUCCESS) {
                break;
            }
        }

        if (status == OG_SUCCESS && i == stmt->param_info.paramset_size - 1) {
            status = sql_batch_insert_rows(stmt, cursor, knl_cur);
        }

        if (status != OG_SUCCESS) {
            OG_LOG_DEBUG_ERR("failed to execute dml when issue dml, paramset index: %u", i);
            // try allowed batch errors if execute error
            if (stmt->allowed_batch_errs > 0) {
                status = OG_SUCCESS;
                if (++stmt->actual_batch_errs <= stmt->allowed_batch_errs) {
                    OG_RETURN_IFERR(sql_handle_batch_error(stmt, i));
                    continue;
                }
            }
            break;
        }

        stmt->param_info.paramset_offset++;
    }

    if (status != OG_SUCCESS) {
        (void)sql_batch_insert_rows(stmt, cursor, knl_cur);
    }

    return status;
}

static status_t sql_execute_insert_pairs(sql_stmt_t *stmt, sql_cursor_t *cursor, sql_insert_t *insert_ctx,
    knl_dictionary_t *dc, knl_cursor_t *knl_cur)
{
    bool32 is_batch = (insert_ctx->pairs_count > 1) && sql_batch_insert_enable(stmt, insert_ctx);
    status_t status = OG_SUCCESS;

    for (uint32 i = 0; i < insert_ctx->pairs_count; i++) {
        stmt->pairs_pos = i;

        OGSQL_SAVE_STACK(stmt);
        status = is_batch ? sql_prepare_batch_insert(stmt, cursor, insert_ctx, knl_cur, NULL) :
                            sql_insert_single_row(stmt, cursor, insert_ctx, dc, knl_cur, NULL);
        OGSQL_RESTORE_STACK(stmt);

        if (status != OG_SUCCESS) {
            status = sql_insert_try_ignore(insert_ctx);
            if (status == OG_SUCCESS) {
                continue;
            }
            break;
        }

        if (!is_batch) {
            cursor->total_rows++;
        }
    }
    if (status == OG_SUCCESS && is_batch) {
        status = sql_batch_insert_rows(stmt, cursor, knl_cur);
    }

    return status;
}

status_t sql_execute_insert_plan(sql_stmt_t *stmt, sql_cursor_t *cursor, sql_insert_t *insert_ctx)
{
    status_t status = OG_SUCCESS;
    knl_dictionary_t *dc = &cursor->tables[0].table->entry->dc;
    knl_cursor_t *knl_cursor = cursor->tables[0].knl_cur;
    bool32 table_nologging_enabled = knl_table_nologging_enabled(dc->handle);
    if (stmt->context->type == OGSQL_TYPE_INSERT && (stmt->session->nologging_enable || table_nologging_enabled)) {
        if (!DB_IS_SINGLE(&stmt->session->knl_session) ||
            (DB_IS_RCY_CHECK_PCN(&stmt->session->knl_session) && stmt->session->nologging_enable)) {
            OG_LOG_DEBUG_WAR("forbid to nologging load when database in HA mode or \
                when _RCY_CHECK_PCN is TRUE on session_level nologging insert");
            knl_cursor->logging = OG_TRUE;
            knl_cursor->nologging_type = LOGGING_LEVEL;
            stmt->session->knl_session.rm->logging = OG_TRUE;
            stmt->session->knl_session.rm->nolog_type = knl_cursor->nologging_type;
        } else {
            knl_cursor->logging = OG_FALSE;
            stmt->session->knl_session.rm->logging = OG_FALSE;
            knl_cursor->nologging_type = knl_table_nologging_enabled(dc->handle) ? TABLE_LEVEL : SESSION_LEVEL;
            stmt->session->knl_session.rm->nolog_type = knl_cursor->nologging_type;
        }
    } else {
        knl_cursor->logging = OG_TRUE;
        stmt->session->knl_session.rm->logging = OG_TRUE;
        knl_cursor->nologging_type = LOGGING_LEVEL;
        stmt->session->knl_session.rm->nolog_type = knl_cursor->nologging_type;
    }

    OG_RETURN_IFERR(knl_open_cursor(&stmt->session->knl_session, knl_cursor, dc));
    OG_RETURN_IFERR(sql_push(stmt, OG_MAX_ROW_SIZE, (void **)&knl_cursor->row));

    sql_prepare_scan(stmt, dc, knl_cursor);

    if (OG_BIT_TEST(insert_ctx->syntax_flag, INSERT_IS_ALL)) {
        status = sql_execute_insert_all(stmt, cursor, insert_ctx, dc, knl_cursor);
    } else if (insert_ctx->select_ctx != NULL) {
        status = sql_execute_insert_select_plan(stmt, cursor, insert_ctx, dc, knl_cursor);
    } else {
        if (stmt->is_batch_insert) {
            status = sql_execute_batch_insert(stmt, cursor, insert_ctx, knl_cursor);
        } else {
            status = sql_execute_insert_pairs(stmt, cursor, insert_ctx, dc, knl_cursor);
        }
    }

    OGSQL_POP(stmt);

    stmt->default_column = NULL;
    return status;
}

status_t sql_execute_insert_with_ctx(sql_stmt_t *stmt, sql_insert_t *insert_ctx)
{
    sql_cursor_t *cursor = OGSQL_ROOT_CURSOR(stmt);
    status_t status;

    cursor->scn = OG_INVALID_ID64;

    OG_RETURN_IFERR(sql_execute_insert_triggers(stmt, insert_ctx->table, TRIG_BEFORE_STATEMENT, NULL, NULL));

    // set statement ssn after the before statement triggers executed
    sql_set_scn(stmt);
    sql_set_ssn(stmt);

    OG_RETURN_IFERR(sql_open_insert_cursor(stmt, cursor, insert_ctx));
    status = sql_execute_insert_plan(stmt, cursor, insert_ctx);
    stmt->session->knl_session.rm->logging = OG_TRUE;
    OG_RETURN_IFERR(status);
    OG_RETURN_IFERR(sql_execute_insert_triggers(stmt, insert_ctx->table, TRIG_AFTER_STATEMENT, NULL, NULL));

    stmt->eof = OG_TRUE;
    cursor->eof = OG_TRUE;
    return OG_SUCCESS;
}

static status_t sql_execute_insert_core(sql_stmt_t *stmt)
{
    uint64 conflicts = 0;
    /*
     * reset index conflicts to 0, and check it after stmt
     * to see if unique constraints violated.
     */
    knl_init_index_conflicts(KNL_SESSION(stmt), &conflicts);
    OG_RETURN_IFERR(sql_execute_insert_with_ctx(stmt, (sql_insert_t *)stmt->context->entry));
    return knl_check_index_conflicts(KNL_SESSION(stmt), conflicts);
}

status_t sql_execute_insert(sql_stmt_t *stmt)
{
    status_t status = OG_ERROR;
    knl_savepoint_t savepoint;

    do {
        knl_savepoint(KNL_SESSION(stmt), &savepoint);
        status = sql_execute_insert_core(stmt);
        // execute dml failed when shrink table, need restart
        if (status == OG_ERROR && cm_get_error_code() == ERR_NEED_RESTART) {
            OG_LOG_RUN_INF("insert failed when shrink table, inset restart");
            cm_reset_error();
            knl_rollback(KNL_SESSION(stmt), &savepoint);
            sql_set_scn(stmt);
            continue;
        } else {
            break;
        }
    } while (OG_TRUE);

    return status;
}

status_t sql_calc_part_print(sql_stmt_t *stmt, char *buf, uint32 size)
{
    uint32 i;
    uint32 part_id = OG_INVALID_ID32;
    sql_insert_t *insert = (sql_insert_t *)stmt->context->entry;
    char *left_buffer = buf;
    char *flag_buffer = (char *)"...)";
    uint32 flag_len = (uint32)strlen(flag_buffer);
    uint32 left_size = size - (flag_len + 1);
    int32 offset;
    part_key_t *key = NULL;
    knl_dictionary_t *dc = &insert->table->entry->dc;
    insert_assist_t ass;

    if (insert->select_ctx != NULL) {
        PRTS_RETURN_IFERR(snprintf_s(buf, size, size - 1, "(Filter:N/A)"));
        return OG_SUCCESS;
    }

    if (size <= (flag_len + 1)) {
        return OG_SUCCESS;
    }

    offset = snprintf_s(left_buffer, left_size, left_size - 1, "(Filter:id=");
    if (SECUREC_UNLIKELY(offset == -1)) {
        OG_THROW_ERROR(ERR_SYSTEM_CALL, offset);
        return OG_ERROR;
    }
    if (offset < 0) {
        return OG_SUCCESS;
    } else {
        left_buffer = left_buffer + offset;
        left_size -= offset;
        if (left_size - 1 == 0) {
            return OG_SUCCESS;
        }
    }
    OGSQL_SAVE_STACK(stmt);
    OG_RETURN_IFERR(sql_push(stmt, OG_MAX_COLUMN_SIZE, (void **)&key));
    sql_init_insert_assist(&ass, NULL, insert, NULL);
    for (i = 0; i < insert->pairs_count; i++) {
        stmt->pairs_pos = i;
        if (sql_calc_part_for_insert(stmt, key, &part_id, &ass) != OG_SUCCESS) {
            OGSQL_RESTORE_STACK(stmt);
            return OG_ERROR;
        }
        if (part_id == OG_INVALID_ID32) {
            offset = snprintf_s(left_buffer, left_size, left_size - 1, ((i == 0) ? "N/A" : ",N/A"));
            if (SECUREC_UNLIKELY(offset == -1)) {
                OG_THROW_ERROR(ERR_SYSTEM_CALL, offset);
                return OG_ERROR;
            }
        } else {
            if (knl_verify_interval_part(dc->handle, part_id) &&
                knl_create_interval_part(&stmt->session->knl_session, dc, part_id, key) != OG_SUCCESS) {
                OGSQL_RESTORE_STACK(stmt);
                return OG_ERROR;
            }
            offset = snprintf_s(left_buffer, left_size, left_size - 1, ((i == 0) ? "%u" : ",%u"), part_id);
            if (SECUREC_UNLIKELY(offset == -1)) {
                OG_THROW_ERROR(ERR_SYSTEM_CALL, offset);
                return OG_ERROR;
            }
        }

        if (offset < 0) {
            break;
        } else {
            left_buffer = left_buffer + offset;
            left_size -= offset;
            if (left_size - 1 == 0) {
                break;
            }
        }
    }
    OGSQL_RESTORE_STACK(stmt);
    left_size += flag_len;
    offset = snprintf_s(left_buffer, left_size, left_size - 1,
        (insert->pairs_count > 1 && i < insert->pairs_count) ? flag_buffer : ")");
    if (offset < 0) {
        OG_THROW_ERROR(ERR_SYSTEM_CALL, offset);
        return OG_ERROR;
    }
    return OG_SUCCESS;
}

status_t sql_prepare_view_row_insteadof(sql_stmt_t *stmt, sql_table_cursor_t *tab_cursor, knl_cursor_t *knl_cursor)
{
    rs_column_t *rs_column = NULL;
    variant_t value;
    sql_cursor_t *cursor = tab_cursor->sql_cur;
    sql_table_t *table = tab_cursor->table;
    row_assist_t ra;
    knl_column_t *knl_col = NULL;
    status_t status;
    knl_dictionary_t *dc = &table->entry->dc;
    uint32 col_count = knl_get_column_count(dc->handle);

    row_init(&ra, (char *)knl_cursor->row, OG_MAX_ROW_SIZE, cursor->columns->count);
    OG_RETURN_IFERR(SQL_CURSOR_PUSH(stmt, cursor));
    for (uint32 i = 0; i < col_count; i++) {
        rs_column = (rs_column_t *)cm_galist_get(cursor->columns, i);
        if (rs_column->type == RS_COL_COLUMN) {
            status = sql_get_table_value(stmt, &rs_column->v_col, &value);
        } else {
            status = sql_exec_expr(stmt, rs_column->expr, &value);
        }

        if (status != OG_SUCCESS) {
            SQL_CURSOR_POP(stmt);
            return OG_ERROR;
        }

        knl_col = knl_get_column(dc->handle, i);
        if (sql_set_table_value(stmt, knl_cursor, &ra, knl_col, &value) != OG_SUCCESS) {
            SQL_CURSOR_POP(stmt);
            return OG_ERROR;
        }
    }
    knl_cursor->rowid = cursor->tables[0].knl_cur->rowid;
    cm_decode_row((char *)knl_cursor->row, knl_cursor->offsets, knl_cursor->lens, NULL);
    SQL_CURSOR_POP(stmt);
    return OG_SUCCESS;
}

static status_t sql_execute_insteadof_triggers_core(sql_stmt_t *stmt, trig_set_t *set, void *knl_cur, void *data,
    trig_dml_type_t dml_type)
{
    pl_dc_t pl_dc;
    trig_item_t *trig_item = NULL;
    trig_desc_t *trig_desc = NULL;

    OGSQL_SAVE_STACK(stmt);
    for (uint32 i = 0; i < set->trig_count; ++i) {
        trig_item = &set->items[i];
        if (!trig_item->trig_enable) {
            continue;
        }

        if ((uint32)trig_item->trig_type != TRIG_INSTEAD_OF || (trig_item->trig_event & dml_type) == 0) {
            continue;
        }

        OG_BREAK_IF_ERROR(pl_dc_open_trig_by_entry(stmt, &pl_dc, trig_item));

        trig_desc = &pl_dc.entity->trigger->desc;
        if (dml_type == TRIG_EVENT_UPDATE) {
            upd_object_t *obj = data;
            if (!sql_find_trigger_column(obj->pairs, &trig_desc->columns)) {
                pl_dc_close(&pl_dc);
                continue;
            }
        }

        if (ple_exec_trigger(stmt, (void *)pl_dc.entity, dml_type, knl_cur, data) != OG_SUCCESS) {
            ple_check_exec_trigger_error(stmt, pl_dc.entity);
            pl_dc_close(&pl_dc);
            OGSQL_RESTORE_STACK(stmt);
            return OG_ERROR;
        }

        pl_dc_close(&pl_dc);
    }
    OGSQL_RESTORE_STACK(stmt);
    return OG_SUCCESS;
}


status_t sql_insteadof_triggers(sql_stmt_t *stmt, sql_table_t *table, void *knl_cur, void *data,
    trig_dml_type_t dml_type)
{
    knl_dictionary_t *dc = &table->entry->dc;
    dc_entity_t *dc_entity = (dc_entity_t *)dc->handle;
    dc_entry_t *dc_entry = dc_entity->entry;
    bool8 __logging;
    status_t status;

    if (stmt->session->triggers_disable) {
        return OG_SUCCESS;
    }

    if (dc_entity->trig_set.trig_count == 0) {
        return OG_SUCCESS;
    }

    /* add TS lock, controls trigger concurrency */
    if (lock_table_shared(KNL_SESSION(stmt), dc_entity, LOCK_INF_WAIT) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (!dc_entry_visible(dc_entry, dc)) {
        OG_THROW_ERROR(ERR_INVALID_DC, T2S(&table->name.value));
        return OG_ERROR;
    }

    /* do not support nologging in triggers */
    __logging = stmt->session->knl_session.rm->logging;
    stmt->session->knl_session.rm->logging = OG_TRUE;

    status = sql_execute_insteadof_triggers_core(stmt, &dc_entity->trig_set, knl_cur, data, dml_type);

    stmt->session->knl_session.rm->logging = __logging;
    return status;
}