* Copyright (c) 2021 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* log_storage.c
* log storage
*
* IDENTIFICATION
* src/storage/log_storage.c
*
* -------------------------------------------------------------------------
*/
#include "log_storage.h"
#include "cm_list.h"
#include "cm_text.h"
#include "cm_checksum.h"
#include "meta_storage.h"
static inline bool32 is_valid_log_file(const char *file_name, bool32 is_empty)
{
if ((is_empty && strncmp(file_name, "log_", strlen("log_")) == 0) ||
(strncmp(file_name + (strlen(file_name) - strlen(".tmp")), ".tmp", strlen(".tmp"))) == 0) {
return CM_FALSE;
}
return CM_TRUE;
}
static inline segment_t *alloc_segment(char *home, uint64 first_index, uint64 last_index,
uint64 max_size, bool32 is_open)
{
segment_t *segment = (segment_t *)malloc(sizeof(segment_t));
if (segment == NULL) {
LOG_DEBUG_ERR("[STG]alloc_segment malloc failed");
return NULL;
}
segment->fd = -1;
segment->home = home;
segment->size = 0;
segment->max_size = max_size;
segment->valid = CM_TRUE;
segment->is_open = is_open;
segment->ref_count = 0;
segment->last_index = last_index;
segment->first_index = first_index;
cm_latch_init(&segment->latch);
index_buf_init(&segment->indexes);
return segment;
}
static inline segment_t *create_open_segment(log_storage_t *storage)
{
char file_name[CM_MAX_PATH_LEN];
if (make_file_name(file_name, storage->home, storage->last_index + 1,
storage->last_index, CM_TRUE) != CM_SUCCESS) {
LOG_DEBUG_ERR("[STG]create_open_segment make file name failed");
return NULL;
}
segment_t *segment = alloc_segment(storage->home, storage->last_index + 1, storage->last_index,
storage->segment_size, CM_TRUE);
if (segment == NULL) {
LOG_DEBUG_ERR("[STG]create_open_segment alloc_segment failed");
return NULL;
}
if (cm_create_file(file_name, O_RDWR | O_TRUNC | O_BINARY | O_SYNC, &(segment->fd)) != CM_SUCCESS) {
destroy_segment_object(segment);
LOG_DEBUG_ERR("[STG]create_open_segment create file failed");
return NULL;
}
return segment;
}
segment_t *get_open_segment(log_storage_t *storage)
{
cm_latch_x(&storage->latch, 0, NULL);
if (storage->open_segment == NULL) {
storage->open_segment = create_open_segment(storage);
} else if (storage->open_segment->size >= storage->segment_size) {
if (bucket_list_add(&storage->segments, storage->open_segment) != CM_SUCCESS) {
cm_unlatch(&storage->latch, NULL);
LOG_DEBUG_ERR("[STG]get_open_segment add segment failed");
return NULL;
}
if (switch_segment_to(storage->open_segment, CM_FALSE) != CM_SUCCESS) {
bucket_list_del_last(&storage->segments);
cm_unlatch(&storage->latch, NULL);
LOG_DEBUG_ERR("[STG]get_open_segment switch segment failed");
return NULL;
}
storage->open_segment = create_open_segment(storage);
}
if (storage->open_segment != NULL) {
segment_inc_def(storage->open_segment);
}
cm_unlatch(&storage->latch, NULL);
return storage->open_segment;
}
static status_t load_segment(log_storage_t *storage, char *file_name, bool32 is_empty)
{
if (CM_STR_EQUAL(file_name, ".") || CM_STR_EQUAL(file_name, "..") || CM_STR_EQUAL(file_name, STG_LOG_META) ||
CM_STR_EQUAL(file_name, STG_RAFT_META_01) || CM_STR_EQUAL(file_name, STG_RAFT_META_02)) {
return CM_SUCCESS;
}
if (!is_valid_log_file(file_name, is_empty)) {
return stg_remove_file(storage->home, file_name);
}
uint64 first_index = 0;
uint64 last_index = 0;
segment_t *segment = NULL;
int match = sscanf_s(file_name, STG_CLOSE_PATTERN, &first_index, &last_index);
if (match == 2) {
segment = alloc_segment(storage->home, first_index, last_index, storage->segment_size, CM_FALSE);
if (segment == NULL) {
return CM_ERROR;
}
if (bucket_list_add(&storage->segments, segment) != CM_SUCCESS) {
destroy_segment_object(segment);
return CM_ERROR;
}
return CM_SUCCESS;
}
match = sscanf_s(file_name, STG_OPEN_PATTERN, &first_index);
if (match != 1) {
LOG_DEBUG_ERR("[STG]Invalid log file %s", file_name);
return CM_ERROR;
}
if (storage->open_segment != NULL) {
LOG_DEBUG_ERR("[STG]Invalid log file %s", file_name);
return CM_ERROR;
}
storage->open_segment = alloc_segment(storage->home, first_index, first_index - 1, storage->segment_size, CM_TRUE);
return storage->open_segment == NULL ? CM_ERROR : CM_SUCCESS;
}
static status_t check_segments(log_storage_t *storage)
{
uint64 last_index = CM_INVALID_ID64;
for (uint32 i = 0; i < storage->segments.size; ++i) {
segment_t *segment = (segment_t *)bucket_list_get(&storage->segments, i);
if (segment->first_index > segment->last_index) {
LOG_DEBUG_ERR("[STG]Segment first index is greater than last index");
return CM_ERROR;
}
if (last_index != CM_INVALID_ID64 && segment->first_index != last_index + 1) {
LOG_DEBUG_ERR("[STG]Index is discontinuous between segments");
return CM_ERROR;
}
if (last_index == CM_INVALID_ID64 && segment->first_index > storage->first_index) {
LOG_DEBUG_ERR("[STG]First segment's first index is greater than storage's");
return CM_ERROR;
}
if (last_index == CM_INVALID_ID64 && segment->last_index < storage->first_index) {
LOG_DEBUG_WAR("[STG]Segment is garbage data, remove it");
bucket_list_del_first(&storage->segments);
(void)destroy_segment_instance(segment);
continue;
}
last_index = segment->last_index;
}
if (storage->open_segment != NULL) {
if (last_index == CM_INVALID_ID64 && storage->open_segment->first_index > storage->first_index) {
LOG_DEBUG_ERR("[STG]First segment's first index is greater than storage's");
return CM_ERROR;
}
if (last_index != CM_INVALID_ID64 && storage->open_segment->first_index != last_index + 1) {
LOG_DEBUG_ERR("[STG]Index is discontinuous between segments");
return CM_ERROR;
}
}
storage->last_index = last_index != CM_INVALID_ID64 ? last_index : storage->last_index;
return CM_SUCCESS;
}
static status_t list_segments(log_storage_t *storage, bool32 is_empty)
{
#ifdef WIN32
intptr_t handle;
struct _finddata_t fileData;
char file_name[CM_MAX_PATH_LEN] = { 0 };
char *prefix = (char *)"log_*";
PRTS_RETURN_IFERR(snprintf_s(file_name, CM_MAX_PATH_LEN, CM_MAX_PATH_LEN - 1, "%s/%s", storage->home, prefix));
handle = (intptr_t)_findfirst(file_name, &fileData);
if (-1L == handle) {
return CM_SUCCESS;
}
if (load_segment(storage, (char *)fileData.name, is_empty) != CM_SUCCESS) {
_findclose(handle);
return CM_ERROR;
}
while (_findnext(handle, &fileData) == 0) {
if (load_segment(storage, (char *)fileData.name, is_empty) != CM_SUCCESS) {
_findclose(handle);
return CM_ERROR;
}
}
_findclose(handle);
#else
DIR *dirPtr = NULL;
struct dirent *direntPtr = NULL;
dirPtr = opendir(storage->home);
if (dirPtr == NULL) {
return CM_ERROR;
}
direntPtr = readdir(dirPtr);
while (direntPtr != NULL) {
if (load_segment(storage, (char *)direntPtr->d_name, is_empty) != CM_SUCCESS) {
(void)closedir(dirPtr);
return CM_ERROR;
}
direntPtr = readdir(dirPtr);
}
(void)closedir(dirPtr);
#endif
return CM_SUCCESS;
}
static inline int32 segment_cmp_2(const pointer_t seg1, const pointer_t seg2)
{
if (((segment_t*)seg1)->last_index < ((segment_t*)seg2)->first_index) {
return -1;
}
if (((segment_t*)seg1)->first_index > ((segment_t*)seg2)->last_index) {
return 1;
}
return 0;
}
static status_t load_segments(log_storage_t *storage, bool32 is_empty)
{
if (list_segments(storage, is_empty) != CM_SUCCESS) {
return CM_ERROR;
}
bucket_list_sort(&storage->segments, segment_cmp_2);
return check_segments(storage);
}
static status_t load_open_segment_index(log_storage_t *storage)
{
segment_t *segment = storage->open_segment;
if (segment != NULL) {
CM_RETURN_IFERR(try_load_index(segment));
if (segment->last_index < storage->first_index) {
storage->open_segment = NULL;
LOG_DEBUG_WAR("[STG]open file is garbage data");
CM_RETURN_IFERR(destroy_segment_instance(segment));
} else {
storage->last_index = segment->last_index;
storage->last_term = segment_get_term(segment, segment->last_index);
}
}
if (storage->last_index == 0) {
storage->last_index = storage->first_index - 1;
}
return CM_SUCCESS;
}
void destroy_log_storage(log_storage_t *storage)
{
if (storage->open_segment != NULL) {
try_destroy_segment_object(storage->open_segment);
}
while (!bucket_list_empty(&storage->segments)) {
segment_t *segment = bucket_list_first(&storage->segments);
bucket_list_del_first(&storage->segments);
try_destroy_segment_object(segment);
}
bucket_list_deinit(&storage->segments);
buddy_pool_deinit(&storage->mem_pool);
}
static status_t save_log_meta(log_storage_t *storage, uint64 first_kept_index)
{
char file_name[CM_MAX_PATH_LEN];
PRTS_RETURN_IFERR(snprintf_s(file_name, CM_MAX_PATH_LEN, CM_MAX_PATH_LEN - 1, "%s/%s",
storage->home, STG_LOG_META));
int32 fd = -1;
CM_RETURN_IFERR(cm_open_file(file_name, O_RDWR | O_CREAT | O_BINARY, &fd));
char buf[LOG_META_LENGTH];
*(uint64*)(buf + LOG_META_OF_FST_INDEX) = first_kept_index;
uint32 checksum = cm_get_checksum(buf, LOG_META_LENGTH - sizeof(uint32));
*(uint32*)(buf + LOG_META_OF_CHECKSUM) = checksum;
if (cm_write_file(fd, buf, (int32)LOG_META_LENGTH) != CM_SUCCESS) {
cm_close_file(fd);
return CM_ERROR;
}
status_t status = cm_fdatasync_file(fd);
cm_close_file(fd);
return status;
}
static status_t load_log_meta(log_storage_t *storage, bool32 *is_empty)
{
char file_name[CM_MAX_PATH_LEN];
PRTS_RETURN_IFERR(snprintf_s(file_name, CM_MAX_PATH_LEN, CM_MAX_PATH_LEN - 1, "%s/%s",
storage->home, STG_LOG_META));
if (!cm_file_exist(file_name)) {
*is_empty = CM_TRUE;
return save_log_meta(storage, storage->first_index);
}
int32 fd = -1;
if (cm_open_file(file_name, O_RDONLY | O_BINARY, &fd) != CM_SUCCESS) {
return CM_ERROR;
}
int32 read_size;
char buf[LOG_META_LENGTH];
if (cm_read_file(fd, buf, LOG_META_LENGTH, &read_size) != CM_SUCCESS || read_size != LOG_META_LENGTH) {
cm_close_file(fd);
LOG_DEBUG_ERR("[STG]Read log meta file failed");
return CM_ERROR;
}
cm_close_file(fd);
uint32 chksum = *(uint32 *)(buf + LOG_META_OF_CHECKSUM);
if (!cm_verify_checksum(buf, LOG_META_LENGTH - sizeof(uint32), chksum)) {
LOG_DEBUG_ERR("[STG]Read log meta file failed, mismatch checksum");
return CM_ERROR;
}
storage->first_index = *(uint64 *)(buf + LOG_META_OF_FST_INDEX);
return CM_SUCCESS;
}
static status_t load_log_storage(log_storage_t *storage)
{
bool32 is_empty = CM_FALSE;
if (load_log_meta(storage, &is_empty) != CM_SUCCESS) {
return CM_ERROR;
}
if (load_segments(storage, is_empty) != CM_SUCCESS) {
return CM_ERROR;
}
return load_open_segment_index(storage);
}
status_t init_log_storage(log_storage_t *storage, char *home, uint64 applied_index)
{
storage->home = home;
storage->first_index = 1;
storage->last_index = 0;
storage->open_segment = NULL;
cm_latch_init(&storage->latch);
cm_latch_init(&storage->trunc_latch);
param_value_t init_size, max_size, file_size;
CM_RETURN_IFERR(md_get_param(DCF_PARAM_STG_POOL_INIT_SIZE, &init_size));
CM_RETURN_IFERR(md_get_param(DCF_PARAM_STG_POOL_MAX_SIZE, &max_size));
CM_RETURN_IFERR(buddy_pool_init("storage", init_size.stg_pool_init_size,
max_size.stg_pool_max_size, &storage->mem_pool));
CM_RETURN_IFERR(bucket_list_init(&storage->segments));
CM_RETURN_IFERR(md_get_param(DCF_PARAM_DATA_FILE_SIZE, &file_size));
storage->segment_size = file_size.data_file_size;
CM_RETURN_IFERR(load_log_storage(storage));
if (storage->last_index != CM_INVALID_INDEX_ID || applied_index == CM_INVALID_INDEX_ID) {
return CM_SUCCESS;
}
storage->first_index = applied_index + 1;
storage->last_index = applied_index;
return save_log_meta(storage, storage->first_index);
}
static inline int32 segment_cmp(const segment_t *segment, uint64 index)
{
if (index < segment->first_index) {
return -1;
}
if (index > segment->last_index) {
return 1;
}
return 0;
}
static segment_t* find_archive_segment(bucket_list_t *list, uint64 index)
{
uint32 curr;
uint32 begin = 0;
uint32 end = list->size;
while (begin < end) {
curr = (begin + end) / CM_2X_FIXED;
segment_t *segment = (segment_t *)bucket_list_get(list, curr);
int32 result = segment_cmp(segment, index);
if (result > 0) {
begin = curr + 1;
} else if (result < 0) {
end = curr;
} else {
return segment;
}
}
return NULL;
}
static segment_t* get_segment(log_storage_t *storage, uint64 index)
{
cm_latch_s(&storage->latch, 0, CM_FALSE, NULL);
if (storage->first_index == storage->last_index + 1) {
cm_unlatch(&storage->latch, NULL);
return NULL;
}
if (index < storage->first_index || index > storage->last_index) {
cm_unlatch(&storage->latch, NULL);
return NULL;
}
segment_t *segment = storage->open_segment;
if (segment == NULL || index < segment->first_index) {
segment = find_archive_segment(&storage->segments, index);
}
if (segment == NULL) {
cm_unlatch(&storage->latch, NULL);
return NULL;
}
segment_inc_def(segment);
cm_unlatch(&storage->latch, NULL);
if (try_load_index(segment) != CM_SUCCESS) {
segment_dec_def(segment);
return NULL;
}
return segment;
}
log_entry_t* storage_get_entry(log_storage_t *storage, uint64 index)
{
segment_t *segment = get_segment(storage, index);
if (segment == NULL) {
return NULL;
}
log_entry_t *log_entry = segment_get_entry(segment, index, &storage->mem_pool);
segment_dec_def(segment);
return log_entry;
}
status_t storage_write_entry_impl(log_storage_t *storage, log_entry_t *entry)
{
segment_t *segment = get_open_segment(storage);
if (segment == NULL) {
return CM_ERROR;
}
CM_ASSERT(storage->last_index == segment->last_index);
if (ENTRY_INDEX(entry) != segment->last_index + 1) {
LOG_DEBUG_WAR("[STG]Invalid log index %llu, segment's %llu", ENTRY_INDEX(entry), segment->last_index + 1);
segment_dec_def(segment);
return CM_SUCCESS;
}
if (segment_write_entry(segment, entry) != CM_SUCCESS) {
segment_dec_def(segment);
return CM_ERROR;
}
cm_latch_x(&storage->latch, 0, NULL);
storage->last_index++;
storage->last_term = ENTRY_TERM(entry);
cm_unlatch(&storage->latch, NULL);
segment_dec_def(segment);
return CM_SUCCESS;
}
status_t storage_write_entry(log_storage_t *storage, log_entry_t *entry)
{
cm_latch_s(&storage->trunc_latch, 0, CM_FALSE, NULL);
status_t status = storage_write_entry_impl(storage, entry);
cm_unlatch(&storage->trunc_latch, NULL);
return status;
}
uint64 storage_get_term(log_storage_t *storage, uint64 index)
{
segment_t *segment = get_segment(storage, index);
if (segment == NULL) {
return CM_INVALID_TERM_ID;
}
uint64 term = segment_get_term(segment, index);
segment_dec_def(segment);
return term;
}
static status_t pop_segment_from_first(log_storage_t *storage, uint64 first_index_kept, ptlist_t *list)
{
cm_latch_x(&storage->latch, 0, NULL);
storage->first_index = first_index_kept;
while (!bucket_list_empty(&storage->segments)) {
segment_t *segment = bucket_list_first(&storage->segments);
if (segment->last_index >= first_index_kept) {
cm_unlatch(&storage->latch, NULL);
return CM_SUCCESS;
}
if (cm_ptlist_add(list, segment) != CM_SUCCESS) {
cm_unlatch(&storage->latch, NULL);
return CM_ERROR;
}
bucket_list_del_first(&storage->segments);
}
if (storage->open_segment == NULL) {
storage->last_index = first_index_kept - 1;
cm_unlatch(&storage->latch, NULL);
return CM_SUCCESS;
}
if (storage->open_segment->last_index < first_index_kept) {
if (cm_ptlist_add(list, storage->open_segment) != CM_SUCCESS) {
cm_unlatch(&storage->latch, NULL);
return CM_ERROR;
}
storage->open_segment = NULL;
storage->last_index = first_index_kept - 1;
}
cm_unlatch(&storage->latch, NULL);
return CM_SUCCESS;
}
static status_t storage_trunc_prefix_impl(log_storage_t *storage, uint64 first_index_kept)
{
if (save_log_meta(storage, first_index_kept) != CM_SUCCESS) {
return CM_ERROR;
}
ptlist_t remove_list;
cm_ptlist_init(&remove_list);
if (pop_segment_from_first(storage, first_index_kept, &remove_list) != CM_SUCCESS) {
cm_destroy_ptlist(&remove_list);
return CM_ERROR;
}
for (uint32 i = 0; i < remove_list.count; ++i) {
segment_t *segment = (segment_t *)cm_ptlist_get(&remove_list, i);
(void)destroy_segment_instance(segment);
}
cm_destroy_ptlist(&remove_list);
return CM_SUCCESS;
}
status_t storage_trunc_prefix(log_storage_t *storage, uint64 first_index_kept)
{
cm_latch_x(&storage->trunc_latch, 0, NULL);
status_t status = storage_trunc_prefix_impl(storage, first_index_kept);
cm_unlatch(&storage->trunc_latch, NULL);
return status;
}
static status_t storage_unsafe_trunc_suffix(log_storage_t *storage, uint64 last_index_kept, uint64 last_term,
segment_t **last_segment)
{
storage->last_term = last_term;
storage->last_index = last_index_kept;
if (storage->open_segment != NULL) {
if (storage->open_segment->first_index <= last_index_kept) {
*last_segment = storage->open_segment;
return CM_SUCCESS;
}
CM_RETURN_IFERR(segment_unlink(storage->open_segment));
try_destroy_segment_object(storage->open_segment);
storage->open_segment = NULL;
}
while (!bucket_list_empty(&storage->segments)) {
segment_t *segment = bucket_list_last(&storage->segments);
if (segment->first_index <= last_index_kept) {
break;
}
CM_RETURN_IFERR(segment_unlink(segment));
bucket_list_del_last(&storage->segments);
try_destroy_segment_object(segment);
}
if (bucket_list_empty(&storage->segments)) {
storage->first_index = last_index_kept + 1;
return CM_SUCCESS;
}
*last_segment = bucket_list_last(&storage->segments);
return CM_SUCCESS;
}
static status_t storage_trunc_suffix_impl(log_storage_t *storage, uint64 last_index_kept, uint64 last_term)
{
segment_t *last_segment = NULL;
cm_latch_x(&storage->latch, 0, NULL);
if (storage_unsafe_trunc_suffix(storage, last_index_kept, last_term, &last_segment) != CM_SUCCESS) {
cm_unlatch(&storage->latch, NULL);
return CM_ERROR;
}
if (last_segment == NULL) {
cm_unlatch(&storage->latch, NULL);
return CM_SUCCESS;
}
bool32 is_closed = !last_segment->is_open;
if (segment_trunc_suffix(last_segment, last_index_kept) != CM_SUCCESS) {
cm_unlatch(&storage->latch, NULL);
return CM_ERROR;
}
if (is_closed == CM_FALSE || last_segment->is_open == CM_FALSE) {
cm_unlatch(&storage->latch, NULL);
return CM_SUCCESS;
}
bucket_list_del_last(&storage->segments);
storage->open_segment = last_segment;
cm_unlatch(&storage->latch, NULL);
return CM_SUCCESS;
}
status_t storage_trunc_suffix(log_storage_t *storage, uint64 last_index_kept, uint64 last_term)
{
cm_latch_x(&storage->trunc_latch, 0, NULL);
status_t status = storage_trunc_suffix_impl(storage, last_index_kept, last_term);
cm_unlatch(&storage->trunc_latch, NULL);
return status;
}