* Copyright (c) 2021 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* segment.h
*
*
* IDENTIFICATION
* src/storage/segment.h
*
* -------------------------------------------------------------------------
*/
#ifndef __SEGMENT_H__
#define __SEGMENT_H__
#include "bucket_list.h"
#include "stg_manager.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct st_log_index {
uint64 term;
uint64 offset;
}log_index_t;
typedef struct st_log_meta {
uint32 size;
uint64 term;
uint64 offset;
}log_meta_t;
typedef struct st_index_buf {
uint32 size;
uint32 capacity;
log_index_t *buf;
}index_buf_t;
typedef struct st_segment {
int fd;
char *home;
uint64 first_index;
uint64 last_index;
uint64 size;
uint64 max_size;
latch_t latch;
bool32 valid;
bool32 is_open;
uint32 ref_count;
index_buf_t indexes;
}segment_t;
#define IBUF_EXTENT_SIZE 100000
static inline void index_buf_init(index_buf_t *ibuf)
{
ibuf->size = 0;
ibuf->buf = NULL;
ibuf->capacity = 0;
}
static inline status_t index_buf_add(index_buf_t *ibuf, const log_index_t *log_index)
{
if (ibuf->size >= ibuf->capacity) {
uint32 buf_size = (ibuf->capacity + IBUF_EXTENT_SIZE) * sizeof(log_index_t);
char *new_buf = (char *)malloc(buf_size);
if (new_buf == NULL) {
LOG_DEBUG_ERR("[STG]index_buf_add malloc %d failed", buf_size);
return CM_ERROR;
}
if (ibuf->buf != NULL) {
errno_t errcode = memcpy_sp(new_buf, (size_t)buf_size, ibuf->buf,
(size_t)(ibuf->size * sizeof(log_index_t)));
if (errcode != EOK) {
CM_FREE_PTR(new_buf);
LOG_DEBUG_ERR("[STG]index_buf_add memcpy_sp failed");
return CM_ERROR;
}
CM_FREE_PTR(ibuf->buf);
}
ibuf->buf = (log_index_t *)new_buf;
ibuf->capacity += IBUF_EXTENT_SIZE;
}
ibuf->buf[ibuf->size++] = *log_index;
return CM_SUCCESS;
}
static inline status_t index_buf_add_batch(index_buf_t *dst, const index_buf_t *src)
{
if (dst->size + src->size > dst->capacity) {
uint32 buf_size = (dst->capacity + src->size + IBUF_EXTENT_SIZE) * sizeof(log_index_t);
char *new_buf = (char *)malloc(buf_size);
if (new_buf == NULL) {
LOG_DEBUG_ERR("[STG]index_buf_add_batch malloc %d failed", buf_size);
return CM_ERROR;
}
if (dst->buf != NULL) {
errno_t errcode = memcpy_sp(new_buf, (size_t)buf_size, dst->buf, (size_t)(dst->size * sizeof(log_index_t)));
if (errcode != EOK) {
CM_FREE_PTR(new_buf);
LOG_DEBUG_ERR("[STG]index_buf_add_batch memcpy_sp failed");
return CM_ERROR;
}
CM_FREE_PTR(dst->buf);
}
dst->buf = (log_index_t *)new_buf;
dst->capacity += IBUF_EXTENT_SIZE + src->size;
}
if (memcpy_sp(dst->buf + dst->size, (size_t)((dst->capacity - dst->size) * sizeof(log_index_t)),
src->buf, (size_t)(src->size * sizeof(log_index_t))) != EOK) {
LOG_DEBUG_ERR("[STG]index_buf_add_batch memcpy_sp failed");
return CM_ERROR;
}
dst->size += src->size;
return CM_SUCCESS;
}
static inline status_t index_buf_get(const index_buf_t *ibuf, uint32 index, log_index_t *log_index)
{
if (index >= ibuf->size) {
return CM_ERROR;
}
*log_index = ibuf->buf[index];
return CM_SUCCESS;
}
static inline void index_buf_resize(index_buf_t *ibuf, uint32 size)
{
ibuf->size = size;
}
static inline void index_buf_deinit(index_buf_t *ibuf)
{
if (ibuf->buf != NULL) {
CM_FREE_PTR(ibuf->buf);
}
index_buf_init(ibuf);
}
typedef struct st_entry_head {
uint64 term;
uint64 index;
uint64 key;
uint32 type;
uint32 size;
uint32 data_chksum;
uint32 head_chksum;
}entry_head_t;
#define STG_CLOSE_PATTERN "log_%020" "llu" "_%020" "llu"
#define STG_OPEN_PATTERN "log_inprogress_%020" "llu"
status_t try_load_index(segment_t *segment);
log_entry_t* segment_get_entry(segment_t *segment, uint64 index, mem_pool_t *pool);
status_t segment_write_entry(segment_t *segment, log_entry_t *log_entry);
uint64 segment_get_term(segment_t *segment, uint64 index);
status_t segment_trunc_suffix(segment_t *segment, uint64 last_index_kept);
status_t switch_segment_to(segment_t *segment, bool32 to_open);
status_t load_log_entry(int32 fd, int64 offset, char *buf, uint32 size, entry_head_t *head, bool32 *is_valid);
static inline status_t make_file_name(char *name, char *home, uint64 first_index, uint64 last_index, bool32 is_open)
{
if (!is_open) {
PRTS_RETURN_IFERR(snprintf_s(name, CM_MAX_PATH_LEN, CM_MAX_PATH_LEN - 1,
"%s/" STG_CLOSE_PATTERN, home, first_index, last_index));
return CM_SUCCESS;
}
PRTS_RETURN_IFERR(snprintf_s(name, CM_MAX_PATH_LEN, CM_MAX_PATH_LEN - 1,
"%s/" STG_OPEN_PATTERN, home, first_index));
return CM_SUCCESS;
}
static inline status_t segment_unlink(segment_t *segment)
{
#ifdef WIN32
if (segment->fd != -1) {
cm_close_file(segment->fd);
segment->fd = -1;
}
#endif
char file_name[CM_MAX_PATH_LEN];
CM_RETURN_IFERR(make_file_name(file_name, segment->home, segment->first_index,
segment->last_index, segment->is_open));
char tmp_name[CM_MAX_PATH_LEN];
PRTS_RETURN_IFERR(snprintf_s(tmp_name, CM_MAX_PATH_LEN, CM_MAX_PATH_LEN - 1, "%s.tmp", file_name));
return cm_rename_file(file_name, tmp_name);
}
static inline void release_segment_resource(segment_t *segment)
{
if (segment->fd != -1) {
cm_close_file(segment->fd);
segment->fd = -1;
}
index_buf_deinit(&segment->indexes);
}
static inline void destroy_segment_object(segment_t *segment)
{
release_segment_resource(segment);
CM_FREE_PTR(segment);
}
static inline void try_destroy_segment_object(segment_t *segment)
{
cm_latch_x(&segment->latch, 0, NULL);
segment->valid = CM_FALSE;
if (segment->ref_count > 0) {
cm_unlatch(&segment->latch, NULL);
return;
}
cm_unlatch(&segment->latch, NULL);
destroy_segment_object(segment);
}
static inline status_t destroy_segment_instance(segment_t *segment)
{
status_t status = segment_unlink(segment);
try_destroy_segment_object(segment);
return status;
}
static inline void segment_inc_def(segment_t *segment)
{
cm_latch_x(&segment->latch, 0, NULL);
segment->ref_count++;
cm_unlatch(&segment->latch, NULL);
}
static inline void segment_dec_def(segment_t *segment)
{
bool32 need_free = CM_FALSE;
cm_latch_x(&segment->latch, 0, NULL);
if (segment->ref_count == 1 && !segment->valid) {
need_free = CM_TRUE;
} else {
segment->ref_count--;
}
cm_unlatch(&segment->latch, NULL);
if (need_free) {
destroy_segment_object(segment);
}
}
#ifdef __cplusplus
}
#endif
#endif