* Copyright (c) 2021 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* segment.c
*
*
* IDENTIFICATION
* src/storage/segment.c
*
* -------------------------------------------------------------------------
*/
#include "segment.h"
#include "cm_list.h"
#include "cm_text.h"
#include "stream.h"
#include "cm_checksum.h"
#include "util_profile_stat.h"
status_t switch_segment_to(segment_t *segment, bool32 to_open)
{
#ifdef WIN32
release_segment_resource(segment);
#endif
char old_name[CM_MAX_PATH_LEN];
CM_RETURN_IFERR(make_file_name(old_name, segment->home, segment->first_index, segment->last_index, !to_open));
char new_name[CM_MAX_PATH_LEN];
CM_RETURN_IFERR(make_file_name(new_name, segment->home, segment->first_index, segment->last_index, to_open));
CM_RETURN_IFERR(cm_rename_file(old_name, new_name));
segment->is_open = to_open;
return CM_SUCCESS;
}
static inline void decode_entry_head(const char *buf, entry_head_t *head)
{
head->term = *(uint64*)(buf + OFFSET_OF_TERM);
head->index = *(uint64*)(buf + OFFSET_OF_INDEX);
head->key = *(uint64*)(buf + OFFSET_OF_KEY);
head->type = *(uint32*)(buf + OFFSET_OF_TYPE);
head->size = *(uint32*)(buf + OFFSET_OF_SIZE);
head->data_chksum = *(uint32*)(buf + OFFSET_OF_DATA_CHKSUM);
head->head_chksum = *(uint32*)(buf + OFFSET_OF_HEAD_CHKSUM);
}
status_t load_log_entry(int32 fd, int64 offset, char *buf, uint32 size, entry_head_t *head, bool32 *is_valid)
{
int32 read_size;
size = MAX(size, ENTRY_HEAD_SIZE);
CM_RETURN_IFERR(cm_pread_file(fd, buf, size, offset, &read_size));
*is_valid = read_size != size ? CM_FALSE : CM_TRUE;
if (!(*is_valid)) {
return CM_SUCCESS;
}
decode_entry_head(buf, head);
if (!cm_verify_checksum(buf, ENTRY_HEAD_SIZE - sizeof(uint32), head->head_chksum)) {
LOG_DEBUG_ERR("[STG]Log entry head checksum is invalid at offset:%lld", offset);
*is_valid = CM_FALSE;
return CM_SUCCESS;
}
if (size == ENTRY_HEAD_SIZE) {
return CM_SUCCESS;
}
if (!cm_verify_checksum(buf + ENTRY_HEAD_SIZE, head->size, head->data_chksum)) {
LOG_DEBUG_ERR("[STG]Log entry data checksum is invalid at offset:%lld", offset);
*is_valid = CM_FALSE;
return CM_SUCCESS;
}
return CM_SUCCESS;
}
static status_t load_index(segment_t *segment)
{
char log_file[CM_MAX_PATH_LEN];
CM_RETURN_IFERR(make_file_name(log_file, segment->home, segment->first_index, segment->last_index,
segment->is_open));
CM_RETURN_IFERR(cm_open_file(log_file, O_BINARY | O_RDWR | O_SYNC, &segment->fd));
int64 entry_off = 0;
bool32 is_valid = CM_TRUE;
uint64 last_index = segment->first_index - 1;
int64 file_size = cm_file_size(segment->fd);
if (file_size < 0) {
LOG_DEBUG_ERR("[STG]Get segment file size failed");
return CM_ERROR;
}
int64 scan_size = file_size;
if (segment->max_size > 0 && (uint64)file_size > segment->max_size) {
LOG_DEBUG_ERR("[STG]Segment file size %lld exceeds limit %llu", file_size, segment->max_size);
scan_size = (int64)segment->max_size;
}
entry_head_t entry_head;
char head_buf[ENTRY_HEAD_SIZE];
for (int64 i = segment->first_index; entry_off < scan_size; i++) {
CM_RETURN_IFERR(load_log_entry(segment->fd, entry_off, head_buf, ENTRY_HEAD_SIZE, &entry_head, &is_valid));
if (!is_valid) {
break;
}
int64 skip_len = entry_head.size + ENTRY_HEAD_SIZE;
if (entry_off + skip_len > scan_size) {
break;
}
log_index_t log_index;
log_index.term = entry_head.term;
log_index.offset = (uint64)entry_off;
CM_RETURN_IFERR(index_buf_add(&segment->indexes, &log_index));
++last_index;
entry_off += skip_len;
}
if (segment->is_open) {
segment->last_index = last_index;
} else if (last_index < segment->last_index) {
LOG_DEBUG_ERR("[STG]Last index does not match between file and segment");
return CM_ERROR;
}
if (entry_off != file_size) {
CM_RETURN_IFERR(cm_truncate_file(segment->fd, entry_off));
}
if (cm_seek_file(segment->fd, entry_off, SEEK_SET) < 0) {
return CM_ERROR;
}
segment->size = (uint64)entry_off;
return CM_SUCCESS;
}
status_t try_load_index(segment_t *segment)
{
if (segment->fd != -1) {
return CM_SUCCESS;
}
cm_latch_x(&segment->latch, 0, NULL);
if (segment->fd != -1) {
cm_unlatch(&segment->latch, NULL);
return CM_SUCCESS;
}
if (load_index(segment) != CM_SUCCESS) {
release_segment_resource(segment);
cm_unlatch(&segment->latch, NULL);
return CM_ERROR;
}
cm_unlatch(&segment->latch, NULL);
return CM_SUCCESS;
}
static status_t segment_get_log_meta(segment_t *segment, uint64 index, log_meta_t *meta)
{
cm_latch_s(&segment->latch, 0, CM_FALSE, NULL);
if (segment->first_index == segment->last_index + 1) {
cm_unlatch(&segment->latch, NULL);
return CM_ERROR;
}
if (index > segment->last_index || index < segment->first_index) {
cm_unlatch(&segment->latch, NULL);
return CM_ERROR;
}
log_index_t curr, next;
uint32 curr_idx = (uint32)(index - segment->first_index);
if (index_buf_get(&segment->indexes, curr_idx, &curr) != CM_SUCCESS) {
cm_unlatch(&segment->latch, NULL);
return CM_ERROR;
}
meta->term = curr.term;
meta->offset = curr.offset;
if (index < segment->last_index) {
if (index_buf_get(&segment->indexes, curr_idx + 1, &next) != CM_SUCCESS) {
cm_unlatch(&segment->latch, NULL);
return CM_ERROR;
}
meta->size = (uint32)((next.offset - curr.offset) - ENTRY_HEAD_SIZE);
} else {
meta->size = (uint32)((segment->size - curr.offset) - ENTRY_HEAD_SIZE);
}
cm_unlatch(&segment->latch, NULL);
return CM_SUCCESS;
}
log_entry_t* segment_get_entry(segment_t *segment, uint64 index, mem_pool_t *pool)
{
log_meta_t meta;
if (segment_get_log_meta(segment, index, &meta) != CM_SUCCESS) {
return NULL;
}
uint32 total_size = sizeof(log_entry_t) + ENTRY_HEAD_SIZE + meta.size;
log_entry_t *entry = (log_entry_t*)galloc(total_size, pool);
if (entry == NULL) {
LOG_DEBUG_ERR("[STG]segment_get_entry alloc entry failed");
return NULL;
}
entry->ref_count = 1;
entry->valid = CM_FALSE;
entry->from_pool = CM_TRUE;
entry->data = (char *)entry + sizeof(log_entry_t);
cm_latch_init(&entry->latch);
bool32 is_valid = CM_TRUE;
char *io_buf = entry->data;
uint32 io_size = meta.size + ENTRY_HEAD_SIZE;
entry_head_t entry_head;
if (load_log_entry(segment->fd, meta.offset, io_buf, io_size, &entry_head, &is_valid) != CM_SUCCESS || !is_valid) {
gfree(entry);
return NULL;
}
if (entry_head.term != meta.term) {
LOG_DEBUG_ERR("[STG]Mismatch term %llu with index %llu", entry_head.term, meta.term);
gfree(entry);
return NULL;
}
return entry;
}
status_t segment_write_entry(segment_t *segment, log_entry_t *entry)
{
char *io_buf = entry->data;
uint32 io_size = ENTRY_LENGTH(entry);
if (cm_pwrite_file_stat(segment->fd, io_buf, (int32)io_size, (int64)segment->size) != CM_SUCCESS) {
LOG_DEBUG_ERR("[STG]segment_write_entry:write segment file failed");
return CM_ERROR;
}
write_conf_func_t write_conf_func = get_write_conf_func();
if (ENTRY_TYPE(entry) == ENTRY_TYPE_CONF && write_conf_func != NULL) {
if (write_conf_func(ENTRY_BUF(entry), ENTRY_SIZE(entry)) != CM_SUCCESS) {
LOG_DEBUG_ERR("[STG]segment_write_entry:write config file failed");
return CM_ERROR;
}
}
log_index_t log_index;
cm_latch_x(&segment->latch, 0, NULL);
log_index.offset = segment->size;
log_index.term = ENTRY_TERM(entry);
if (index_buf_add(&segment->indexes, &log_index) != CM_SUCCESS) {
cm_unlatch(&segment->latch, NULL);
LOG_DEBUG_ERR("[STG]segment_write_entry:add index cache failed");
return CM_ERROR;
}
segment->last_index++;
segment->size += io_size;
cm_unlatch(&segment->latch, NULL);
return CM_SUCCESS;
}
uint64 segment_get_term(segment_t *segment, uint64 index)
{
log_meta_t meta;
if (segment_get_log_meta(segment, index, &meta) != CM_SUCCESS) {
return CM_INVALID_TERM_ID;
}
return meta.term;
}
status_t segment_trunc_suffix(segment_t *segment, uint64 last_index_kept)
{
if (last_index_kept >= segment->last_index) {
return CM_SUCCESS;
}
if (!segment->is_open) {
CM_RETURN_IFERR(switch_segment_to(segment, CM_TRUE));
}
CM_RETURN_IFERR(try_load_index(segment));
cm_latch_s(&segment->latch, 0, CM_FALSE, NULL);
uint32 trunc_id = (uint32)(last_index_kept + 1 - segment->first_index);
log_index_t trunc_index;
if (index_buf_get(&segment->indexes, trunc_id, &trunc_index) != CM_SUCCESS) {
cm_unlatch(&segment->latch, NULL);
return CM_ERROR;
}
cm_unlatch(&segment->latch, NULL);
CM_RETURN_IFERR(cm_truncate_file(segment->fd, trunc_index.offset));
if (cm_seek_file(segment->fd, trunc_index.offset, SEEK_SET) < 0) {
return CM_ERROR;
}
cm_latch_x(&segment->latch, 0, NULL);
segment->size = trunc_index.offset;
segment->last_index = last_index_kept;
index_buf_resize(&segment->indexes, trunc_id);
cm_unlatch(&segment->latch, NULL);
return CM_SUCCESS;
}