/*
 * Copyright (c) 2022 Huawei Technologies Co.,Ltd.
 *
 * DSS is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * -------------------------------------------------------------------------
 *
 * dss_handshake_pool.c
 *
 *
 * IDENTIFICATION
 *    src/service/dss_handshake_pool.c
 *
 * -------------------------------------------------------------------------
 */

#include "dss_handshake_pool.h"
#include "dss_log.h"
#include "dss_service.h"
#include "cm_spinlock.h"
#include "cm_thread.h"

typedef struct st_dss_connect_queue {
    cs_pipe_t items[DSS_CONNECT_POOL_QUEUE_SIZE];
    uint32 head;
    uint32 tail;
    uint32 count;
    spinlock_t lock;
} dss_connect_queue_t;

typedef struct st_dss_connect_pool {
    dss_connect_queue_t queue;
    thread_t *workers;
    uint32 worker_num;
    bool32 started;
    bool32 stopping;
} dss_connect_pool_t;

typedef struct st_dss_handshake_pool {
    dss_connect_queue_t queue;
    thread_t workers[DSS_HANDSHAKE_POOL_WORKER_NUM];
    bool32 started;
    bool32 stopping;
    dss_handshake_worker_fn worker_fn;
} dss_handshake_pool_t;

static dss_connect_pool_t g_ack_pool;
static thread_t g_ack_workers[DSS_ACK_POOL_WORKER_NUM];
static dss_handshake_pool_t g_handshake_pool;

static status_t dss_handshake_pool_submit_inner(const cs_pipe_t *pipe);
static bool32 dss_connect_queue_dequeue(dss_connect_queue_t *queue, cs_pipe_t *pipe);

static void dss_connect_queue_reset(dss_connect_queue_t *queue)
{
    queue->head = 0;
    queue->tail = 0;
    queue->count = 0;
}

static void dss_connect_queue_drain_disconnect(dss_connect_queue_t *queue, const char *pool_name)
{
    cs_pipe_t pipe;
    uint32 drained = 0;

    while (dss_connect_queue_dequeue(queue, &pipe)) {
        LOG_RUN_INF("[DSS_CONNECT] %s stop: disconnect queued conn_sock=%d", pool_name, (int)pipe.link.uds.sock);
        cs_disconnect(&pipe);
        drained++;
    }
    if (drained > 0) {
        LOG_RUN_INF("[DSS_CONNECT] %s stop: drained %u queued connection(s)", pool_name, drained);
    }
}

static bool32 dss_connect_queue_dequeue(dss_connect_queue_t *queue, cs_pipe_t *pipe)
{
    cm_spin_lock(&queue->lock, NULL);
    if (queue->count == 0) {
        cm_spin_unlock(&queue->lock);
        return CM_FALSE;
    }

    *pipe = queue->items[queue->head];
    queue->head = (queue->head + 1) % DSS_CONNECT_POOL_QUEUE_SIZE;
    queue->count--;
    cm_spin_unlock(&queue->lock);
    return CM_TRUE;
}

static status_t dss_connect_queue_enqueue(dss_connect_queue_t *queue, const cs_pipe_t *pipe, const char *pool_name,
    uint32 backlog_warn_workers)
{
    cm_spin_lock(&queue->lock, NULL);
    if (queue->count >= DSS_CONNECT_POOL_QUEUE_SIZE) {
        cm_spin_unlock(&queue->lock);
        LOG_RUN_ERR("[DSS_CONNECT] %s queue full, conn_sock=%d", pool_name, (int)pipe->link.uds.sock);
        return CM_ERROR;
    }

    queue->items[queue->tail] = *pipe;
    queue->tail = (queue->tail + 1) % DSS_CONNECT_POOL_QUEUE_SIZE;
    queue->count++;
    uint32 queue_len = queue->count;
    cm_spin_unlock(&queue->lock);

    if (backlog_warn_workers > 0 && queue_len > backlog_warn_workers) {
        LOG_RUN_WAR("[DSS_CONNECT] ack pool backlog, conn_sock=%d, queue_len=%u, workers=%u",
            (int)pipe->link.uds.sock, queue_len, backlog_warn_workers);
    }
    return CM_SUCCESS;
}

static void dss_handshake_pool_worker(thread_t *thread)
{
    dss_handshake_pool_t *pool = &g_handshake_pool;
    cs_pipe_t pipe;

    (void)thread;
    cm_set_thread_name("hs-pool");
    LOG_RUN_INF("handshake pool worker started");

    while (!thread->closed) {
        if (!dss_connect_queue_dequeue(&pool->queue, &pipe)) {
            if (pool->stopping) {
                break;
            }
            cm_sleep(1);
            continue;
        }

        if (pool->worker_fn != NULL) {
            (void)pool->worker_fn(&pipe);
        }
    }

    LOG_RUN_INF("handshake pool worker closed");
}

static void dss_ack_pool_worker(thread_t *thread)
{
    cs_pipe_t pipe;
    cs_pipe_t local;

    (void)thread;
    cm_set_thread_name("ack-pool");
    LOG_RUN_INF("ack pool worker started");

    while (!thread->closed) {
        if (!dss_connect_queue_dequeue(&g_ack_pool.queue, &pipe)) {
            if (g_ack_pool.stopping) {
                break;
            }
            cm_sleep(1);
            continue;
        }

        local = pipe;
        local.socket_timeout = DSS_CONNECT_ACK_IO_TIMEOUT;
        if (dss_link_ready_ack(&local) != CM_SUCCESS) {
            LOG_RUN_ERR("[DSS_CONNECT] ack pool link_ready_ack failed, conn_sock=%d, err_code=%d, errno=%d",
                (int)pipe.link.uds.sock, cm_get_error_code(), cm_get_os_error());
            cs_disconnect(&local);
            continue;
        }

        if (dss_handshake_pool_submit_inner(&local) != CM_SUCCESS) {
            cs_disconnect(&local);
        }
    }

    LOG_RUN_INF("ack pool worker closed");
}

static status_t dss_handshake_pool_submit_inner(const cs_pipe_t *pipe)
{
    return dss_connect_queue_enqueue(&g_handshake_pool.queue, pipe, "handshake pool", 0);
}

static status_t dss_connect_pool_start_workers(dss_connect_pool_t *pool, thread_entry_t entry, uint32 worker_num,
    const char *pool_name)
{
    pool->workers = g_ack_workers;
    pool->worker_num = worker_num;
    for (uint32 i = 0; i < worker_num; i++) {
        if (cm_create_thread(entry, SIZE_K(512), pool, &pool->workers[i]) != CM_SUCCESS) {
            LOG_RUN_ERR("[DSS_CONNECT] failed to create %s worker %u, errno %d", pool_name, i, cm_get_os_error());
            pool->stopping = CM_TRUE;
            for (uint32 j = 0; j < i; j++) {
                cm_close_thread(&pool->workers[j]);
            }
            return CM_ERROR;
        }
    }
    return CM_SUCCESS;
}

static void dss_connect_pool_stop_workers(dss_connect_pool_t *pool)
{
    pool->stopping = CM_TRUE;
    dss_connect_queue_drain_disconnect(&pool->queue, "ack pool");
    for (uint32 i = 0; i < pool->worker_num; i++) {
        cm_close_thread(&pool->workers[i]);
    }
    pool->started = CM_FALSE;
    pool->stopping = CM_FALSE;
    dss_connect_queue_reset(&pool->queue);
}

static void dss_handshake_pool_stop_workers(void)
{
    dss_handshake_pool_t *pool = &g_handshake_pool;

    pool->stopping = CM_TRUE;
    dss_connect_queue_drain_disconnect(&pool->queue, "handshake pool");
    for (uint32 i = 0; i < DSS_HANDSHAKE_POOL_WORKER_NUM; i++) {
        cm_close_thread(&pool->workers[i]);
    }
    pool->started = CM_FALSE;
    pool->stopping = CM_FALSE;
    pool->worker_fn = NULL;
    dss_connect_queue_reset(&pool->queue);
}

status_t dss_handshake_pool_start(dss_handshake_worker_fn worker)
{
    dss_handshake_pool_t *pool = &g_handshake_pool;
    dss_connect_pool_t *ack_pool = &g_ack_pool;
    errno_t err;

    if (pool->started) {
        return CM_SUCCESS;
    }

    err = memset_s(pool, sizeof(dss_handshake_pool_t), 0, sizeof(dss_handshake_pool_t));
    if (err != EOK) {
        CM_THROW_ERROR(ERR_SYSTEM_CALL, err);
        return CM_ERROR;
    }
    err = memset_s(ack_pool, sizeof(dss_connect_pool_t), 0, sizeof(dss_connect_pool_t));
    if (err != EOK) {
        CM_THROW_ERROR(ERR_SYSTEM_CALL, err);
        return CM_ERROR;
    }

    pool->worker_fn = worker;
    pool->started = CM_TRUE;
    for (uint32 i = 0; i < DSS_HANDSHAKE_POOL_WORKER_NUM; i++) {
        if (cm_create_thread(dss_handshake_pool_worker, SIZE_K(512), pool, &pool->workers[i]) != CM_SUCCESS) {
            LOG_RUN_ERR("[DSS_CONNECT] failed to create handshake pool worker %u, errno %d", i, cm_get_os_error());
            pool->stopping = CM_TRUE;
            for (uint32 j = 0; j < i; j++) {
                cm_close_thread(&pool->workers[j]);
            }
            pool->started = CM_FALSE;
            pool->worker_fn = NULL;
            return CM_ERROR;
        }
    }

    if (dss_connect_pool_start_workers(ack_pool, dss_ack_pool_worker, DSS_ACK_POOL_WORKER_NUM, "ack pool") !=
        CM_SUCCESS) {
        dss_handshake_pool_stop_workers();
        return CM_ERROR;
    }
    ack_pool->started = CM_TRUE;

    LOG_RUN_INF("[DSS_CONNECT] connect pools started, ack_workers=%u, hs_workers=%u, queue_size=%u",
        DSS_ACK_POOL_WORKER_NUM, DSS_HANDSHAKE_POOL_WORKER_NUM, DSS_CONNECT_POOL_QUEUE_SIZE);
    return CM_SUCCESS;
}

void dss_handshake_pool_stop(void)
{
    if (g_ack_pool.started) {
        dss_connect_pool_stop_workers(&g_ack_pool);
        LOG_RUN_INF("[DSS_CONNECT] ack pool stopped");
    }
    if (g_handshake_pool.started) {
        dss_handshake_pool_stop_workers();
        LOG_RUN_INF("[DSS_CONNECT] handshake pool stopped");
    }
}

status_t dss_handshake_pool_submit(const cs_pipe_t *pipe)
{
    if (!g_ack_pool.started || pipe == NULL) {
        return CM_ERROR;
    }
    return dss_connect_queue_enqueue(&g_ack_pool.queue, pipe, "ack pool", DSS_ACK_POOL_WORKER_NUM);
}