/*
 * Copyright (c) 2020 Huawei Technologies Co.,Ltd.
 *
 * openGauss is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * -------------------------------------------------------------------------
 *
 * lightProxy.cpp
 *
 * IDENTIFICATION
 *	  src/gausskernel/runtime/executor/lightProxy.cpp
 *
 * -------------------------------------------------------------------------
 */

#include "access/transam.h"
#include "access/xact.h"
#include "utils/dynahash.h"
#include "utils/hotkey.h"
#include "catalog/pgxc_node.h"
#include "commands/prepare.h"
#include "executor/executor.h"
#include "executor/lightProxy.h"
#include "mb/pg_wchar.h"
#include "optimizer/pgxcship.h"
#include "pgxc/poolmgr.h"
#include "pgxc/pgxc.h"
#include "utils/snapmgr.h"
#include "pgstat.h"
#include "pgaudit.h"
#include "auditfuncs.h"
#include "pgxc/route.h"
#include "libpq/pqformat.h"
#include "gs_policy/policy_common.h"
#include "instruments/instr_unique_sql.h"
#include "instruments/instr_slow_query.h"
#include "tcop/tcopprot.h"
#include "optimizer/streamplan.h"
#include "gs_ledger/blockchain.h"
#include "parser/parse_hint.h"
#include "replication/walreceiver.h"

const int MAX_COMMAND = 51;
typedef struct commandType {
    CmdType type;
    const char* commandTag;
} CommandType;

static CommandType g_command_type_array[MAX_COMMAND] = {{CMD_DML, "SELECT"},
    {CMD_DML, "UPDATE"},
    {CMD_DML, "INSERT"},
    {CMD_DML, "DELETE"},
    {CMD_DML, "MERGE"},
    {CMD_TCL, "BEGIN"},
    {CMD_TCL, "COMMIT"},
    {CMD_TCL, "ROLLBACK"},
    {CMD_TCL, "START"},
    {CMD_TCL, "SAVEPOINT"},
    {CMD_TCL, "RELEASE"},
    {CMD_TCL, "PREPARE TRANSACTION"},
    {CMD_DDL, "PREPARE"},
    {CMD_DML, "CHECKPOINT"},
    {CMD_DML, "TRUNCATE"},
    {CMD_DML, "EXPLAIN"},
    {CMD_DML, "SHOW"},
    {CMD_DML, "LOCK"},
    {CMD_DML, "COPY"},
    {CMD_DML, "CLUSTER"},
    {CMD_DML, "ANONYMOUS"},
    {CMD_DML, "VACUUM"},
    {CMD_DML, "DELTA"},
    {CMD_DML, "ANALYZE"},
    {CMD_DML, "EXECUTE"},
    {CMD_DDL, "MOVE"},
    {CMD_DDL, "FETCH"},
    {CMD_DCL, "CREATE ROLE"},
    {CMD_DCL, "CREATE USER"},
    {CMD_DDL, "CREATE"},
    {CMD_DCL, "DROP ROLE"},
    {CMD_DCL, "DROP USER"},
    {CMD_DDL, "DROP"},
    {CMD_DCL, "ALTER ROLE"},
    {CMD_DCL, "ALTER USER"},
    {CMD_DCL, "ALTER DEFAULT PRIVILEGES"},
    {CMD_DDL, "ALTER"},
    {CMD_DDL, "CLOSE"},
    {CMD_DDL, "DEALLOCATE"},
    {CMD_DDL, "DECLARE"},
    {CMD_DDL, "REINDEX"},
    {CMD_DDL, "COMMENT"},
    {CMD_DDL, "BARRIER"},
    {CMD_DCL, "GRANT"},
    {CMD_DCL, "REVOKE"},
    {CMD_DCL, "NOTIFY"},
    {CMD_DCL, "LISTEN"},
    {CMD_DCL, "LOAD"},
    {CMD_DCL, "DISCARD"},
    {CMD_DCL, "REASSIGN"},
    {CMD_DCL, "CONSTRAINTS"}};

extern void pgxc_node_init(PGXCNodeHandle* handle, int sock);
extern void pgxc_handle_unsupported_stmts(Query* query);
extern Oid exprType(const Node* expr);

extern int light_node_send_begin(PGXCNodeHandle* handle, bool check_gtm_mode);
extern int light_handle_response(PGXCNodeHandle* conn, lightProxyMsgCtl* msgctl, lightProxy* lp);
extern void light_node_report_error(lightProxyErrData* combiner);
extern bool light_node_receive(PGXCNodeHandle* handle);
extern bool light_node_receive_from_logic_conn(PGXCNodeHandle* handle);
extern void light_pgaudit_ExecutorEnd(Query* query);

void report_qps_type(CmdType commandType);
CmdType set_cmd_type(const char* commandTag);

static void report_iud_time_for_lightproxy(const Query* query)
{
    if (u_sess->attr.attr_sql.enable_save_datachanged_timestamp == false)
        return;

    if (!IS_PGXC_COORDINATOR && !IS_SINGLE_NODE)
        return;

    if (query->commandType != CMD_INSERT && query->commandType != CMD_DELETE && query->commandType != CMD_UPDATE &&
        query->commandType != CMD_MERGE) {
        return;
    }

    if (query->rtable == NULL)
        return;

    if (linitial_int(query->resultRelations) <= list_length(query->rtable)) {
        RangeTblEntry* rte = (RangeTblEntry*)list_nth(query->rtable, linitial_int(query->resultRelations) - 1);
        if (RTE_RELATION != rte->rtekind)
            return;

        MemoryContext current_ctx = CurrentMemoryContext;
        Relation rel = NULL;

        PG_TRY();
        {
            rel = heap_open(rte->relid, AccessShareLock);
            if (rel->rd_rel->relkind == RELKIND_RELATION) {
                if (rel->rd_rel->relpersistence == RELPERSISTENCE_PERMANENT ||
                    rel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED) {
                    pgstat_report_data_changed(rte->relid, STATFLG_RELATION, rel->rd_rel->relisshared);
                }
            }

            heap_close(rel, AccessShareLock);
        }
        PG_CATCH();
        {
            (void)MemoryContextSwitchTo(current_ctx);

            ErrorData* edata = CopyErrorData();

            ereport(DEBUG1, (errmsg("Failed to send data changed time, cause: %s", edata->message)));

            FlushErrorState();

            FreeErrorData(edata);

            if (rel != NULL)
                heap_close(rel, AccessShareLock);
        }
        PG_END_TRY();
    }
}

static void report_unsupport_light(LightUnSupportType type)
{
    if (type == CTRL_DISABLE) {
        return;
    }

    char* unsupport_msg[MAX_UNSUPPORT_TYPE] = {"guc ctrl disable",
        "not support client encoding different from database encoding",
        "not support cursor",
        "not support execute direct on",
        "not support others cmd type except I/D/U/S",
        "not support table entry relkind is foreign",
        "not support query has a statement trigger",
        "not support user-defined type",
        "not support query with node_name hint"};

    ereport(DEBUG2, (errmodule(MOD_LIGHTPROXY),
                    errmsg("[LIGHT PROXY]  check failed with type: %s.", unsupport_msg[type])));

    return;
}

static bool isSupportLightQuery(Query* query)
{
    ListCell* item = NULL;

    if (!u_sess->attr.attr_sql.enable_fast_query_shipping ||
        (!u_sess->attr.attr_sql.enable_light_proxy && !GTM_LITE_MODE)) {
        report_unsupport_light(CTRL_DISABLE);

        return false;
    }

    if (pg_get_client_encoding() != GetDatabaseEncoding()) {
        report_unsupport_light(ENCODE_UNSUPPORT);

        return false;
    }

    /* not support cursor */
    if (query->utilityStmt && IsA(query->utilityStmt, DeclareCursorStmt)) {
        report_unsupport_light(CURSOR_UNSUPPORT);

        return false;
    }

    /* not support execute direct on */
    if (query->utilityStmt && IsA(query->utilityStmt, RemoteQuery)) {
        report_unsupport_light(REMOTE_UNSUPPORT);

        return false;
    }

    /* not support others */
    if (query->commandType != CMD_SELECT && query->commandType != CMD_UPDATE && query->commandType != CMD_INSERT &&
        query->commandType != CMD_DELETE && !(HAS_ROUTER && query->commandType == CMD_MERGE)) {
        report_unsupport_light(CMD_UNSUPPORT);

        return false;
    }

    /* do not support node_name hint due to agg function's different behavior */
    if (CheckNodeNameHint(query->hintState)) {
        report_unsupport_light(NODE_NAME_UNSUPPORT);
        return false;
    }

    foreach (item, query->rtable) {
        RangeTblEntry* rte = (RangeTblEntry*)lfirst(item);
#ifdef ENABLE_MOT
        if (rte->relkind == RELKIND_STREAM || (rte->relkind == RELKIND_FOREIGN_TABLE && !isMOTFromTblOid(rte->relid))) {
#else
        if (rte->relkind == RELKIND_STREAM || rte->relkind == RELKIND_FOREIGN_TABLE) {
#endif
            report_unsupport_light(FOREIGN_UNSUPPORT);

            return false;
        }

        /*
         * Essentially, lightProxy is a fast path for FQSed when length
         * of exec_nodes is 1, which is not supported when query has a
         * statement trigger.
         */
        if (pgxc_find_statement_trigger(rte->relid, query->commandType)) {
            report_unsupport_light(STATEMENT_UNSUPPORT);

            return false;
        }
    }

    /* check the target list for T message */
    foreach (item, query->targetList) {
        TargetEntry* tle = (TargetEntry*)lfirst(item);
        if (tle->resjunk)
            continue;

        /* not support user-defined type */
        if (exprType((Node*)tle->expr) >= FirstBootstrapObjectId) {
            report_unsupport_light(USERTYPE_UNSUPPORT);

            return false;
        }
    }

    return true;
}

lightProxy::lightProxy(Query *query)
    : m_cplan(NULL),
      m_nodeIdx(-1),
      m_context(NULL),
      m_stmtName(NULL),
      m_formats(NULL),
      m_entry(NULL),
      m_query(query),
      m_handle(NULL)
{
    m_cmdType = query ? query->commandType : CMD_UNKNOWN;
    queryType = CMD_DML;
    m_portalName = NULL;
    m_msgctl = (lightProxyMsgCtl*)palloc0(sizeof(lightProxyMsgCtl));
    m_msgctl->errData = (lightProxyErrData*)palloc0(sizeof(lightProxyErrData));
    m_msgctl->relhash = 0;
    m_msgctl->has_relhash = false;
    m_isRowTriggerShippable = query->isRowTriggerShippable;
    initStringInfo(&m_bindMessage);
    initStringInfo(&m_describeMessage);
#ifdef LPROXY_DEBUG
    m_msgctl->stmt_name = NULL;
    m_msgctl->query_string = query->sql_statement;
#endif
}

lightProxy::lightProxy(MemoryContext context, CachedPlanSource *psrc, const char *portalname, const char *stmtname)
    : m_cplan(psrc),
      m_nodeIdx(-1),
      m_context(context),
      m_stmtName(NULL),
      m_portalName(NULL),
      m_formats(NULL),
      m_entry(NULL),
      m_cmdType(CMD_UNKNOWN),
      m_query(NULL),
      m_handle(NULL)
{
    MemoryContext old_context = MemoryContextSwitchTo(context);

    if (psrc) {
        m_cmdType = set_cmd_type(psrc->commandTag);
        queryType = set_command_type_by_commandTag(psrc->commandTag);
    }
    initStringInfo(&m_bindMessage);
    initStringInfo(&m_describeMessage);

    m_msgctl = (lightProxyMsgCtl *)palloc0(sizeof(lightProxyMsgCtl));
    m_msgctl->errData = (lightProxyErrData *)palloc0(sizeof(lightProxyErrData));
    m_msgctl->relhash = 0;
    m_msgctl->has_relhash = false;

    if (psrc != NULL && list_length(psrc->query_list) == 1 && linitial(psrc->query_list) != NULL) {
        m_isRowTriggerShippable = ((Query*)linitial(psrc->query_list))->isRowTriggerShippable;
    } else {
        m_isRowTriggerShippable = false;
    }

    if (portalname != NULL && portalname[0] != '\0') {
        storeLightProxy(portalname);
    } else {
        m_portalName = NULL;
    }
    m_stmtName = (stmtname != NULL && stmtname[0] != '\0') ? pstrdup(stmtname) : NULL;

#ifdef LPROXY_DEBUG
    m_msgctl->stmt_name = pstrdup(m_stmtName);
    m_msgctl->query_string = pstrdup(m_cplan->query_string);
#endif
    MemoryContextSwitchTo(old_context);
}


lightProxy::~lightProxy()
{
    m_cplan = NULL;
    m_context = NULL;
    m_query = NULL;
    m_handle = NULL;
    m_msgctl = NULL;
    m_entry = NULL;
    pfree_ext(m_formats);
}

void lightProxy::getResultFormat(StringInfo message)
{
    pfree_ext(m_formats);
    if (m_cplan->resultDesc == NULL)
        return;
    /* Get the result format codes */
    int numRFormats = pq_getmsgint(message, 2);
    int i = 0;
    int natts = m_cplan->resultDesc->natts;
    int16* formats = NULL;
    m_formats = (int16*)palloc(natts * sizeof(int16));

    /* Get the result format codes */
    if (numRFormats > 0) {
        formats = (int16*)palloc(numRFormats * sizeof(int16));
        for (i = 0; i < numRFormats; i++)
            formats[i] = pq_getmsgint(message, 2);
    }
    pq_getmsgend(message);

    if (numRFormats > 1) {
        if (numRFormats != natts) {
            pfree_ext(formats);
            ereport(ERROR,
                (errcode(ERRCODE_PROTOCOL_VIOLATION),
                    errmsg("bind message has %d result formats but query has %d columns", numRFormats, natts)));
        }
        int rc = memcpy_s(m_formats, natts * sizeof(int16), formats, natts * sizeof(int16));
        securec_check(rc, "\0", "\0");
    } else if (numRFormats > 0) {
        for (i = 0; i < natts; i++)
            m_formats[i] = formats[0];
    } else {
        for (i = 0; i < natts; i++)
            m_formats[i] = 0;
    }
    pfree_ext(formats);
}

void lightProxy::saveMsg(int msgType, StringInfo message)
{
    // to do , save memory consumption
    AutoContextSwitch contexts(m_context);

    if (msgType == BIND_MESSAGE) {
        /* clean previous message if exists */
        if (m_bindMessage.len > 0)
            resetStringInfo(&m_bindMessage);
        if (m_describeMessage.len > 0)
            resetStringInfo(&m_describeMessage);

        getResultFormat(message);
        appendBinaryStringInfo(&m_bindMessage, message->data, message->len);
    } else if (msgType == DESC_MESSAGE) {
        /* clean previous message if exists */
        if (m_describeMessage.len > 0)
            resetStringInfo(&m_describeMessage);
        appendBinaryStringInfo(&m_describeMessage, message->data, message->len);
    }
}

void lightProxy::connect()
{
    List* dn_allocate = NULL;
    errno_t ss_rc = 0;
    int dnNum = u_sess->pgxc_cxt.NumDataNodes;
    if (IS_CN_DISASTER_RECOVER_MODE) {
        dnNum = u_sess->pgxc_cxt.NumTotalDataNodes;
        if (!u_sess->pgxc_cxt.DisasterReadArrayInit) {
            disaster_read_array_init();
        }
        Assert(m_nodeIdx < u_sess->pgxc_cxt.NumDataNodes);
        if (u_sess->pgxc_cxt.disasterReadArray[m_nodeIdx] != -1) {
            m_nodeIdx = u_sess->pgxc_cxt.disasterReadArray[m_nodeIdx];
        }
    }

    m_handle = &u_sess->pgxc_cxt.dn_handles[m_nodeIdx];
    if (!IS_VALID_CONNECTION(m_handle)) {
        Assert(m_nodeIdx < dnNum);
        if (m_nodeIdx >= dnNum) {
            ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
                errmsg("[LIGHT PROXY] m_nodeIdx error, m_nodeIdx:%d, numDataNodes:%d",
                    m_nodeIdx, dnNum)));
        }

        dn_allocate = lappend_int(dn_allocate, m_nodeIdx);
        PoolConnDef* pfds = PoolManagerGetConnections(dn_allocate, NULL);
        if (pfds == NULL)
            ereport(ERROR,
                (errcode(ERRCODE_CONNECTION_FAILURE),
                    errmsg("[LIGHT PROXY] Failed to get pooled connections from %s[%u]",
                        m_handle->remoteNodeName,
                        m_handle->nodeoid)));

        int fdsock = pfds->fds[0];
        PoolConnInfo* conn_info = &pfds->connInfos[0];

        pgxc_node_init(m_handle, fdsock);
        ss_rc = memcpy_s(&m_handle->connInfo, sizeof(PoolConnInfo), conn_info, sizeof(PoolConnInfo));

        securec_check(ss_rc, "\0", "\0");
#ifdef ENABLE_MULTIPLE_NODES
        pgxc_node_send_global_session_id((PGXCNodeHandle*)m_handle);
#endif
        u_sess->pgxc_cxt.dn_handles[m_nodeIdx] = *m_handle;
        u_sess->pgxc_cxt.datanode_count++;

        if (pfds->gsock[0].type != GSOCK_INVALID) {
            m_handle->gsock = pfds->gsock[0];
            m_handle->is_logic_conn = true;
        }
        pgxc_node_free_def(pfds);
        pfds = NULL;
    } else if (m_handle->state == DN_CONNECTION_STATE_QUERY) {
        BufferConnection(m_handle);
    }
}

void lightProxy::sendParseIfNecessary()
{
    /* if no stmt_name, we need to send parse every time */
    if (m_stmtName == NULL || m_stmtName[0] == '\0') {
        if (pgxc_node_send_parse(
                m_handle, m_stmtName, m_cplan->query_string, m_cplan->num_params, m_cplan->param_types)) {
            ereport(ERROR,
                (errcode(ERRCODE_CONNECTION_EXCEPTION),
                    errmsg("[LIGHT PROXY] Failed to send parse to %s[%u]",
                        m_handle->remoteNodeName,
                        m_handle->nodeoid)));
        }
        return;
    }

    if (unlikely(m_entry == NULL)) {
        /*
         * If we have reloaded pooler, we need to add it into datanode_queries again,
         * as we do in parse phrase previously.
         */
        m_entry = light_set_datanode_queries(m_stmtName);
        Assert(m_entry != NULL);
    }
    Assert(m_nodeIdx != -1);
    bool need_send_again = false;
    /* see if statement already active on the node */
    for (int i = 0; i < m_entry->current_nodes_number; i++) {
        if (m_entry->dns_node_indices[i] == m_nodeIdx) {
            if (ENABLE_CN_GPC || IN_GPC_GRAYRELEASE_CHANGE) {
                need_send_again = true;
            } else {
                return;
            }
        }
    }

    if (pgxc_node_send_parse(
            m_handle, m_stmtName, m_cplan->query_string, m_cplan->num_params, m_cplan->param_types)) {
        ereport(ERROR,
            (errcode(ERRCODE_CONNECTION_EXCEPTION),
                errmsg("[LIGHT PROXY] Failed to send parse to %s[%u]",
                    m_handle->remoteNodeName,
                    m_handle->nodeoid)));
    }
    if (need_send_again) {
        return;
    }
    /* After cluster expansion, must expand entry->dns_node_indices array too */
    if (unlikely(m_entry->current_nodes_number == m_entry->max_nodes_number)) {
        int* new_dns_node_indices = (int*)MemoryContextAllocZero(
            u_sess->pcache_cxt.datanode_queries->hcxt, m_entry->max_nodes_number * 2 * sizeof(int));
        errno_t error_no = EOK;
        error_no = memcpy_s(new_dns_node_indices,
            m_entry->max_nodes_number * 2 * sizeof(int),
            m_entry->dns_node_indices,
            m_entry->max_nodes_number * sizeof(int));
        securec_check(error_no, "\0", "\0");
        pfree_ext(m_entry->dns_node_indices);
        m_entry->dns_node_indices = new_dns_node_indices;
        m_entry->max_nodes_number = m_entry->max_nodes_number * 2;
        elog(LOG,
            "expand node ids array for active datanode statements "
            "after cluster expansion, now array size is %d",
            m_entry->max_nodes_number);
    }

    /* statement is not active on the specified node append item to the list */
    m_entry->dns_node_indices[m_entry->current_nodes_number++] = m_nodeIdx;
}

/*
 * @Description: Send BEGIN command to the DataNode. Also send the GXID for the transaction.
 *    See pgxc_node_begin for more details.
 */
void lightProxy::proxyNodeBegin(bool is_read_only)
{
    bool need_tran_block = false;
    GlobalTransactionId gxid = InvalidTransactionId;
    if (HAS_ROUTER) {
        // push down function to router dn, need transaction
        is_read_only = false;
    }
    if (IsTransactionBlock()) {
        need_tran_block = true;
    } else if (is_read_only) {
        need_tran_block = false;
    } else {
        need_tran_block = true;
    }

    if (is_read_only) {
        gxid = GetCurrentTransactionIdIfAny();
    }

    /*
     * If the node is already a participant in the transaction, skip it
     */
    if (list_member(u_sess->pgxc_cxt.XactReadNodes, m_handle) ||
        list_member(u_sess->pgxc_cxt.XactWriteNodes, m_handle)) {
        if (!is_read_only) {
            RegisterTransactionNodes(1, (void**)&m_handle, true);
        }
    } else {
        SetCurrentStmtTimestamp();

        TimestampTz gtmstart_timestamp = GetCurrentGTMStartTimestamp();
        TimestampTz stmtsys_timestamp = GetCurrentStmtsysTimestamp();
        if (GlobalTimestampIsValid(gtmstart_timestamp) &&
            GlobalTimestampIsValid(stmtsys_timestamp) &&
            pgxc_node_send_timestamp(m_handle, gtmstart_timestamp, stmtsys_timestamp)) {
            ereport(ERROR,
                (errcode(ERRCODE_CONNECTION_EXCEPTION),
                    errmsg("[LIGHT PROXY] Failed to send timestamp to %s[%u]",
                        m_handle->remoteNodeName,
                        m_handle->nodeoid)));
        }

        if (need_tran_block) {
            if (light_node_send_begin(m_handle, g_instance.attr.attr_storage.enable_gtm_free)) {
                ereport(ERROR,
                    (errcode(ERRCODE_CONNECTION_EXCEPTION),
                        errmsg("[LIGHT PROXY] Failed to send internal begin to %s[%u]",
                            m_handle->remoteNodeName,
                            m_handle->nodeoid)));
            }
            LPROXY_DEBUG(ereport(DEBUG2,(errmodule(MOD_LIGHTPROXY),
                errmsg("[LIGHT PROXY] Send internal begin to DataNode %u: query %s",
                m_handle->nodeoid,
                m_msgctl->query_string))));

            /* recieve message */
            m_msgctl->cnMsg = true;
            handleResponse();
            RegisterTransactionNodes(1, (void**)&m_handle, !is_read_only);
        }
    }

    CommandId cid = GetCurrentCommandId(!is_read_only);
    if (pgxc_node_send_cmd_id(m_handle, cid) < 0) {
        ereport(ERROR,
            (errcode(ERRCODE_CONNECTION_EXCEPTION),
                errmsg("[LIGHT PROXY] Failed to send cid to %s[%u]",
                    m_handle->remoteNodeName,
                    m_handle->nodeoid)));
    }
    /* print the XactWriteNodes and XactReadNodes list info */
    PrintRegisteredTransactionNodes();

    Snapshot snapshot = GetActiveSnapshot();
    if (!GTM_FREE_MODE && snapshot != NULL &&
        pgxc_node_send_snapshot(m_handle, snapshot)) {
        ereport(ERROR,
            (errcode(ERRCODE_CONNECTION_EXCEPTION),
                errmsg("[LIGHT PROXY] Failed to send snapshot to %s[%u]",
                    m_handle->remoteNodeName,
                    m_handle->nodeoid)));
    }

    if (u_sess->attr.attr_resource.use_workload_manager && *u_sess->wlm_cxt->control_group &&
        pgxc_node_send_wlm_cgroup(m_handle)) {
        ereport(ERROR,
            (errcode(ERRCODE_CONNECTION_EXCEPTION),
                errmsg("[LIGHT PROXY] Failed to send cgroup to %s[%u]",
                    m_handle->remoteNodeName,
                    m_handle->nodeoid)));
    }

    /* Only generate one time when debug_query_id = 0 in CN */
    if (unlikely(u_sess->debug_query_id == 0)) {
        u_sess->debug_query_id = generate_unique_id64(&gt_queryId);
        pgstat_report_queryid(u_sess->debug_query_id);
    }
    if (pgxc_node_send_queryid(m_handle, u_sess->debug_query_id)) {
        ereport(ERROR,
            (errcode(ERRCODE_CONNECTION_EXCEPTION),
                errmsg("[LIGHT PROXY] Failed to send query id to %s[%u]",
                    m_handle->remoteNodeName,
                    m_handle->nodeoid)));
    }

    /* Instrumentation/Unique SQL: send unique sql id to DN node */
    if (is_unique_sql_enabled() && pgxc_node_send_unique_sql_id(m_handle)) {
        ereport(ERROR,
            (errcode(ERRCODE_CONNECTION_EXCEPTION),
                errmsg("[LIGHT PROXY] Failed to send unique sql id to %s[%u]",
                    m_handle->remoteNodeName,
                    m_handle->nodeoid)));
    }
}

void lightProxy::setCurrentProxy(lightProxy* proxy)
{
    /* set process_count = NULL for common PBE */
    if (proxy != NULL && proxy->m_msgctl != NULL && proxy->m_msgctl->process_count != NULL)
        proxy->m_msgctl->process_count = NULL;

    u_sess->exec_cxt.cur_light_proxy_obj = proxy;
}

ExecNodes* lightProxy::checkRouterQuery(Query* query)
{
    ExecNodes* exec_nodes = NULL;
    if (!isSupportLightQuery(query) || !HAS_ROUTER) {
        return NULL;
    }
    exec_nodes = makeNode(ExecNodes);
    exec_nodes->nodeList = lappend_int(exec_nodes->nodeList, u_sess->exec_cxt.CurrentRouter->GetRouterNodeId());
    return exec_nodes;
}

/*
 * Constraints specially defined for Light CN are checked here
 */
ExecNodes* lightProxy::checkLightQuery(Query* query)
{
    ExecNodes* exec_nodes = NULL;

    /* for UPSERT, use the insert part to check Light CN */
    if (query->upsertQuery != NULL) {
        /* only allow UPSERT transformed MERGE statement have an upsertQuery */
        if (unlikely(query->commandType != CMD_MERGE)) {
            ereport(ERROR,
                (errmodule(MOD_OPT),
                    errcode(ERRCODE_OPTIMIZER_INCONSISTENT_STATE),
                    (errmsg("INSERT ON DUPLICATE KEY UPDATE must have an transformed InsertStmt query."))));
        }
        query = query->upsertQuery;
    }

    if (!isSupportLightQuery(query)) {
        return NULL;
    }

    /* handle the un-supported statements, obvious errors etc. */
    pgxc_handle_unsupported_stmts(query);

    /* Do permissions checks */
    Assert(IS_PGXC_COORDINATOR && !IsConnFromCoord());
    (void)ExecCheckRTPerms(query->rtable, true);

    exec_nodes = pgxc_is_query_shippable(query, 0, true);

    return exec_nodes;
}

void lightProxy::tearDown(lightProxy *proxy)
{
    MemoryContext context = proxy->m_context;

    proxy->removeLpByStmtName(proxy->m_stmtName);

    removeLightProxy(proxy->m_portalName);

    MemoryContextDelete(context);

    lightProxy::setCurrentProxy(NULL);
}

void lightProxy::initStmtHtab()
{
    HASHCTL hash_ctl;
    errno_t rc = 0;
    int htab_size = 64;
    
    rc = memset_s(&hash_ctl, sizeof(hash_ctl), 0, sizeof(hash_ctl));
    securec_check(rc, "\0", "\0");

    hash_ctl.keysize = NAMEDATALEN;
    hash_ctl.entrysize = sizeof(stmtLpObj);
    hash_ctl.hcxt = u_sess->cache_mem_cxt;
    u_sess->pcache_cxt.stmt_lightproxy_htab = hash_create("lightProxy Named Object for GPC",
                                                          htab_size,
                                                          &hash_ctl,
                                                          HASH_ELEM | HASH_CONTEXT);
}


void lightProxy::initlightProxyTable()
{
    HASHCTL hash_ctl;
    errno_t rc = 0;
	rc = memset_s(&hash_ctl, sizeof(hash_ctl), 0, sizeof(hash_ctl));
	securec_check(rc, "\0", "\0");

	hash_ctl.keysize = NAMEDATALEN;
	hash_ctl.entrysize = sizeof(lightProxyNamedObj);
	hash_ctl.hcxt = u_sess->cache_mem_cxt;
	u_sess->pcache_cxt.lightproxy_objs = hash_create("lightProxy Named Object", 64, &hash_ctl, HASH_ELEM | HASH_CONTEXT);
}

void lightProxy::removeLpByStmtName(const char *stmtname)
{
    if (u_sess->pcache_cxt.stmt_lightproxy_htab && stmtname != NULL && stmtname[0] != '\0') {
        (void)hash_search(u_sess->pcache_cxt.stmt_lightproxy_htab, stmtname, HASH_REMOVE, NULL);
    }
}

void lightProxy::removeLightProxy(const char* portalname)
{
	if(u_sess->pcache_cxt.lightproxy_objs && portalname != NULL) {
		 hash_search(u_sess->pcache_cxt.lightproxy_objs, portalname, HASH_REMOVE, NULL);
	}
}

void lightProxy::storeLpByStmtName(const char *stmtname)
{
    stmtLpObj *entry = NULL;
    if (!u_sess->pcache_cxt.stmt_lightproxy_htab)
        initStmtHtab();

    entry = (stmtLpObj *)hash_search(u_sess->pcache_cxt.stmt_lightproxy_htab, stmtname, HASH_ENTER, NULL);
    entry->proxy = this;
}

void lightProxy::storeLightProxy(const char* portalname)
{
    lightProxyNamedObj* entry = NULL;
    if(!u_sess->pcache_cxt.lightproxy_objs)
        initlightProxyTable();

    entry = (lightProxyNamedObj*)hash_search(u_sess->pcache_cxt.lightproxy_objs, portalname, HASH_ENTER, NULL);
    entry->proxy = this;
    pfree_ext(m_portalName);
    MemoryContext old_context = MemoryContextSwitchTo(m_context);
    m_portalName = pstrdup(portalname);
    (void)MemoryContextSwitchTo(old_context);
}

lightProxy *lightProxy::locateLpByStmtName(const char *stmtname)
{
    stmtLpObj *entry = NULL;
    if (u_sess->pcache_cxt.stmt_lightproxy_htab && stmtname && stmtname[0] != '\0') {
        entry = (stmtLpObj *)hash_search(u_sess->pcache_cxt.stmt_lightproxy_htab, stmtname, HASH_FIND, NULL);
    }

    if (entry) {
        return entry->proxy;
    } else {
        return NULL;
    }
}

lightProxy* lightProxy::locateLightProxy(const char* portalname)
{
	lightProxyNamedObj* entry = NULL;
	if(u_sess->pcache_cxt.lightproxy_objs) {
		entry = (lightProxyNamedObj*)hash_search(u_sess->pcache_cxt.lightproxy_objs, portalname, HASH_FIND, NULL);
	}

	if(entry) {
		return entry->proxy;
	} else {
		return NULL;
	}
}

bool lightProxy::processMsg(int msgType, StringInfo msg)
{
    lightProxy* lp = u_sess->exec_cxt.cur_light_proxy_obj;

    if (msgType == EXEC_MESSAGE) {
        lp = lightProxy::tryLocateLightProxy(msg);
    }
    bool res = false;
    bool old_status = u_sess->exec_cxt.need_track_resource;
    if (lp != NULL) {
        switch (msgType) {
            case BIND_MESSAGE:
            case DESC_MESSAGE:
                lp->saveMsg(msgType, msg);
                break;

            case EXEC_MESSAGE:
                if (u_sess->attr.attr_resource.resource_track_cost == 0 &&
                    u_sess->attr.attr_resource.enable_resource_track &&
                    u_sess->attr.attr_resource.resource_track_level != RESOURCE_TRACK_NONE) {
                    u_sess->exec_cxt.need_track_resource = true;
                    WLMSetCollectInfoStatus(WLM_STATUS_RUNNING);
                }
                lp->runMsg(msg);
                u_sess->exec_cxt.need_track_resource = old_status;
                break;

            default:
                ereport(ERROR,
                    (errcode(ERRCODE_CASE_NOT_FOUND), errmsg("invalid msgType %d for process message \n", msgType)));
        }
        res = true;;
    }

    /*
    * Emit duration logging if appropriate.
    */
    char msec_str[PRINTF_DST_MAX];
    switch (check_log_duration(msec_str, false)) {
        case 1:
            Assert(false);
            break;
        case 2: {
            ereport(LOG, (errmsg("duration: %s ms queryid %ld unique id %ld", msec_str,
                u_sess->debug_query_id, u_sess->slow_query_cxt.slow_query.unique_sql_id), errhidestmt(true)));
            break;
        }
        default:
            break;
    }
    return res;
}

void lightProxy::assemableMsg(char msgtype, StringInfo msgBuf, bool trigger_ship)
{
    int msg_len = 4 + msgBuf->len;
    errno_t ss_rc;

    /* If trigger is being shipped to DN. */
    if (trigger_ship) {
        ensure_out_buffer_capacity(msg_len + 2, m_handle);
        m_handle->outBuffer[m_handle->outEnd++] = 'a';
    } else {
        ensure_out_buffer_capacity(msg_len + 1, m_handle);
    }
    m_handle->outBuffer[m_handle->outEnd++] = msgtype;

    msg_len = htonl(msg_len);

    ss_rc = memcpy_s(m_handle->outBuffer + m_handle->outEnd, m_handle->outSize - m_handle->outEnd,
                    &msg_len, sizeof(uint32));
    securec_check(ss_rc, "\0", "\0");
    m_handle->outEnd += 4;
    ss_rc = memcpy_s(m_handle->outBuffer + m_handle->outEnd, m_handle->outSize - m_handle->outEnd,
                    msgBuf->data, (size_t)msgBuf->len);
    securec_check(ss_rc, "\0", "\0");
    m_handle->outEnd += msgBuf->len;
}

void lightProxy::handleResponse()
{
    int res = 0;

    /*
     * Reset lightProxyErrData. This only happens for PBE.
     * Memory of lightProxyErrData itself is in m_context, no need to free here.
     * Memory of char* inside is in t_thrd.mem_cxt.msg_mem_cxt, no need to free too.
     */
    if (m_msgctl->errData->hasError) {
        errno_t rc = 0;
        rc = memset_s(m_msgctl->errData, sizeof(lightProxyErrData), 0, sizeof(lightProxyErrData));
        securec_check_c(rc, "\0", "\0");
    }

    m_handle->state = DN_CONNECTION_STATE_QUERY;
    // process all messages.
    while (true) {
        if (m_handle->is_logic_conn)
            res = light_node_receive_from_logic_conn(m_handle);
        else
            res = light_node_receive(m_handle);

        if (res) {
            ereport(ERROR,
                (errcode(ERRCODE_CONNECTION_FAILURE),
                    errmsg("[LIGHT PROXY] Failed to fetch from Datanode %s[%u]",
                        m_handle->remoteNodeName,
                        m_handle->nodeoid)));
        }

        res = light_handle_response(m_handle, m_msgctl, this);

        if (res == LPROXY_FINISH) {
            break;
        } else if (res == LPROXY_ERROR) {
            ereport(ERROR,
                (errcode(ERRCODE_CONNECTION_EXCEPTION),
                    errmsg("[LIGHT PROXY] Unexpected response from %s[%u]",
                        m_handle->remoteNodeName,
                        m_handle->nodeoid)));
        }
    }

    /* report error if any */
    if (m_msgctl->errData->hasError) {
        setCurrentProxy(NULL);
        light_node_report_error(m_msgctl->errData);
    }
}

lightProxy* lightProxy::tryLocateLightProxy(StringInfo msg)
{
	lightProxy* lp = u_sess->exec_cxt.cur_light_proxy_obj;

	int oldCursor = msg->cursor;
	const char* portal_name = pq_getmsgstring(msg);
	if(portal_name[0] != '\0') {
	   lp = lightProxy::locateLightProxy(portal_name);
	   lightProxy::setCurrentProxy(lp);
	}
	msg->cursor = oldCursor;

	return lp;
}

void lightProxy::runSimpleQuery(StringInfo exec_message)
{
    connect();

    LPROXY_DEBUG(ereport(DEBUG2, (errmodule(MOD_LIGHTPROXY), errmsg(
        "[LIGHT PROXY] Got exec_simple_query slim to Datanode %u: query %s",
        m_handle->nodeoid,
        m_query->sql_statement))));

    bool is_read_only = (m_query->commandType == CMD_SELECT && !m_query->hasForUpdate);

    proxyNodeBegin(is_read_only);

    bool trigger_ship = false;
    if (u_sess->attr.attr_sql.enable_trigger_shipping && m_isRowTriggerShippable)
        trigger_ship = true;
    assemableMsg('Q', exec_message, trigger_ship);

    if (pgxc_node_flush(m_handle)) {
        ereport(ERROR,
            (errcode(ERRCODE_CONNECTION_EXCEPTION),
                errmsg("[LIGHT PROXY] Unexpected response from %s[%u]",
                    m_handle->remoteNodeName,
                    m_handle->nodeoid)));
    }

    m_msgctl->sendDMsg = true;
    m_msgctl->cnMsg = false;
    m_msgctl->hasResult = (m_query->commandType == CMD_SELECT || m_query->returningList != NIL);

    handleResponse();

    /* pgaudit */
    bool is_full_audit_user = audit_check_full_audit_user();
    if ((u_sess->attr.attr_security.Audit_DML_SELECT != 0 || u_sess->attr.attr_security.Audit_DML != 0 ||
        is_full_audit_user) && u_sess->attr.attr_security.Audit_enabled && IsPostmasterEnvironment) {
        light_pgaudit_ExecutorEnd(m_query);
    }
    /* unified auditing policy */
    if (!(g_instance.status > NoShutdown) && light_unified_audit_executor_hook) {
        light_unified_audit_executor_hook(m_query);
    }
    /* global chain record */
    if (IS_PGXC_COORDINATOR && m_msgctl->has_relhash) {
        light_ledger_ExecutorEnd(m_query, m_msgctl->relhash);
    }
    /* doing sql count accordiong to cmdType */
    if (u_sess->attr.attr_common.pgstat_track_activities && u_sess->attr.attr_common.pgstat_track_sql_count &&
        !u_sess->attr.attr_sql.enable_cluster_resize) {
        report_qps_type(m_cmdType);
        report_qps_type(queryType);
    }

    /* update unique sql stat */
    if (is_unique_sql_enabled() && is_local_unique_sql()) {
        instr_unique_sql_report_elapse_time(GetCurrentStatementLocalStartTimestamp());
    }
    pgstate_update_percentile_responsetime();
    // no more proxy
    setCurrentProxy(NULL);

    report_iud_time_for_lightproxy(m_query);
}

int lightProxy::runBatchMsg(StringInfo batch_message, bool sendDMsg, int batch_count)
{
    int process_count = 0;

    SetUniqueSQLIdFromCachedPlanSource(this->m_cplan);

    /* Must set snapshot before starting executor. */
    PushActiveSnapshot(GetTransactionSnapshot(GTM_LITE_MODE));

    connect();

    LPROXY_DEBUG(ereport(DEBUG2,(errmodule(MOD_LIGHTPROXY),
        errmsg("[LIGHT PROXY] Got Batch slim to DataNode %u: name %s, query %s",
        m_handle->nodeoid,
        m_stmtName,
        m_cplan->query_string))));

    proxyNodeBegin(m_cplan->is_read_only);

    /* check if we need to send parse or not */
    sendParseIfNecessary();

    assemableMsg('U', batch_message);

    // send sync message  and flush
    if (pgxc_node_send_sync(m_handle)) {
        ereport(ERROR,
            (errcode(ERRCODE_CONNECTION_EXCEPTION),
                errmsg("[LIGHT PROXY] Failed to send sync to %s[%u]",
                    m_handle->remoteNodeName,
                    m_handle->nodeoid)));
    }

    m_msgctl->sendDMsg = sendDMsg;
    m_msgctl->process_count = &process_count;
    m_msgctl->cnMsg = false;
    m_msgctl->hasResult = (m_cplan->resultDesc != NULL) ? true : false;
    handleResponse();

    PopActiveSnapshot();

    /*
     * We need a CommandCounterIncrement after every query, except
     * those that start or end a transaction block.
     */
    CommandCounterIncrement();

    /* pgaudit */
    bool is_full_audit_user = audit_check_full_audit_user();
    if ((u_sess->attr.attr_security.Audit_DML_SELECT != 0 || u_sess->attr.attr_security.Audit_DML != 0 ||
        is_full_audit_user) && u_sess->attr.attr_security.Audit_enabled && IsPostmasterEnvironment) {
        for (int i = 0; i < batch_count; i++)
            light_pgaudit_ExecutorEnd((Query*)linitial(m_cplan->query_list));
    }
    /* unified auditing policy */
    if (!(g_instance.status > NoShutdown) && light_unified_audit_executor_hook) {
        light_unified_audit_executor_hook((Query*)linitial(m_cplan->query_list));
    }
    /* global chain record */
    if (IS_PGXC_COORDINATOR && m_msgctl->has_relhash) {
        light_ledger_ExecutorEnd((Query*)linitial(m_cplan->query_list), m_msgctl->relhash);
    }

    /*
     * track_sql_count is on, counting WaitEventSQL for per user
     */
    if (u_sess->attr.attr_common.pgstat_track_activities && u_sess->attr.attr_common.pgstat_track_sql_count &&
        !u_sess->attr.attr_sql.enable_cluster_resize) {
        for (int i = 0; i < batch_count; i++) {
            report_qps_type(m_cmdType);
            report_qps_type(queryType);
        }
    }

    // finish with this proxy
    setCurrentProxy(NULL);
    return process_count;
}

void lightProxy::runMsg(StringInfo exec_message)
{
    /*
     * If we are in aborted transaction state, the only portals we can
     * actually run are those containing COMMIT or ROLLBACK commands.
     */
    if (IsAbortedTransactionBlockState())
        ereport(ERROR,
            (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
                errmsg("current transaction is aborted, "
                    "commands ignored until end of transaction block, firstChar[%c]",
                    u_sess->proc_cxt.firstChar), 0));

    /* Must set snapshot before starting executor. */
    PushActiveSnapshot(GetTransactionSnapshot(GTM_LITE_MODE));

    connect();

    LPROXY_DEBUG(ereport(DEBUG2,(errmodule(MOD_LIGHTPROXY),
        errmsg("[LIGHT PROXY] Got Execute slim to DataNode %u: name %s, query %s",
        m_handle->nodeoid,
        m_stmtName,
        m_cplan->query_string))));

    bool trigger_ship = false;
    if (u_sess->attr.attr_sql.enable_trigger_shipping && m_isRowTriggerShippable)
        trigger_ship = true;

    /*
     * Ensure we are in a transaction command (this should normally be the
     * case already due to prior BIND).
     */
    start_xact_command();

    /* Set after start transaction in case there is no CurrentResourceOwner */
    SetUniqueSQLIdFromCachedPlanSource(this->m_cplan);

    proxyNodeBegin(m_cplan->is_read_only);
    /* check if we need to send parse or not */
    sendParseIfNecessary();

    if (m_bindMessage.len > 0) {
        assemableMsg('B', &m_bindMessage);
        resetStringInfo(&m_bindMessage);
    }

    if (m_describeMessage.len > 0) {
        assemableMsg('D', &m_describeMessage);
        resetStringInfo(&m_describeMessage);
        m_msgctl->sendDMsg = true;
    } else {
        m_msgctl->sendDMsg = false;
    }

    assemableMsg('E', exec_message, trigger_ship);

    /* send sync message and flush */
    if (pgxc_node_send_sync(m_handle)) {
        ereport(ERROR,
            (errcode(ERRCODE_CONNECTION_EXCEPTION),
                errmsg("[LIGHT PROXY] Failed to send sync to %s[%u]",
                    m_handle->remoteNodeName, m_handle->nodeoid)));
    }

    m_msgctl->cnMsg = false;
    m_msgctl->hasResult = (m_cplan->resultDesc != NULL) ? true : false;
    handleResponse();

    PopActiveSnapshot();

    /*
     * We need a CommandCounterIncrement after every query, except
     * those that start or end a transaction block.
     */
    CommandCounterIncrement();
    t_thrd.wlm_cxt.parctl_state.except = 0;

    if (ENABLE_WORKLOAD_CONTROL) {
        if (g_instance.wlm_cxt->dynamic_workload_inited) {
            dywlm_client_max_release(&t_thrd.wlm_cxt.parctl_state);
        } else {
            WLMParctlRelease(&t_thrd.wlm_cxt.parctl_state);
        }
    }

    /* pgaudit */
    bool is_full_audit_user = audit_check_full_audit_user();
    if ((u_sess->attr.attr_security.Audit_DML_SELECT != 0 || u_sess->attr.attr_security.Audit_DML != 0 ||
        is_full_audit_user) && u_sess->attr.attr_security.Audit_enabled && IsPostmasterEnvironment) {
        light_pgaudit_ExecutorEnd((Query*)linitial(m_cplan->query_list));
    }
    /* unified auditing policy */
    if (!(g_instance.status > NoShutdown) && light_unified_audit_executor_hook) {
        light_unified_audit_executor_hook((Query*)linitial(m_cplan->query_list));
    }
    /* global chain record */
    if (IS_PGXC_COORDINATOR && m_msgctl->has_relhash) {
        light_ledger_ExecutorEnd((Query*)linitial(m_cplan->query_list), m_msgctl->relhash);
    }

    /*
     * track_sql_count is on, counting WaitEventSQL for per user
     */
    if (u_sess->attr.attr_common.pgstat_track_activities && u_sess->attr.attr_common.pgstat_track_sql_count &&
        !u_sess->attr.attr_sql.enable_cluster_resize) {
        report_qps_type(m_cmdType);
        report_qps_type(queryType);
    }

    /* update unique sql stat */
    if (is_unique_sql_enabled() && is_local_unique_sql()) {
        instr_unique_sql_report_elapse_time(GetCurrentStatementLocalStartTimestamp());
    }
    pgstate_update_percentile_responsetime();
    setCurrentProxy(NULL);
}

bool lightProxy::isDeleteLimit(const Query* query)
{
    if (query == NULL || query->commandType != CMD_DELETE)
        return false;
    if (query->limitCount != NULL)
        return true;
    return false;
}

/*
 * @Description:  according to commandType get corresponsile  WaitEventSQL,
 *    and call function 'pgstat_report_wait_count' to increase sql count
 */
void report_qps_type(CmdType commandType)
{
    switch (commandType) {
        case CMD_SELECT:
            pgstat_report_wait_count(WAIT_EVENT_SQL_SELECT);
            break;
        case CMD_UPDATE:
            pgstat_report_wait_count(WAIT_EVENT_SQL_UPDATE);
            break;
        case CMD_INSERT:
            pgstat_report_wait_count(WAIT_EVENT_SQL_INSERT);
            break;
        case CMD_DELETE:
            pgstat_report_wait_count(WAIT_EVENT_SQL_DELETE);
            break;
        case CMD_MERGE:
            pgstat_report_wait_count(WAIT_EVENT_SQL_MERGEINTO);
            break;
        case CMD_DML:
            pgstat_report_wait_count(WAIT_EVENT_SQL_DML);
            break;
        case CMD_DDL:
            pgstat_report_wait_count(WAIT_EVENT_SQL_DDL);
            break;
        case CMD_DCL:
            pgstat_report_wait_count(WAIT_EVENT_SQL_DCL);
            break;
        case CMD_TCL:
            pgstat_report_wait_count(WAIT_EVENT_SQL_TCL);
            break;
        default:
            /* do not map any commandType */
            break;
    }
}

/*
 * @Description:  according to sql query get corresponsile cmdType
 * @in  - const char *commandTag
 * @out - static CmdType
 */
CmdType set_cmd_type(const char* commandTag)
{
    CmdType cmd_type = CMD_UNKNOWN;
    if (strcmp(commandTag, "SELECT") == 0)
        cmd_type = CMD_SELECT;
    else if (strcmp(commandTag, "UPDATE") == 0)
        cmd_type = CMD_UPDATE;
    else if (strcmp(commandTag, "INSERT") == 0)
        cmd_type = CMD_INSERT;
    else if (strcmp(commandTag, "DELETE") == 0)
        cmd_type = CMD_DELETE;
    else if (strcmp(commandTag, "MERGE") == 0)
        cmd_type = CMD_MERGE;
    return cmd_type;
}

CmdType set_command_type_by_commandTag(const char* commandTag)
{
    CmdType cmd_type = CMD_UNKNOWN;
    int i;
    for (i = 0; i < MAX_COMMAND; i++) {
        if (strstr(commandTag, g_command_type_array[i].commandTag))
            return g_command_type_array[i].type;
    }
    return cmd_type;
}

bool IsLightProxyOn(void)
{
    return (u_sess->exec_cxt.cur_light_proxy_obj != NULL);
}

#ifdef ENABLE_MULTIPLE_NODES
bool exec_query_through_light_proxy(List* querytree_list, Node* parsetree, bool snapshot_set, StringInfo msg, 
                                    MemoryContext OptimizerContext)
{
    OgRecordAutoController _local_opt(SRT5_LIGHT_QUERY);
    if ((list_length(querytree_list) == 1) && !IsA(parsetree, CreateTableAsStmt) &&
        !IsA(parsetree, RefreshMatViewStmt)) {
        ExecNodes* single_exec_node = NULL;
        lightProxy* proxy = NULL;
        Query* query = (Query*)linitial(querytree_list);
        
        if (ENABLE_ROUTER(query->commandType)) {
            single_exec_node = lightProxy::checkRouterQuery(query);
        } else {
            single_exec_node = lightProxy::checkLightQuery(query);
        }
        /* only deal with single node */
        if (single_exec_node && list_length(single_exec_node->nodeList) +
            list_length(single_exec_node->primarynodelist) == 1) {
            /* GTMLite: need to mark that this is single shard statement */
            u_sess->exec_cxt.single_shard_stmt = true;
            if (CmdtypeSupportsHotkey(query->commandType))
                SendHotkeyToPgstat();

            proxy = New(OptimizerContext) lightProxy(query);
            proxy->m_nodeIdx = linitial_int(single_exec_node->nodeList);
            bool old_status = u_sess->exec_cxt.need_track_resource;
            if (u_sess->attr.attr_resource.resource_track_cost == 0 &&
                u_sess->attr.attr_resource.enable_resource_track &&
                u_sess->attr.attr_resource.resource_track_level != RESOURCE_TRACK_NONE) {
                u_sess->exec_cxt.need_track_resource = true;
                WLMSetCollectInfoStatus(WLM_STATUS_RUNNING);
            }
            proxy->runSimpleQuery(msg);

            /* Done with the snapshot used for parsing/planning */
            if (snapshot_set) {
                PopActiveSnapshot();
            }

            FreeExecNodes(&single_exec_node);
            u_sess->exec_cxt.need_track_resource = old_status;
            t_thrd.wlm_cxt.parctl_state.except = 0;
            return true;
        }
        FreeExecNodes(&single_exec_node);
        CleanHotkeyCandidates(true);
        return false;
    }
    return false;
}
#endif
void GPCDropLPIfNecessary(const char *stmt_name, bool need_drop_dnstmt, bool need_del, CachedPlanSource *reset_plan) {
    if (stmt_name == NULL || stmt_name[0] == '\0' || !IS_PGXC_COORDINATOR)
        return;
    lightProxy *lp = lightProxy::locateLpByStmtName(stmt_name);
    if (lp != NULL) {
        if (reset_plan) {
            lp->m_cplan = reset_plan;
        }
        if (stmt_name && need_drop_dnstmt) {
            lp->m_entry = NULL;
            DropDatanodeStatement(stmt_name);
        }
        if (need_del) {
            lightProxy::tearDown(lp);
        }
        return;
    }
    return;
}

void GPCFillMsgForLp(CachedPlanSource* psrc)
{
    Assert(psrc != NULL);
    if (psrc->gpc.status.InSavePlanList(GPC_SHARED)) {
        pfree_ext(psrc->gpc.key);
        MemoryContext oldcxt = MemoryContextSwitchTo(psrc->context);
        psrc->gpc.key = (GPCKey*)palloc0(sizeof(GPCKey));
        psrc->gpc.key->query_string = psrc->query_string;
        psrc->gpc.key->query_length = (uint32)strlen(psrc->query_string);
        psrc->gpc.key->spi_signature = psrc->spi_signature;
        GlobalPlanCache::EnvFill(&psrc->gpc.key->env, psrc->dependsOnRole);
        psrc->gpc.key->env.search_path = psrc->search_path;
        psrc->gpc.key->env.num_params = psrc->num_params;
        psrc->gpc.key->env.param_types = psrc->param_types;
        (void)MemoryContextSwitchTo(oldcxt);
    }
}