/* -------------------------------------------------------------------------
 *  This file is part of the oGRAC project.
 * Copyright (c) 2024 Huawei Technologies Co.,Ltd.
 *
 * oGRAC is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * -------------------------------------------------------------------------
 *
 * cm_disk.c
 *
 *
 * IDENTIFICATION
 * src/common/cm_disk.c
 *
 * -------------------------------------------------------------------------
 */
#include "cm_common_module.h"
#include "cm_disk.h"
#ifndef WIN32
#include <pthread.h>
#endif
#include "cm_date.h"
#include "cm_error.h"
#include "cm_dbstor.h"

#define CM_FILE_BLOCK_SIZE 8192
#define CM_DISK_PART_COUNT 16
#define CM_FILE_PART_BLOCK_SIZE 512
#define CM_OPERATE_FILE_INTERVAL 50
#define CM_DBS_LOCK_OCCUPIED 80

#ifdef WIN32
status_t cm_open_disk(const char *name, disk_handle_t *handle)
{
    *handle = CreateFile(name, GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ | FILE_SHARE_WRITE, 0, OPEN_EXISTING, 0,
                         NULL);

    if (*handle == INVALID_HANDLE_VALUE) {
        DWORD code = GetLastError();
        OG_LOG_RUN_ERR("CreateFile failed");
        OG_THROW_ERROR(ERR_CM_OPEN_DISK, "open disk %s failed, errno %d", name, OG_ERRNO);
        return OG_ERROR;
    }

    OG_LOG_RUN_INF("CreateFile succeed\n");

    return OG_SUCCESS;
}

void cm_close_disk(disk_handle_t handle)
{
    (void)CloseHandle(handle);
}

uint64 cm_get_disk_size(disk_handle_t handle)
{
    DWORD low32;
    DWORD high32;
    uint64 size;

    low32 = GetFileSize(handle, &high32);
    if (low32 == INVALID_FILE_SIZE) {
        return 0;
    }

    size = (uint64)high32;
    size <<= 32;
    size += low32;
    return size;
}

status_t cm_seek_disk(disk_handle_t handle, uint64 offset)
{
    LONG low32;
    LONG high32;

    low32 = (LONG)(offset & 0xFFFFFFFF);
    high32 = (LONG)(offset >> 32);
    if (SetFilePointer(handle, low32, &high32, FILE_BEGIN) == INVALID_SET_FILE_POINTER) {
        OG_THROW_ERROR(ERR_CM_SEEK_DISK, OG_ERRNO);
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

status_t cm_try_read_disk(disk_handle_t handle, char *buffer, int32 size, int32 *read_size)
{
    CM_POINTER3(disk, buffer, read_size);

    if (ReadFile(handle, buffer, (DWORD)size, (LPDWORD)read_size, NULL) == FALSE) {
        DWORD code = GetLastError();
        OG_THROW_ERROR(ERR_CM_READ_DISK, OG_ERRNO);
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

status_t cm_try_write_disk(disk_handle_t handle, char *buffer, int32 size, int32 *written_size)
{
    if (!WriteFile(handle, buffer, (DWORD)size, (LPDWORD)written_size, NULL)) {
        DWORD code = GetLastError();
        OG_THROW_ERROR(ERR_CM_WRITE_DISK, OG_ERRNO);
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

#if defined(_DEBUG) || defined(DEBUG) || defined(DB_DEBUG_VERSION)
status_t _cm_lock_disk(disk_handle_t handle, uint64 offset, int32 size, const char *file, int line)
#else
status_t cm_lock_disk(disk_handle_t handle, uint64 offset, int32 size)
#endif
{
    if (cm_seek_disk(handle, offset) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (!LockFileEx(handle, LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY, 0, size, 0, NULL)) {
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

#else
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>

status_t cm_open_disk(const char *name, disk_handle_t *handle)
{
    *handle = open(name, O_RDWR | O_DIRECT | O_SYNC | O_CLOEXEC, 0);
    if (*handle == -1) {
        OG_THROW_ERROR(ERR_CM_OPEN_DISK, OG_ERRNO);
        OG_LOG_RUN_ERR("open %s failed:error code:%d,%s", name, errno, strerror(errno));
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

void cm_close_disk(disk_handle_t handle)
{
    int code;
    code = close(handle);
    if (code != 0) {
        // CMS_LOGGING("failed to close file with handle, error code %d", OG_ERRNO);
    }
}

status_t cm_seek_disk(disk_handle_t handle, uint64 offset)
{
    if (lseek64(handle, (off64_t)offset, SEEK_SET) == -1) {
        OG_THROW_ERROR(ERR_CM_SEEK_DISK, OG_ERRNO);
        OG_LOG_RUN_ERR("seek failed:error code:%d,%s", errno, strerror(errno));
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

uint64 cm_get_disk_size(disk_handle_t handle)
{
    return (uint64)lseek64(handle, 0, SEEK_END);
}

status_t cm_try_read_disk(disk_handle_t handle, char *buffer, int32 size, int32 *read_size)
{
    *read_size = read(handle, buffer, size);
    if (*read_size == -1) {
        OG_THROW_ERROR(ERR_CM_READ_DISK, OG_ERRNO);
        OG_LOG_RUN_ERR("read failed:error code:%d,%s", errno, strerror(errno));
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

status_t cm_try_write_disk(disk_handle_t handle, char *buffer, int32 size, int32 *written_size)
{
    *written_size = write(handle, buffer, size);

    if (*written_size == -1) {
        OG_THROW_ERROR(ERR_CM_WRITE_DISK, OG_ERRNO);
        OG_LOG_RUN_ERR("write failed:error code:%d,%s", errno, strerror(errno));
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

#if defined(_DEBUG) || defined(DEBUG) || defined(DB_DEBUG_VERSION)
status_t _cm_lock_disk(disk_handle_t handle, uint64 offset, int32 size, const char *file, int line)
{
    OG_LOG_DEBUG_INF("cm_disk_lock:%s:%d", file, line);
#else
status_t cm_lock_disk(disk_handle_t handle, uint64 offset, int32 size)
{
#endif
    if (cm_seek_disk(handle, offset) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (lockf(handle, F_TLOCK, size) != 0) {
        if (errno == EACCES || errno == EAGAIN) {
            return OG_ERROR;
        }

        OG_LOG_RUN_ERR("lockf failed:error code:%d,%s", errno, strerror(errno));
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

#endif

#if defined(_DEBUG) || defined(DEBUG) || defined(DB_DEBUG_VERSION)
status_t _cm_read_disk(disk_handle_t handle, uint64 offset, void *buf, int32 size, const char *file, int line)
{
#else
status_t cm_read_disk(disk_handle_t handle, uint64 offset, void *buf, int32 size)
{
#endif
    int32 curr_size;
    int32 total_size;
    date_t start = cm_now();

#ifndef WIN32
    total_size = 0;
    do {
        ssize_t n = pread64((int)handle, (char *)buf + total_size, (size_t)(size - total_size),
            (off64_t)(offset + (uint64)total_size));
        if (n == -1) {
            OG_THROW_ERROR(ERR_CM_READ_DISK, OG_ERRNO);
            OG_LOG_RUN_ERR("pread64 failed:error code:%d,%s", errno, strerror(errno));
            return OG_ERROR;
        }
        if (n == 0) {
            OG_LOG_RUN_ERR("pread64 unexpected EOF at offset %llu, got %d of %d bytes", (unsigned long long)offset,
                total_size, size);
            return OG_ERROR;
        }
        curr_size = (int32)n;
        total_size += curr_size;
    } while (total_size < size);
#else
    if (cm_seek_disk(handle, offset) != OG_SUCCESS) {
        return OG_ERROR;
    }

    total_size = 0;

    do {
        if (cm_try_read_disk(handle, (char *)buf + total_size, size - total_size, &curr_size) != OG_SUCCESS) {
            return OG_ERROR;
        }
        if (curr_size == 0) {
            OG_LOG_RUN_ERR("ReadFile unexpected EOF at offset %llu, got %d of %d bytes", (unsigned long long)offset,
                total_size, size);
            return OG_ERROR;
        }

        total_size += curr_size;
    } while (total_size < size);
#endif

    date_t end = cm_now();
#if defined(_DEBUG) || defined(DEBUG) || defined(DB_DEBUG_VERSION)
    OG_LOG_DEBUG_INF("cm_disk_read %d elapsed:%lld(ms) at %s:%d", size, (end - start) / MICROSECS_PER_MILLISEC, file,
                     line);
#else
    if (end - start > 50 * MICROSECS_PER_MILLISEC) {
        OG_LOG_RUN_WAR_LIMIT(LOG_PRINT_INTERVAL_SECOND_20, "cm_disk_read %d elapsed:%lld(ms)",
                             size, (end - start) / MICROSECS_PER_MILLISEC);
    }
#endif
    return OG_SUCCESS;
}

#if defined(_DEBUG) || defined(DEBUG) || defined(DB_DEBUG_VERSION)
status_t _cm_write_disk(disk_handle_t handle, uint64 offset, void *buf, int32 size, const char *file, int line)
{
#else
status_t cm_write_disk(disk_handle_t handle, uint64 offset, void *buf, int32 size)
{
#endif
    int32 curr_size;
    int32 total_size;

    date_t start = cm_now();

#ifndef WIN32
    total_size = 0;
    do {
        ssize_t n = pwrite64((int)handle, (const char *)buf + total_size, (size_t)(size - total_size),
            (off64_t)(offset + (uint64)total_size));
        if (n == -1) {
            OG_THROW_ERROR(ERR_CM_WRITE_DISK, OG_ERRNO);
            OG_LOG_RUN_ERR("pwrite64 failed:error code:%d,%s", errno, strerror(errno));
            return OG_ERROR;
        }
        if (n == 0) {
            OG_LOG_RUN_ERR("pwrite64 wrote 0 bytes at offset %llu, got %d of %d bytes", (unsigned long long)offset,
                total_size, size);
            return OG_ERROR;
        }
        curr_size = (int32)n;
        total_size += curr_size;
    } while (total_size < size);
#else
    if (cm_seek_disk(handle, offset) != OG_SUCCESS) {
        return OG_ERROR;
    }

    total_size = 0;

    do {
        if (cm_try_write_disk(handle, (char *)buf + total_size, size - total_size, &curr_size) != OG_SUCCESS) {
            return OG_ERROR;
        }
        if (curr_size == 0) {
            OG_LOG_RUN_ERR("WriteFile wrote 0 bytes at offset %llu, got %d of %d bytes", (unsigned long long)offset,
                total_size, size);
            return OG_ERROR;
        }

        total_size += curr_size;
    } while (total_size < size);
#endif

    date_t end = cm_now();
#if defined(_DEBUG) || defined(DEBUG) || defined(DB_DEBUG_VERSION)
    OG_LOG_DEBUG_INF("cm_disk_write %d elapsed:%lld(ms) at %s:%d", size, (end - start) / MICROSECS_PER_MILLISEC, file,
                     line);
#else
    if (end - start > 50 * MICROSECS_PER_MILLISEC) {
        OG_LOG_RUN_WAR_LIMIT(LOG_PRINT_INTERVAL_SECOND_20, "cm_disk_write %d elapsed:%lld(ms)",
                             size, (end - start) / MICROSECS_PER_MILLISEC);
    }
#endif
    return OG_SUCCESS;
}

status_t cm_lock_file_fd(int32 fd, uint8 type)
{
#ifndef _WIN32
    struct flock lk;
    if (type == DISK_LOCK_READ) {
        lk.l_type = F_RDLCK;
    } else if (type == DISK_LOCK_WRITE) {
        lk.l_type = F_WRLCK;
    } else {
        OG_LOG_DEBUG_ERR("incorrect type,type =%d.", type);
        return OG_ERROR;
    }
    lk.l_whence = SEEK_SET;
    lk.l_start = lk.l_len = 0;

    if (fcntl(fd, F_SETLK, &lk) != 0) {
        OG_LOG_DEBUG_ERR("cm_lock_file_fd failed, fd %d, errno %d, err info %s.",
            fd, errno, strerror(errno));
        return OG_ERROR;
    }
#endif
    return OG_SUCCESS;
}

status_t cm_lockw_file_fd(int32 fd)
{
    uint8 type = DISK_LOCK_WRITE;
    if (cm_lock_file_fd(fd, type) != OG_SUCCESS) {
        OG_LOG_DEBUG_ERR("cm_lockw_file_fd failed, fd %d.", fd);
        return OG_ERROR;
    }
    return OG_SUCCESS;
}

status_t cm_lockr_file_fd(int32 fd)
{
    uint8 type = DISK_LOCK_READ;
    if (cm_lock_file_fd(fd, type) != OG_SUCCESS) {
        OG_LOG_DEBUG_ERR("cm_lockr_file_fd failed, fd %d.", fd);
        return OG_ERROR;
    }
    return OG_SUCCESS;
}

status_t cm_lock_record_fd(int32 fd, uint32 id, uint8 type)
{
#ifndef _WIN32
    struct flock lk;

    uint32 part_id = id % CM_DISK_PART_COUNT;
    if (type == DISK_LOCK_READ) {
        lk.l_type = F_RDLCK;
    } else if (type == DISK_LOCK_WRITE) {
        lk.l_type = F_WRLCK;
    } else {
        OG_LOG_DEBUG_ERR("incorrect type,type =%d.", type);
        return OG_ERROR;
    }
    lk.l_whence = SEEK_SET;
    lk.l_start = (part_id - 1) * CM_FILE_PART_BLOCK_SIZE;
    lk.l_len = CM_FILE_PART_BLOCK_SIZE;

    if (fcntl(fd, F_SETLK, &lk) != 0) {
        OG_LOG_DEBUG_ERR("cm_lock_record_fd failed, fd %d.", fd);
        return OG_ERROR;
    }
#endif
    return OG_SUCCESS;
}

status_t cm_lockw_record_fd(int32 fd, uint32 id)
{
    uint8 type = DISK_LOCK_WRITE;
    if (cm_lock_record_fd(fd, id, type) != OG_SUCCESS) {
        OG_LOG_DEBUG_ERR("cm_lockw_record_fd failed, fd %d.", fd);
        return OG_ERROR;
    }
    return OG_SUCCESS;
}

status_t cm_lockr_record_fd(int32 fd, uint32 id)
{
    uint8 type = DISK_LOCK_READ;
    if (cm_lock_record_fd(fd, id, type) != OG_SUCCESS) {
        OG_LOG_DEBUG_ERR("cm_lockr_record_fd failed, fd %d.", fd);
        return OG_ERROR;
    }
    return OG_SUCCESS;
}

status_t cm_unlock_file_fd(int32 fd)
{
#ifndef _WIN32
    struct flock lk;

    lk.l_type = F_UNLCK;
    lk.l_whence = SEEK_SET;
    lk.l_start = lk.l_len = 0;

    if (fcntl(fd, F_SETLK, &lk) != 0) {
        OG_LOG_DEBUG_ERR("cm_unlock_file_fd failed, fd= %d.", fd);
        return OG_ERROR;
    }
#endif
    return OG_SUCCESS;
}

status_t cm_unlock_record_fd(int32 fd, uint32 id)
{
#ifndef _WIN32
    struct flock lk;

    uint32 part_id = id % CM_DISK_PART_COUNT;
    lk.l_type = F_UNLCK;
    lk.l_whence = SEEK_SET;
    lk.l_start = (part_id - 1) * CM_FILE_PART_BLOCK_SIZE;
    lk.l_len = CM_FILE_PART_BLOCK_SIZE;

    if (fcntl(fd, F_SETLK, &lk) != 0) {
        OG_LOG_DEBUG_ERR("cm_unlock_record_fd, fd = %d.", fd);
        return OG_ERROR;
    }
#endif
    return OG_SUCCESS;
}

status_t cm_lock_range_fd(int32 fd, uint64 l_start, uint64 l_len, uint8 type)
{
#ifndef _WIN32
    struct flock lk;
    if (type == DISK_LOCK_READ) {
        lk.l_type = F_RDLCK;
    } else if (type == DISK_LOCK_WRITE) {
        lk.l_type = F_WRLCK;
    } else {
        OG_LOG_DEBUG_ERR("incorrect type, type %d.", type);
        return OG_ERROR;
    }
    lk.l_whence = SEEK_SET;
    lk.l_start = l_start;
    lk.l_len = l_len;
 
    if (fcntl(fd, F_SETLK, &lk) != 0) {
        OG_LOG_DEBUG_ERR("cm_lock_range_fd failed, fd %d, errno %d, err info %s.",
            fd, errno, strerror(errno));
        return OG_ERROR;
    }
#endif
    return OG_SUCCESS;
}

status_t cm_dbs_lock_init(char *fileName, uint32 offset, uint32 len, int32* lockId)
{
    if (fileName == NULL || lockId == NULL) {
        OG_LOG_RUN_ERR("cm_dbs_lock_init para invalid.");
        return OG_ERROR;
    }
    int32 ret = dbs_global_handle()->dbs_init_lock(fileName, offset, len, lockId);
    if (ret != 0) {
        OG_LOG_RUN_ERR("failed(%d) to init dbs lock", ret);
        return OG_ERROR;
    }
    return OG_SUCCESS;
}

int32 cm_lock_range_dbs(int32 fd, uint8 lock_type)
{
#ifndef _WIN32
    int32 ret = dbs_global_handle()->dbs_inst_lock((uint32_t)fd, (uint32_t)lock_type);
    if (ret != 0) {
        OG_LOG_DEBUG_ERR("cm_lock_range_dbs failed %d, fd %d type %d.", ret, fd, lock_type);
        if (ret == CM_DBS_LOCK_OCCUPIED) {
            return OG_EAGAIN;
        } else if (ret == CM_DBS_LINK_DOWN_ERROR) {
            return CM_DBS_LINK_DOWN_ERROR;
        }
        return OG_ERROR;
    }
#endif
    return OG_SUCCESS;
}

status_t cm_lockw_range_fd(int32 fd, uint64 l_start, uint64 l_len)
{
    uint8 type = DISK_LOCK_WRITE;
    if (cm_lock_range_fd(fd, l_start, l_len, type) != OG_SUCCESS) {
        OG_LOG_DEBUG_ERR("cm_lockw_range_fd failed, fd %d.", fd);
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
 
status_t cm_lockr_range_fd(int32 fd, uint64 l_start, uint64 l_len)
{
    uint8 type = DISK_LOCK_READ;
    if (cm_lock_range_fd(fd, l_start, l_len, type) != OG_SUCCESS) {
        OG_LOG_DEBUG_ERR("cm_lockr_range_fd failed, fd %d.", fd);
        return OG_ERROR;
    }
    return OG_SUCCESS;
}

status_t cm_unlock_range_fd(int32 fd, uint64 l_start, uint64 l_len)
{
#ifndef _WIN32
    struct flock lk;
    lk.l_type = F_UNLCK;
    lk.l_whence = SEEK_SET;
    lk.l_start = l_start;
    lk.l_len = l_len;
 
    if (fcntl(fd, F_SETLK, &lk) != 0) {
        OG_LOG_DEBUG_ERR("cm_unlock_record_fd, fd %d, errno %d, err info %s.",
            fd, errno, strerror(errno));
        return OG_ERROR;
    }
#endif
    return OG_SUCCESS;
}

int32 cm_unlock_range_dbs(int32 fd, uint8 lock_type)
{
#ifndef _WIN32
    int32 ret = dbs_global_handle()->dbs_inst_unlock(fd, lock_type);
    if (ret == CM_DBS_LINK_DOWN_ERROR) {
        return CM_DBS_LINK_DOWN_ERROR;
    }
    if (ret != 0) {
        OG_LOG_DEBUG_ERR("cm_unlock_range_dbs failed, fd %d type %d.", fd, lock_type);
        return OG_ERROR;
    }
#endif
    return OG_SUCCESS;
}

int32 cm_unlock_range_dbs_force(int32 fd, uint8 lock_type)
{
#ifndef _WIN32
    int32 ret = dbs_global_handle()->dbs_inst_unlock_force(fd, lock_type);
    if (ret == CM_DBS_LINK_DOWN_ERROR) {
        return CM_DBS_LINK_DOWN_ERROR;
    }
    if (ret != 0) {
        OG_LOG_DEBUG_ERR("cm_unlock_range_dbs_force failed, fd %d type %d.", fd, lock_type);
        return OG_ERROR;
    }
#endif
    return OG_SUCCESS;
}

bool32 cm_check_dbs_beat(uint32 timeout)
{
#ifndef _WIN32
    if ((int32)dbs_global_handle()->dbs_check_inst_heart_beat_is_normal(timeout) == 0) {
        OG_LOG_DEBUG_ERR("dbs beat not normal.");
        return OG_FALSE;
    }
#endif
    return OG_TRUE;
}