76fc9503创建于 2022年2月24日历史提交
/*
 * Copyright (c) 2021 Huawei Technologies Co.,Ltd.
 *
 * openGauss is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * -------------------------------------------------------------------------
 *
 * batcher.c
 *    batch flush
 *
 * IDENTIFICATION
 *    src/storage/batcher.c
 *
 * -------------------------------------------------------------------------
 */

#include "batcher.h"
#include "util_profile_stat.h"

status_t batcher_append(log_storage_t *storage, batcher_t *batcher, const log_entry_t *entry)
{
    if (ENTRY_INDEX(entry) != batcher->last_index + 1) {
        LOG_DEBUG_WAR("[STG]Invalid log index %llu, segment's %llu", ENTRY_INDEX(entry), batcher->last_index + 1);
        return CM_SUCCESS;
    }

    log_index_t log_index;
    log_index.term   = ENTRY_TERM(entry);
    log_index.offset = batcher->offset + batcher->size;
    if (index_buf_add(&batcher->ibuf, &log_index) != CM_SUCCESS) {
        LOG_DEBUG_ERR("[STG]batcher_append add index cache failed");
        return CM_ERROR;
    }

    if (memcpy_sp(batcher->buf + batcher->size, batcher->capacity - batcher->size,
        entry->data, ENTRY_LENGTH(entry)) != EOK) {
        LOG_DEBUG_ERR("[STG]batcher_append add entry to batch buf failed");
        return CM_ERROR;
    }

    batcher->last_index++;
    batcher->last_term = ENTRY_TERM(entry);
    batcher->size += ENTRY_LENGTH(entry);
    return CM_SUCCESS;
}

status_t batcher_flush(log_storage_t *storage, batcher_t *batcher)
{
    segment_t *segment = batcher->segment;
    if (cm_pwrite_file_stat(segment->fd, batcher->buf, (int32)batcher->size, (int64)batcher->offset) != CM_SUCCESS) {
        LOG_DEBUG_ERR("[STG]batcher_flush write file failed");
        return CM_ERROR;
    }

    cm_latch_x(&segment->latch, 0, NULL);
    if (index_buf_add_batch(&segment->indexes, &batcher->ibuf) != CM_SUCCESS) {
        cm_unlatch(&segment->latch, NULL);
        LOG_DEBUG_ERR("[STG]batcher_flush add index failed");
        return CM_ERROR;
    }

    segment->size      += batcher->size;
    segment->last_index = batcher->last_index;
    cm_unlatch(&segment->latch, NULL);

    cm_latch_x(&storage->latch, 0, NULL);
    storage->last_term  = batcher->last_term;
    storage->last_index = batcher->last_index;
    cm_unlatch(&storage->latch, NULL);
    return CM_SUCCESS;
}