/*
* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* repl_arch_fetch.c
*
*
* IDENTIFICATION
* src/kernel/replication/repl_arch_fetch.c
*
* -------------------------------------------------------------------------
*/
#include "knl_replication_module.h"
#include "repl_arch_fetch.h"
#include "cm_file.h"
#include "knl_context.h"
/*
 * Marker added to sizeof(rep_msg_header_t) in the size field of a REP_ARCH_REQ header.
 * The server subtracts sizeof(rep_msg_header_t) from the received size and compares the
 * remainder with this value to decide whether the client supports compressed transfer.
 */
#define LFTC_SUPPORT_COMPRESS 1
/*
 * Send one message to the peer: a rep_msg_header_t filled with `type` and `size`,
 * followed by `size` bytes of payload read from `data`.
 * @param[in] pipe         - connection to write to
 * @param[in] type         - message type stored in the header
 * @param[in] size         - payload length in bytes, also stored in the header
 * @param[in] data         - payload
 * @param[in] max_pkg_size - forwarded to cs_write_stream
 * return
 * - OG_SUCCESS
 * - OG_ERROR
 */
static status_t lftc_write_stream(cs_pipe_t *pipe, rep_msg_type_t type, uint32 size, const char *data, int32 max_pkg_size)
{
    rep_msg_header_t header;
    status_t ret;

    header.type = type;
    header.size = size;

    /* The payload is only attempted once the header went out successfully. */
    ret = cs_write_stream(pipe, (char *)&header, sizeof(rep_msg_header_t), max_pkg_size);
    if (ret == OG_SUCCESS) {
        ret = cs_write_stream(pipe, data, size, max_pkg_size);
    }

    if (ret != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Fetcher] failed to send message to standby");
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
/*
 * Send a REP_ARCH_REQ message followed by `size` bytes of request payload.
 * The header size field does not carry the payload length: it is set to
 * sizeof(rep_msg_header_t) + LFTC_SUPPORT_COMPRESS.
 * return
 * - OG_SUCCESS
 * - OG_ERROR
 */
static status_t lftc_send_req(cs_pipe_t *pipe, uint32 size, const char *data, int32 max_pkg_size)
{
    rep_msg_header_t header;
    status_t ret;

    header.type = REP_ARCH_REQ;
    header.size = sizeof(rep_msg_header_t) + LFTC_SUPPORT_COMPRESS;

    ret = cs_write_stream(pipe, (char *)&header, sizeof(rep_msg_header_t), max_pkg_size);
    if (ret == OG_SUCCESS) {
        ret = cs_write_stream(pipe, data, size, max_pkg_size);
    }

    if (ret != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Fetcher] failed to send message to primary");
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
/*
 * Compress `data_size` bytes of `buf` with zstd (level 1) into the server compress buffer.
 * On success ogx->cmp_ctx.data_size holds the compressed length; on failure it is left untouched.
 * return
 * - OG_SUCCESS
 * - OG_ERROR
 */
static inline status_t lftc_zstd_compress(lftc_srv_ctx_t *ogx, const char *buf, uint32 data_size)
{
    /*
     * Test the full-width size_t result: zstd encodes errors as values close to (size_t)-1,
     * and truncating to uint32 before ZSTD_isError() hides them when size_t is wider than 32 bits.
     */
    size_t result = ZSTD_compress(ogx->cmp_ctx.compress_buf.aligned_buf, ogx->cmp_ctx.buf_size, buf, data_size, 1);
    if (ZSTD_isError(result)) {
        OG_LOG_RUN_ERR("[Log Fetcher] failed to compress(zstd) archive log message");
        return OG_ERROR;
    }

    ogx->cmp_ctx.data_size = (uint32)result;
    return OG_SUCCESS;
}
/*
 * Decompress `size` bytes of zstd data from `buf` into task->msg_buf
 * (capacity task->msg_buf_size).
 * @param[out] data_size - decompressed length; written only on success
 * return
 * - OG_SUCCESS
 * - OG_ERROR
 */
static inline status_t lftc_zstd_decompress(lftc_clt_task_t *task, const char *buf, uint32 size, uint32 *data_size)
{
    /*
     * Test the full-width size_t result: zstd encodes errors as values close to (size_t)-1,
     * and truncating to uint32 before ZSTD_isError() hides them when size_t is wider than 32 bits.
     */
    size_t result = ZSTD_decompress(task->msg_buf.aligned_buf, task->msg_buf_size, buf, size);
    if (ZSTD_isError(result)) {
        OG_LOG_RUN_ERR("[Log Fetcher] failed to decompress(zstd) archive log message");
        return OG_ERROR;
    }

    *data_size = (uint32)result;
    return OG_SUCCESS;
}
/*
 * Compress `data_size` bytes of `buf` with LZ4 into the server compress buffer and
 * store the produced length in ogx->cmp_ctx.data_size. A stored length of 0 is
 * treated as failure.
 * return
 * - OG_SUCCESS
 * - OG_ERROR
 */
static inline status_t lftc_lz4_compress(lftc_srv_ctx_t *ogx, const char *buf, uint32 data_size)
{
    lftc_cmp_ctx_t *cmp = &ogx->cmp_ctx;

    cmp->data_size = (uint32)LZ4_compress_default(buf, cmp->compress_buf.aligned_buf, (int32)data_size,
        (int32)cmp->buf_size);
    if (cmp->data_size != 0) {
        return OG_SUCCESS;
    }

    OG_LOG_RUN_ERR("[Log Fetcher] failed to compress(lz4) archive log message");
    return OG_ERROR;
}
/*
 * Decompress `size` bytes of LZ4 data from `buf` into task->msg_buf
 * (capacity task->msg_buf_size).
 * @param[out] data_size - decompressed length; written only on success
 * return
 * - OG_SUCCESS
 * - OG_ERROR when LZ4_decompress_safe returns a value <= 0
 */
static inline status_t lftc_lz4_decompress(lftc_clt_task_t *task, const char *buf, uint32 size, uint32 *data_size)
{
    int produced = LZ4_decompress_safe(buf, task->msg_buf.aligned_buf, (int32)size, (int32)task->msg_buf_size);
    if (produced > 0) {
        *data_size = (uint32)produced;
        return OG_SUCCESS;
    }

    OG_LOG_RUN_ERR("[Log Fetcher] failed to decompress(lz4) archive log message");
    return OG_ERROR;
}
/*
 * Compress the first `data_size` bytes of the file message buffer with the algorithm
 * selected in ogx->compress_alg and send the result as REP_LFTC_ZSTD_DATA or
 * REP_LFTC_LZ4_DATA. Any other algorithm value is an error.
 * return
 * - OG_SUCCESS
 * - OG_ERROR
 */
static status_t lftc_compress_send_data(lftc_srv_ctx_t *ogx, uint32 data_size, int32 max_pkg_size)
{
    lftc_file_ctx_t *file = &ogx->file_ctx;
    lftc_cmp_ctx_t *cmp = &ogx->cmp_ctx;
    const char *raw = file->msg_buf.aligned_buf;
    rep_msg_type_t msg_type;

    if (ogx->compress_alg == COMPRESS_ZSTD) {
        if (lftc_zstd_compress(ogx, raw, data_size) != OG_SUCCESS) {
            return OG_ERROR;
        }
        msg_type = REP_LFTC_ZSTD_DATA;
    } else if (ogx->compress_alg == COMPRESS_LZ4) {
        if (lftc_lz4_compress(ogx, raw, data_size) != OG_SUCCESS) {
            return OG_ERROR;
        }
        msg_type = REP_LFTC_LZ4_DATA;
    } else {
        OG_LOG_RUN_ERR("[Log Fetcher] unknown compress algorithm.");
        return OG_ERROR;
    }

    /* cmp->data_size was filled by the compressor above. */
    return lftc_write_stream(file->pipe, msg_type, cmp->data_size, cmp->compress_buf.aligned_buf, max_pkg_size);
}
/*
 * Send `data_size` bytes of the file message buffer to the peer, as plain
 * REP_ARCH_DATA when no compression is selected, otherwise through the
 * compressing send path.
 * return
 * - OG_SUCCESS
 * - OG_ERROR
 */
static status_t lftc_send_data(lftc_srv_ctx_t *ogx, uint32 data_size, int32 max_pkg_size)
{
    lftc_file_ctx_t *file = &ogx->file_ctx;

    if (ogx->compress_alg != COMPRESS_NONE) {
        return lftc_compress_send_data(ogx, data_size, max_pkg_size);
    }

    return lftc_write_stream(file->pipe, REP_ARCH_DATA, data_size, file->msg_buf.aligned_buf, max_pkg_size);
}
/*
 * Read msg->size bytes of compressed payload into the task compress buffer and
 * decompress it into task->msg_buf according to msg->type.
 * @param[out] data_size - decompressed length, set by the decompressor on success
 * return
 * - OG_SUCCESS
 * - OG_ERROR on read failure, short read, unknown type or decompression failure
 */
static status_t lftc_decompress_receive_data(lftc_clt_task_t *task, rep_msg_header_t *msg, uint32 *data_size)
{
    char *packed = task->cmp_ctx.compress_buf.aligned_buf;
    int32 received;

    if (cs_read_stream(&task->pipe, packed, OG_INVALID_ID32, msg->size, &received) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Fetcher] failed to receive message from primary");
        return OG_ERROR;
    }

    if (received != msg->size) {
        OG_LOG_RUN_ERR("[Log Fetcher] invalid comp_size %u received, expected size is %u",
            (uint32)received, (uint32)msg->size);
        return OG_ERROR;
    }

    if (msg->type == REP_LFTC_LZ4_DATA) {
        return lftc_lz4_decompress(task, packed, (uint32)received, data_size);
    }
    if (msg->type == REP_LFTC_ZSTD_DATA) {
        return lftc_zstd_decompress(task, packed, (uint32)received, data_size);
    }

    OG_LOG_RUN_ERR("[Log Fetcher] unknown decompress type.");
    return OG_ERROR;
}
/*
 * Build a fetch request from the task (rst_id, asn, offset) and send it to the primary.
 * return
 * - OG_SUCCESS
 * - OG_ERROR
 */
static status_t lftc_clt_send_request(lftc_clt_task_t *task)
{
    lftc_clt_req_t req;
    int32 max_pkg_size;

    req.rst_id = task->rst_id;
    req.asn = task->asn;
    req.offset = task->offset;

    OG_LOG_RUN_INF("[Log Fetcher] cli thread send archived info [%u/%u/%llu] ",
        req.rst_id, req.asn, req.offset);

    max_pkg_size = (int32)cm_atomic_get(&task->session->kernel->attr.repl_pkg_size);
    return lftc_send_req(&task->pipe, sizeof(lftc_clt_req_t), (char *)&req, max_pkg_size);
}
/*
 * Resolve the primary server address, connect the task pipe to it and log in
 * as a log fetcher client (REP_LOGIN_LFTC).
 * return
 * - OG_SUCCESS
 * - OG_ERROR
 */
static status_t lftc_clt_connect_server(lftc_clt_task_t *task)
{
    cs_pipe_t *conn = &task->pipe;
    char primary_host[OG_HOST_NAME_BUFFER_SIZE];
    char local_host[OG_HOST_NAME_BUFFER_SIZE];
    char url[OG_HOST_NAME_BUFFER_SIZE + OG_TCP_PORT_MAX_LENGTH + 1] = {0};
    uint16 primary_port;
    errno_t rc;
    status_t ret;

    ret = lrcv_get_primary_server(task->session, (int32)OG_INVALID_ID32, primary_host, OG_HOST_NAME_BUFFER_SIZE,
        &primary_port);
    if (ret != OG_SUCCESS) {
        return OG_ERROR;
    }

    /* url is "<host>:<port>" */
    rc = snprintf_s(url, sizeof(url), sizeof(url) - 1, "%s:%u", primary_host, (uint32)primary_port);
    knl_securec_check_ss(rc);

    conn->connect_timeout = REPL_CONNECT_TIMEOUT;
    conn->socket_timeout = REPL_SOCKET_TIMEOUT;
    arch_get_bind_host(task->session, primary_host, local_host, OG_HOST_NAME_BUFFER_SIZE);

    ret = cs_connect(url, conn, local_host, NULL, NULL);
    if (ret != OG_SUCCESS) {
        cs_disconnect(conn);
        OG_LOG_RUN_ERR("[Log Fetcher] failed to connect %s", url);
        return OG_ERROR;
    }

    ret = knl_login(task->session, conn, REP_LOGIN_LFTC, (const char *)local_host, NULL);
    if (ret != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Fetcher] failed to login primary lftc server thread");
        return OG_ERROR;
    }

    return OG_SUCCESS;
}
/*
 * Establish the connection for a fetch task; currently this just connects
 * to the primary server.
 */
static status_t lftc_check_connect(lftc_clt_task_t *task)
{
    status_t ret = lftc_clt_connect_server(task);
    return ret;
}
/*
 * Release the resources of a fetch task: its thread object, its file handle and its
 * connection, then mark the task as not running and not canceled.
 */
static void lftc_clt_clean_task(lftc_clt_task_t *task)
{
cm_release_thread(&task->thread);
cm_close_file(task->handle);
task->handle = INVALID_FILE_HANDLE;
knl_disconnect(&task->pipe);
/* state flags are changed under the task lock, which other functions in this file take before reading them */
cm_spin_lock(&task->lock, NULL);
task->is_running = OG_FALSE;
task->canceled = OG_FALSE;
cm_spin_unlock(&task->lock);
}
/*
 * Open (or create) the temporary archive file of a fetch task and compute the offset
 * at which fetching starts.
 * - file missing: create it, offset = 0
 * - file shorter than a log file head: offset = 0
 * - file head matches the task (rst_id, asn, cmp_algorithm 0): offset = head write_pos
 * - anything else: error
 * return
 * - OG_SUCCESS
 * - OG_ERROR
 */
static status_t lftc_clt_open_tmp(lftc_clt_task_t *task, const char *tmp_file)
{
    log_file_head_t head;
    int32 bytes_read;
    errno_t rc;
    bool32 head_matches;

    if (cm_file_exist(tmp_file)) {
        rc = strcpy_sp(task->tmp_file_name, OG_FILE_NAME_BUFFER_SIZE + 4, tmp_file);
        knl_securec_check(rc);

        if (cm_open_file(tmp_file, O_BINARY | O_SYNC | O_RDWR, &task->handle) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[Log Fetcher] failed to open temp archived log file %s", tmp_file);
            return OG_ERROR;
        }

        if (cm_read_file(task->handle, (void *)&head, sizeof(log_file_head_t), &bytes_read) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[Log Fetcher] failed to read temp archived log file %s", tmp_file);
            return OG_ERROR;
        }

        /* No complete head on disk yet: start over from the beginning. */
        if ((uint32)bytes_read < sizeof(log_file_head_t)) {
            task->offset = 0;
            return OG_SUCCESS;
        }

        head_matches = (head.rst_id == task->rst_id && head.asn == task->asn && head.cmp_algorithm == 0);
        if (!head_matches) {
            OG_LOG_RUN_ERR("[Log Fetcher] wrong tempory archive file %s exists.", tmp_file);
            return OG_ERROR;
        }

        task->offset = head.write_pos;
        return OG_SUCCESS;
    }

    if (cm_create_file(tmp_file, O_BINARY | O_SYNC | O_RDWR | O_EXCL, &task->handle) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Fetcher] failed to create temp archived log file %s", tmp_file);
        return OG_ERROR;
    }
    task->offset = 0;
    return OG_SUCCESS;
}
/*
 * Poll until none of the first ogx->hwm tasks is running, sleeping 10 (cm_sleep units)
 * between rounds.
 */
static void lftc_wait_tasks_finished(lftc_clt_ctx_t *ogx)
{
    uint32 idle_cnt;
    uint32 idx;

    do {
        idle_cnt = 0;
        for (idx = 0; idx < ogx->hwm; idx++) {
            idle_cnt += ogx->tasks[idx].is_running ? 0 : 1;
        }
        if (idle_cnt == ogx->hwm) {
            return;
        }
        cm_sleep(10);
    } while (OG_TRUE);
}
/*
 * Shut down the log fetcher client context: flag every task as canceled, wait until
 * none is running, free the per-task buffers and reset the high water mark.
 * @param[in] session - session whose kernel owns the log fetcher client context
 */
void lftc_clt_close(knl_session_t *session)
{
    knl_instance_t *kernel = session->kernel;
    lftc_clt_ctx_t *ogx = &kernel->lftc_client_ctx;

    for (uint32 i = 0; i < ogx->hwm; i++) {
        ogx->tasks[i].canceled = OG_TRUE;
    }

    lftc_wait_tasks_finished(ogx);

    for (uint32 i = 0; i < ogx->hwm; i++) {
        /*
         * Log fetcher thread has detached itself before thread exit,
         * so the stack resources it owns will be freed automatically.
         */
        ogx->tasks[i].canceled = OG_FALSE;
        cm_aligned_free(&ogx->tasks[i].msg_buf);
        cm_aligned_free(&ogx->tasks[i].cmp_ctx.compress_buf);
    }

    ogx->hwm = 0;
    /* NOTE(review): this completion message is logged at error level; confirm that is intended. */
    OG_LOG_RUN_ERR("[Log Fetcher] log fetcher context close finish.");
}
/*
 * Read the log file head from the start of the opened archive file into
 * file_ctx->log_head and send it to the peer as a REP_ARCH_HEAD message.
 * @param[in] lftc_server_ctx - server context holding the pipe and the opened file
 * @param[in] arch_file_name  - file name, used for error messages only
 * return
 * - OG_SUCCESS
 * - OG_ERROR
 */
static status_t lftc_srv_send_head(lftc_srv_ctx_t *lftc_server_ctx, const char *arch_file_name)
{
    lftc_file_ctx_t *file = &lftc_server_ctx->file_ctx;
    cs_pipe_t *conn = lftc_server_ctx->pipe;
    int32 bytes_read;
    int32 max_pkg_size;

    if (cm_seek_file(file->handle, 0, SEEK_SET) != 0) {
        OG_LOG_RUN_ERR("[Log Fetcher] failed to seek log file %s", arch_file_name);
        return OG_ERROR;
    }

    if (cm_read_file(file->handle, (void *)&file->log_head, sizeof(log_file_head_t), &bytes_read) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Fetcher] failed to read log file %s", arch_file_name);
        return OG_ERROR;
    }

    /* A short read means the file does not even contain a complete head. */
    if ((uint32)bytes_read < sizeof(log_file_head_t)) {
        OG_LOG_RUN_ERR("[Log Fetcher] failed to read archive log head %s", arch_file_name);
        return OG_ERROR;
    }

    max_pkg_size = (int32)cm_atomic_get(&lftc_server_ctx->session->kernel->attr.repl_pkg_size);
    return lftc_write_stream(conn, REP_ARCH_HEAD, sizeof(log_file_head_t), (char *)&file->log_head, max_pkg_size);
}
* lftc server receive lftc_client request and send receive
* @param[in] lftc_server_ctx - lftc_server_context_t
* return
* - OG_SUCCESS
* - OG_ERROR
*/
static status_t lftc_srv_proc_request(knl_session_t *session, lftc_srv_ctx_t *lftc_ctx)
{
lftc_file_ctx_t *file_ctx = &lftc_ctx->file_ctx;
lftc_clt_req_t request;
char arch_file_name[OG_FILE_NAME_BUFFER_SIZE];
int32 recv_size;
if (cs_read_stream(lftc_ctx->pipe, (char *)&request, OG_INVALID_ID32,
sizeof(lftc_clt_req_t), &recv_size) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[Log Fetcher] failed to receive message from standby");
return OG_ERROR;
}
knl_panic(sizeof(lftc_clt_req_t) == recv_size);
OG_LOG_RUN_INF("[Log Fetcher] server thread get archived info [%u/%u/%llu] ", request.rst_id, request.asn,
request.offset);
if (!arch_get_archived_log_name(lftc_ctx->session, request.rst_id, request.asn, ARCH_DEFAULT_DEST, arch_file_name,
OG_FILE_NAME_BUFFER_SIZE, session->kernel->id)) {
arch_set_archive_log_name(session, request.rst_id, request.asn, ARCH_DEFAULT_DEST, arch_file_name,
OG_FILE_NAME_BUFFER_SIZE, session->kernel->id);
if (!arch_log_not_archived(session, request.rst_id, request.asn) && !cm_file_exist(arch_file_name)) {
rep_msg_header_t msg;
msg.size = sizeof(rep_msg_header_t);
msg.type = REP_ARCH_LOST;
OG_LOG_RUN_INF("[Log Fetcher] archive log [%u-%u] lost on primary, send REP_ARCH_LOST message to standby",
request.rst_id, request.asn);
if (cs_write_stream(lftc_ctx->pipe, (char *)&msg, sizeof(rep_msg_header_t),
(int32)cm_atomic_get(&session->kernel->attr.repl_pkg_size)) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[Log Fetcher] failed to send REP_ARCH_LOST message to standby");
}
return OG_ERROR;
}
OG_LOG_RUN_ERR("[Log Fetcher] failed to get file name for archived log file[%u-%u] temporarily",
request.rst_id, request.asn);
return OG_ERROR;
}
file_ctx->offset = MAX(request.offset, sizeof(log_file_head_t));
file_ctx->pipe = lftc_ctx->pipe;
if (cm_open_file(arch_file_name, O_BINARY | O_SYNC | O_RDWR, &file_ctx->handle) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[Log Fetcher] failed to open arch file %s on primary.", arch_file_name);
return OG_ERROR;
}
if (lftc_srv_send_head(lftc_ctx, arch_file_name) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[Log Fetcher] failed to send log file head to standby");
return OG_ERROR;
}
return OG_SUCCESS;
}
* lftc_server send logfile to lftc_client
* @param[in] lftc_server_ctx - lftc_server_context_t
* @result
* - OG_SUCCESS
* - OG_ERROR
*/
static status_t lftc_srv_send_file(lftc_srv_ctx_t *lftc_ctx)
{
lftc_file_ctx_t *file_ctx = &lftc_ctx->file_ctx;
int32 data_size;
if (cm_seek_file(file_ctx->handle, file_ctx->offset, SEEK_SET) != (int64)file_ctx->offset) {
OG_LOG_RUN_ERR("[Log Fetcher] failed to seek file, offset:%llu, origin:%d", file_ctx->offset, SEEK_SET);
return OG_ERROR;
}
while (!lftc_ctx->thread.closed) {
if (cm_read_file(file_ctx->handle, file_ctx->msg_buf.aligned_buf, file_ctx->msg_buf_size,
&data_size) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[Log Fetcher] failed to read log file");
return OG_ERROR;
}
if (data_size > 0) {
if (lftc_send_data(lftc_ctx, (uint32)data_size,
(int32)cm_atomic_get(&lftc_ctx->session->kernel->attr.repl_pkg_size)) != OG_SUCCESS) {
return OG_ERROR;
}
}
file_ctx->offset += (uint64)data_size;
if ((uint32)data_size < file_ctx->msg_buf_size) {
OG_LOG_RUN_INF("[Log Fetcher] send archive file finished, compress_alg[%u], "
"offset[%llu] rst_id [%u] asn [%u].",
lftc_ctx->compress_alg, file_ctx->offset, file_ctx->log_head.rst_id, file_ctx->log_head.asn);
break;
}
}
return lftc_write_stream(file_ctx->pipe, REP_ARCH_TAIL, sizeof(uint64), (char *)&file_ctx->offset,
(int32)cm_atomic_get(&lftc_ctx->session->kernel->attr.repl_pkg_size));
}
/*
 * Allocate the server side buffers: the file message buffer (LOG_LGWR_BUF_SIZE bytes)
 * and a compress buffer sized to the larger of the zstd and lz4 worst-case bounds
 * for that message size.
 * return
 * - OG_SUCCESS
 * - OG_ERROR, in which case no buffer allocated here is left behind
 */
static status_t lftc_srv_alloc_buf(lftc_srv_ctx_t *lftc_srv_ctx)
{
    int64 buf_size = LOG_LGWR_BUF_SIZE(lftc_srv_ctx->session);
    uint32 zstd_buf_size = (uint32)ZSTD_compressBound((size_t)buf_size);
    uint32 lz4_buf_size = (uint32)LZ4_compressBound((int32)buf_size);

    if (cm_aligned_malloc(buf_size, "lftc_server", &lftc_srv_ctx->file_ctx.msg_buf) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Fetcher] failed to alloc buffer with size %lld", buf_size);
        return OG_ERROR;
    }
    lftc_srv_ctx->file_ctx.msg_buf_size = (uint32)buf_size;

    lftc_srv_ctx->cmp_ctx.buf_size = zstd_buf_size > lz4_buf_size ? zstd_buf_size : lz4_buf_size;
    lftc_srv_ctx->cmp_ctx.data_size = 0;
    if (cm_aligned_malloc((int64)lftc_srv_ctx->cmp_ctx.buf_size, "lftc_server compress buffer",
        &lftc_srv_ctx->cmp_ctx.compress_buf) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Fetcher] failed to alloc compress buffer with size %u", lftc_srv_ctx->cmp_ctx.buf_size);
        /* Release the first buffer, as lftc_clt_task_init does on the client side. */
        cm_aligned_free(&lftc_srv_ctx->file_ctx.msg_buf);
        return OG_ERROR;
    }

    return OG_SUCCESS;
}
/*
 * Decide which compression algorithm the server uses for the requesting client and
 * store it in lftc_srv_ctx->compress_alg.
 * @param[in] lftc_srv_ctx - lftc server context
 * @param[in] msg          - header of the received REP_ARCH_REQ message
 * return
 * - OG_SUCCESS (this function has no failing return path)
 */
static status_t lftc_proc_compress(lftc_srv_ctx_t *lftc_srv_ctx, rep_msg_header_t *msg)
{
    /* zero-filled so that a failed address conversion yields an empty host that matches no standby */
    char ipstr[CM_MAX_IP_LEN] = { 0 };
    char host[OG_HOST_NAME_BUFFER_SIZE] = { 0 };
    errno_t errcode;
    lsnd_context_t *lsnd_ctx = &lftc_srv_ctx->session->kernel->lsnd_ctx;
    lsnd_t *lsnd = NULL;
    uint32 type = msg->size - sizeof(rep_msg_header_t);

    /*
     * if an old client send req to a new server, msg.size is sizeof(lftc_clt_req_t), so we return immediately here.
     * if a new client send req to a new server, msg.size is sizeof(rep_msg_header_t) + LFTC_SUPPORT_COMPRESS, so the
     * server can judge that the client can support compression, and then find the specific compress algorithm
     */
    lftc_srv_ctx->compress_alg = COMPRESS_NONE;
    if (type != LFTC_SUPPORT_COMPRESS) {
        return OG_SUCCESS;
    }

    /*
     * we find the compress algorithm from lsnd:
     * 1. get the ip of standby
     * 2. find standby's lsnd according to ip
     * 3. get the compress algorithm of the lsnd
     * 4. if more than one standby is configured with one ip and their algorithms differ, our strategy
     *    is zstd > lz4 > none
     */
    (void)cm_inet_ntop((struct sockaddr *)&lftc_srv_ctx->pipe->link.tcp.remote.addr, ipstr, CM_MAX_IP_LEN);
    errcode = strncpy_s(host, OG_HOST_NAME_BUFFER_SIZE, ipstr, OG_HOST_NAME_BUFFER_SIZE - 1);
    knl_securec_check(errcode);

    cm_latch_s(&lsnd_ctx->latch, lftc_srv_ctx->session->id, OG_FALSE, NULL);
    for (uint16 i = 0; i < lsnd_ctx->standby_num; i++) {
        lsnd = lsnd_ctx->lsnd[i];
        if (lsnd == NULL || lsnd->is_disable) {
            continue;
        }

        if (cm_strcmpi(host, lsnd->dest_info.peer_host) == 0) {
            if (lsnd->dest_info.compress_alg == COMPRESS_ZSTD) {
                lftc_srv_ctx->compress_alg = COMPRESS_ZSTD;
            } else if (lsnd->dest_info.compress_alg == COMPRESS_LZ4 && lftc_srv_ctx->compress_alg == COMPRESS_NONE) {
                /* lz4 never overrides an already selected zstd */
                lftc_srv_ctx->compress_alg = COMPRESS_LZ4;
            }
        }
    }
    cm_unlatch(&lsnd_ctx->latch, NULL);

    return OG_SUCCESS;
}
/*
 * Main loop of the log fetcher server thread: allocate the transfer buffers, then
 * repeatedly read a message header from the standby and, for REP_ARCH_REQ messages,
 * select the compression, process the request and send the file. Messages of other
 * types are ignored.
 * return
 * - OG_SUCCESS when the thread is closed
 * - OG_ERROR on any failure, when the db is not open or the session is killed
 */
status_t lftc_srv_proc(knl_session_t *session, lftc_srv_ctx_t *lftc_srv_ctx)
{
    cs_pipe_t *conn = lftc_srv_ctx->pipe;
    rep_msg_header_t header;
    int32 bytes_read;

    if (!DB_IS_OPEN(session)) {
        OG_LOG_RUN_ERR("[Log Fetcher] archive log fetch is not allowed when db is not open");
        return OG_ERROR;
    }

    if (lftc_srv_alloc_buf(lftc_srv_ctx) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Fetcher] Failed to alloc buffer");
        return OG_ERROR;
    }

    cm_set_thread_name("log_fetcher");
    OG_LOG_RUN_INF("log fetcher thread started");

    while (!lftc_srv_ctx->thread.closed) {
        if (lftc_srv_ctx->session->killed) {
            OG_LOG_RUN_ERR("[Log Fetcher] session killed");
            return OG_ERROR;
        }

        if (cs_read_stream(conn, (char *)&header, OG_INVALID_ID32, sizeof(rep_msg_header_t),
            &bytes_read) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[Log Fetcher] failed to receive message from standby");
            return OG_ERROR;
        }

        if (bytes_read != sizeof(rep_msg_header_t)) {
            OG_LOG_RUN_ERR("[Log Fetcher] invalid recv_size %u received, expected size is %u",
                (uint32)bytes_read, (uint32)sizeof(rep_msg_header_t));
            return OG_ERROR;
        }

        /* Only fetch requests are served. */
        if (header.type != REP_ARCH_REQ) {
            continue;
        }

        if (lftc_proc_compress(lftc_srv_ctx, &header) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[Log Fetcher] Failed to process LFTC compress");
            return OG_ERROR;
        }

        if (lftc_srv_proc_request(session, lftc_srv_ctx) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[Log Fetcher] Failed to process LFTC request");
            return OG_ERROR;
        }

        if (lftc_srv_send_file(lftc_srv_ctx) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[Log Fetcher] Failed to send file data");
            return OG_ERROR;
        }
    }

    return OG_SUCCESS;
}
/*
 * Overwrite the write_pos field of the log file head stored in the task file.
 * @param[in] task - fetch task owning the file handle
 * @param[in] end  - when true write task->origin_size, otherwise the current task->offset
 * return
 * - OG_SUCCESS (task->log_head.write_pos is updated as well)
 * - OG_ERROR
 */
static status_t lftc_flush_file_writepos(lftc_clt_task_t *task, bool32 end)
{
    uint64 write_pos = task->offset;

    if (end) {
        write_pos = task->origin_size;
    }

    /* Position on the write_pos member inside the on-disk head. */
    if (cm_seek_file(task->handle, (int64)OFFSET_OF(log_file_head_t, write_pos), SEEK_SET) !=
        OFFSET_OF(log_file_head_t, write_pos)) {
        OG_LOG_RUN_ERR("[Log Fetcher] failed to seek file, offset:%lu, origin:%d",
            OFFSET_OF(log_file_head_t, write_pos), SEEK_SET);
        return OG_ERROR;
    }

    if (cm_write_file(task->handle, (void *)&write_pos, sizeof(uint64)) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Fetcher] failed to flush file write pos");
        return OG_ERROR;
    }

    task->log_head.write_pos = write_pos;
    return OG_SUCCESS;
}
/*
 * Make sure the buffer that will receive the payload of `msg` can hold msg->size bytes:
 * task->msg_buf for REP_ARCH_DATA, task->cmp_ctx.compress_buf for any other type.
 * A too small buffer is reallocated to a fixed maximum; a message larger than that
 * maximum is rejected, because the payload is read into the buffer right afterwards.
 * return
 * - OG_SUCCESS
 * - OG_ERROR
 */
static status_t lftc_adjust_buf_size(lftc_clt_task_t *task, rep_msg_header_t *msg)
{
    if (msg->type == REP_ARCH_DATA) {
        if (msg->size <= task->msg_buf_size) {
            return OG_SUCCESS;
        }

        int64 new_buf_size = OG_MAX_BATCH_SIZE + SIZE_K(4);
        /* msg->size comes from the network: never accept more than the grown buffer can hold */
        if ((uint64)msg->size > (uint64)new_buf_size) {
            OG_LOG_RUN_ERR("[Log Fetcher] invalid message size %u received, max size is %lld",
                (uint32)msg->size, new_buf_size);
            return OG_ERROR;
        }

        if (cm_aligned_realloc(new_buf_size, "lftc client buffer", &task->msg_buf) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[Log Fetcher] failed to alloc lftc client buffer with size %lld", new_buf_size);
            return OG_ERROR;
        }
        task->msg_buf_size = (uint32)new_buf_size;
    } else {
        if (msg->size <= task->cmp_ctx.buf_size) {
            return OG_SUCCESS;
        }

        /* larger of the zstd and lz4 worst-case bounds for OG_MAX_BATCH_SIZE bytes of input */
        uint32 zstd_new_buf_size = (uint32)ZSTD_compressBound((size_t)OG_MAX_BATCH_SIZE);
        uint32 lz4_new_buf_size = (uint32)LZ4_compressBound((int32)OG_MAX_BATCH_SIZE);
        uint32 new_compress_buf_size = zstd_new_buf_size > lz4_new_buf_size ? zstd_new_buf_size : lz4_new_buf_size;
        /* msg->size comes from the network: never accept more than the grown buffer can hold */
        if (msg->size > new_compress_buf_size) {
            OG_LOG_RUN_ERR("[Log Fetcher] invalid compressed message size %u received, max size is %u",
                (uint32)msg->size, new_compress_buf_size);
            return OG_ERROR;
        }

        if (cm_aligned_realloc((int64)new_compress_buf_size, "lftc client compress buffer",
            &task->cmp_ctx.compress_buf) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[Log Fetcher] failed to alloc lftc client compress buffer with size %u",
                new_compress_buf_size);
            return OG_ERROR;
        }
        task->cmp_ctx.buf_size = new_compress_buf_size;
    }

    return OG_SUCCESS;
}
/*
 * Receive the payload announced by `msg` (plain or compressed), write the resulting
 * bytes from task->msg_buf to the task file at task->offset, advance the offset and
 * persist it in the file head.
 * return
 * - OG_SUCCESS
 * - OG_ERROR
 */
static status_t lftc_flush_arch_data(lftc_clt_task_t *task, rep_msg_header_t *msg)
{
    int32 payload_len;

    if (lftc_adjust_buf_size(task, msg) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (msg->type != REP_ARCH_DATA) {
        /* compressed message: payload_len receives the decompressed length */
        if (lftc_decompress_receive_data(task, msg, (uint32 *)&payload_len) != OG_SUCCESS) {
            return OG_ERROR;
        }
    } else {
        if (cs_read_stream(&task->pipe, task->msg_buf.aligned_buf, OG_INVALID_ID32,
            msg->size, &payload_len) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[Log Fetcher] failed to receive message from primary");
            return OG_ERROR;
        }
        if (payload_len != msg->size) {
            OG_LOG_RUN_ERR("[Log Fetcher] invalid data_size %u received, expected size is %u",
                (uint32)payload_len, msg->size);
            return OG_ERROR;
        }
    }

    if (cm_seek_file(task->handle, task->offset, SEEK_SET) != (int64)task->offset) {
        OG_LOG_RUN_ERR("[Log Fetcher] failed to seek file, offset:%llu, origin:%d", task->offset, SEEK_SET);
        return OG_ERROR;
    }

    if (cm_write_file(task->handle, task->msg_buf.aligned_buf, payload_len) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Fetcher] failed to write file");
        return OG_ERROR;
    }
    task->offset += payload_len;

    return lftc_flush_file_writepos(task, OG_FALSE);
}
/*
 * Receive the archive log file head from the primary.
 * - REP_ARCH_LOST: flag arch_lost in the client context and fail.
 * - REP_ARCH_HEAD: read the log file head into task->log_head and remember its
 *   write_pos in task->origin_size. Unless the task already has data beyond the
 *   head, the head is written to the start of the task file with write_pos set to
 *   sizeof(log_file_head_t), and task->offset is raised to at least that size.
 * - any other type: fail.
 * return
 * - OG_SUCCESS
 * - OG_ERROR
 */
static status_t lftc_recv_file_head(lftc_clt_task_t *task)
{
    cs_pipe_t *pipe = &task->pipe;
    rep_msg_header_t msg;
    int32 recv_size;

    if (cs_read_stream(pipe, (char *)&msg, OG_INVALID_ID32, sizeof(rep_msg_header_t), &recv_size) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Fetcher] failed to receive message from primary");
        return OG_ERROR;
    }

    if (recv_size != sizeof(rep_msg_header_t)) {
        OG_LOG_RUN_ERR("[Log Fetcher] invalid recv_size %u received, expected size is %u",
            (uint32)recv_size, (uint32)sizeof(rep_msg_header_t));
        return OG_ERROR;
    }

    if (msg.type == REP_ARCH_LOST) {
        task->session->kernel->lftc_client_ctx.arch_lost = OG_TRUE;
        OG_LOG_RUN_ERR("[Log Fetcher] archive log [%u-%u] does not exist on primary, need repair",
            task->rst_id, task->asn);
        return OG_ERROR;
    }

    /* Only a head message may be followed by a log file head payload. */
    if (msg.type != REP_ARCH_HEAD) {
        OG_LOG_RUN_ERR("[Log Fetcher] received unexpected type message from primary");
        return OG_ERROR;
    }

    if (cs_read_stream(pipe, (char *)&task->log_head, OG_INVALID_ID32,
        sizeof(log_file_head_t), &recv_size) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Fetcher] failed to receive message from primary");
        return OG_ERROR;
    }

    if (recv_size != sizeof(log_file_head_t)) {
        OG_LOG_RUN_ERR("[Log Fetcher] invalid recv_size %u received, expected size is %u",
            (uint32)recv_size, (uint32)sizeof(log_file_head_t));
        return OG_ERROR;
    }

    task->origin_size = task->log_head.write_pos;

    /* Resuming past the head: the head already on disk is kept. */
    if (task->offset > sizeof(log_file_head_t)) {
        return OG_SUCCESS;
    }

    task->log_head.write_pos = sizeof(log_file_head_t);
    if (cm_seek_file(task->handle, 0, SEEK_SET) != 0) {
        OG_LOG_RUN_ERR("[Log Fetcher] failed to seek file, offset:%u, origin:%d", 0, SEEK_SET);
        return OG_ERROR;
    }

    if (cm_write_file(task->handle, (void *)&task->log_head, sizeof(log_file_head_t)) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Fetcher] failed to write file");
        return OG_ERROR;
    }

    if (task->offset < sizeof(log_file_head_t)) {
        task->offset = sizeof(log_file_head_t);
    }

    return OG_SUCCESS;
}
/*
 * Receive data messages from the primary and flush them to the task file until a
 * REP_ARCH_TAIL message arrives, then read the final offset sent with the tail, check
 * it against the bytes written and store the original file size in the file head.
 * return
 * - OG_SUCCESS
 * - OG_ERROR on I/O failure, unexpected message, offset mismatch or task cancel
 */
static status_t lftc_recv_file_data(lftc_clt_task_t *task)
{
    cs_pipe_t *conn = &task->pipe;
    rep_msg_header_t header;
    int32 bytes_read;
    uint64 tail_offset = 0;
    bool32 expected_type;

    while (!task->canceled) {
        if (cs_read_stream(conn, (char *)&header, OG_INVALID_ID32, sizeof(rep_msg_header_t),
            &bytes_read) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[Log Fetcher] failed to receive message from primary");
            return OG_ERROR;
        }

        if (bytes_read != sizeof(rep_msg_header_t)) {
            OG_LOG_RUN_ERR("[Log Fetcher] invalid recv_size %u received, expected size is %u",
                (uint32)bytes_read, (uint32)sizeof(rep_msg_header_t));
            return OG_ERROR;
        }

        expected_type = (header.type == REP_ARCH_DATA || header.type == REP_ARCH_TAIL ||
            header.type == REP_LFTC_ZSTD_DATA || header.type == REP_LFTC_LZ4_DATA);
        if (!expected_type) {
            OG_LOG_RUN_ERR("[Log Fetcher] received unexpected type message from primary");
            return OG_ERROR;
        }

        if (header.type == REP_ARCH_TAIL) {
            OG_LOG_RUN_INF("[Log Fetcher] Tail received");
            break;
        }

        if (lftc_flush_arch_data(task, &header) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[Log Fetcher] Failed to flush arch data");
            return OG_ERROR;
        }
    }

    /* The cancel flag is checked again after the loop, whichever way it ended. */
    if (task->canceled) {
        OG_LOG_RUN_ERR("[Log Fetcher] fetch task has been canceled");
        return OG_ERROR;
    }

    /* The tail payload is the sender's final offset. */
    if (cs_read_stream(conn, (char *)&tail_offset, OG_INVALID_ID32, sizeof(uint64), &bytes_read) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Fetcher] failed to receive message from primary");
        return OG_ERROR;
    }

    if (bytes_read != sizeof(uint64) || tail_offset != task->offset) {
        OG_LOG_RUN_ERR("[Log Fetcher] invalid recv_size %u received, expected size is %u, "
            "write position %llu does not match bytes recieved %llu.",
            (uint32)bytes_read, (uint32)sizeof(uint64), task->offset, tail_offset);
        return OG_ERROR;
    }

    if (lftc_flush_file_writepos(task, OG_TRUE) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Fetcher] Failed to flush write pos");
        return OG_ERROR;
    }

    return OG_SUCCESS;
}
/*
 * Receive one archive log file from the primary: first the head, then the data.
 * return
 * - OG_SUCCESS
 * - OG_ERROR
 */
static status_t lftc_recv_file(lftc_clt_task_t *task)
{
    status_t ret = lftc_recv_file_head(task);
    if (ret != OG_SUCCESS) {
        return OG_ERROR;
    }
    OG_LOG_RUN_INF("[Log Fetcher] Head received");

    ret = lftc_recv_file_data(task);
    if (ret != OG_SUCCESS) {
        return OG_ERROR;
    }
    OG_LOG_RUN_INF("[Log Fetcher] Data received");

    return OG_SUCCESS;
}
/*
 * Allocate a log fetcher server context with malloc. The memory is not initialized here.
 * @param[out] lftc_srv_ctx - receives the new context; left untouched on failure
 * return
 * - OG_SUCCESS
 * - OG_ERROR
 */
status_t lftc_srv_ctx_alloc(lftc_srv_ctx_t **lftc_srv_ctx)
{
    lftc_srv_ctx_t *new_ctx = (lftc_srv_ctx_t *)malloc(sizeof(lftc_srv_ctx_t));

    if (new_ctx != NULL) {
        *lftc_srv_ctx = new_ctx;
        return OG_SUCCESS;
    }

    OG_LOG_RUN_ERR("[Log Fetcher] failed to alloc server context");
    return OG_ERROR;
}
/*
 * Fetch one archive log file for a task: open the temp file, connect to the primary,
 * send the request, receive the file, rename the temp file to the final name and
 * record the archive info.
 * return
 * - OG_SUCCESS
 * - OG_ERROR at the first failing step
 */
static status_t lftc_fetch_archive_logfile(lftc_clt_task_t *task, const char *tmp_file)
{
    status_t ret;

    ret = lftc_clt_open_tmp(task, tmp_file);
    if (ret != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Fetcher] failed to open temp log file");
        return OG_ERROR;
    }

    ret = lftc_check_connect(task);
    if (ret != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Fetcher] failed to check connect");
        return OG_ERROR;
    }

    ret = lftc_clt_send_request(task);
    if (ret != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Fetcher] failed to send requset");
        return OG_ERROR;
    }

    ret = lftc_recv_file(task);
    if (ret != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Fetcher] failed to receive log file");
        return OG_ERROR;
    }

    /* The file becomes visible under its final name only after a complete transfer. */
    ret = cm_rename_file_durably(tmp_file, task->file_name);
    if (ret != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Fetcher] failed to rename temp log file to %s", task->file_name);
        return OG_ERROR;
    }

    ret = arch_try_record_archinfo(task->session, ARCH_DEFAULT_DEST, task->file_name, &task->log_head);
    if (ret != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Fetcher] failed to record archive log file %s with [%u-%u]",
            task->file_name, task->log_head.rst_id, task->log_head.asn);
        return OG_ERROR;
    }

    return OG_SUCCESS;
}
/*
 * Thread entry of a log fetcher client task: fetch the archive log into
 * "<file_name>.tmp" and clean the task up afterwards, whatever the outcome.
 */
static void lftc_clt_proc(thread_t *thread)
{
    lftc_clt_task_t *task = (lftc_clt_task_t *)thread->argument;
    char tmp_name[OG_FILE_NAME_BUFFER_SIZE + 4];
    errno_t rc;
    status_t fetched;

    rc = memset_sp(tmp_name, OG_FILE_NAME_BUFFER_SIZE + 4, 0, OG_FILE_NAME_BUFFER_SIZE + 4);
    knl_securec_check(rc);

    rc = snprintf_s(tmp_name, sizeof(tmp_name), sizeof(tmp_name) - 1, "%s.tmp", task->file_name);
    knl_securec_check_ss(rc);

    fetched = lftc_fetch_archive_logfile(task, tmp_name);
    if (fetched == OG_SUCCESS) {
        OG_LOG_RUN_INF("[Log Fetcher] recvfile process fetched archive log file_name/offset [%s/%llu]",
            task->file_name, task->offset);
    }

    lftc_clt_clean_task(task);
}
/*
 * Start the worker thread of the task in slot (task_id % LFTC_MAX_TASK).
 * The task file handle is reset to INVALID_FILE_HANDLE before the thread is created.
 * return
 * - OG_SUCCESS
 * - OG_ERROR
 */
static status_t lftc_clt_task_start(knl_session_t *session, lftc_clt_ctx_t *ogx, uint32 task_id)
{
    lftc_clt_task_t *slot = &ogx->tasks[task_id % LFTC_MAX_TASK];

    slot->handle = INVALID_FILE_HANDLE;

    if (cm_create_thread(lftc_clt_proc, 0, slot, &slot->thread) == OG_SUCCESS) {
        OG_LOG_RUN_INF("[Log Fetcher] Start task for asn %u.", slot->asn);
        return OG_SUCCESS;
    }

    OG_LOG_RUN_ERR("[Log Fetcher] failed to create log fetcher client thread");
    return OG_ERROR;
}
/*
 * Re-arm an idle task slot for a new fetch: advance its id by LFTC_MAX_TASK (so the
 * slot index id % LFTC_MAX_TASK is unchanged), mark it running and store the new
 * target (rst_id, asn, archive file name).
 */
static inline void lftc_clt_task_reset(knl_session_t *session, lftc_clt_task_t *task, uint32 rst_id,
    uint32 asn, const char *arch_name)
{
    errno_t rc;

    task->id += LFTC_MAX_TASK;
    task->session = session;
    task->rst_id = rst_id;
    task->asn = asn;
    task->is_running = OG_TRUE;
    task->canceled = OG_FALSE;

    rc = strcpy_sp(task->file_name, OG_FILE_NAME_BUFFER_SIZE, arch_name);
    knl_securec_check(rc);

    task->tmp_file_name[0] = 0;
    task->origin_size = 0;
}
/*
 * Allocate the buffers of the task in slot (task_id % LFTC_MAX_TASK): a message buffer
 * of LOG_LGWR_BUF_SIZE bytes and a compress buffer sized to the larger of the zstd and
 * lz4 worst-case bounds for that size.
 * return
 * - OG_SUCCESS
 * - OG_ERROR, with the message buffer released again if the second allocation failed
 */
static status_t lftc_clt_task_init(knl_session_t *session, lftc_clt_ctx_t *ogx, uint32 task_id)
{
    lftc_clt_task_t *slot = &ogx->tasks[task_id % LFTC_MAX_TASK];
    int64 msg_size = LOG_LGWR_BUF_SIZE(session);
    uint32 zstd_bound = (uint32)ZSTD_compressBound((size_t)msg_size);
    uint32 lz4_bound = (uint32)LZ4_compressBound((int32)msg_size);
    char name[16];
    errno_t rc;

    rc = snprintf_s(name, sizeof(name), sizeof(name) - 1, "LFTC CLIENT %u", task_id);
    knl_securec_check_ss(rc);

    if (cm_aligned_malloc(msg_size, name, &slot->msg_buf) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Fetcher] failed to alloc buffer with size %lld", msg_size);
        return OG_ERROR;
    }
    slot->msg_buf_size = (uint32)msg_size;

    if (zstd_bound > lz4_bound) {
        slot->cmp_ctx.buf_size = zstd_bound;
    } else {
        slot->cmp_ctx.buf_size = lz4_bound;
    }
    slot->cmp_ctx.data_size = 0;

    if (cm_aligned_malloc((int64)slot->cmp_ctx.buf_size, name, &slot->cmp_ctx.compress_buf) == OG_SUCCESS) {
        return OG_SUCCESS;
    }

    OG_LOG_RUN_ERR("[Log Fetcher] failed to alloc compress buffer with size %u", slot->cmp_ctx.buf_size);
    cm_aligned_free(&slot->msg_buf);
    return OG_ERROR;
}
/*
 * Scan the task slots below the high water mark, in order, and stop at the first slot
 * that is either idle or already fetching the same (rst_id, asn):
 * - idle slot: re-arm it for the new fetch and return its id in *task_id
 * - running slot with the same target: return its id and set *task_exist
 * Both outputs are left untouched when no slot qualifies.
 */
static void lftc_clt_reuse_task(knl_session_t *session, uint32 rst_id, uint32 asn, const char *arch_name,
    uint32 *task_id, bool32 *task_exist)
{
    lftc_clt_ctx_t *ogx = &session->kernel->lftc_client_ctx;

    for (uint32 slot = 0; slot < ogx->hwm; slot++) {
        lftc_clt_task_t *cur = &ogx->tasks[slot];
        bool32 idle;
        bool32 same_target;

        cm_spin_lock(&cur->lock, NULL);
        idle = !cur->is_running;
        same_target = !idle && (cur->rst_id == rst_id && cur->asn == asn);
        if (idle) {
            lftc_clt_task_reset(session, cur, rst_id, asn, arch_name);
        }
        if (idle || same_target) {
            *task_id = cur->id;
        }
        cm_spin_unlock(&cur->lock);

        if (same_target) {
            *task_exist = OG_TRUE;
        }
        if (idle || same_target) {
            return;
        }
    }
}
/*
 * Create (or find) a log fetcher client task for archive log (rst_id, asn).
 * An idle slot is reused, a running task with the same target is shared, otherwise a
 * new slot is taken at the high water mark. Unless a running task is shared, the
 * task thread is started.
 * @param[in]  session   - caller session; the task itself uses the LFTC client session
 * @param[in]  rst_id    - reset id of the wanted archive log
 * @param[in]  asn       - sequence number of the wanted archive log
 * @param[in]  arch_name - final archive file name
 * @param[out] handle    - filled with task id, rst_id and asn when a task is available
 * return
 * - OG_SUCCESS; also returned, with `handle` untouched, when all LFTC_MAX_TASK slots are busy
 * - OG_ERROR
 */
status_t lftc_clt_create_task(knl_session_t *session, uint32 rst_id, uint32 asn,
    const char *arch_name, lftc_task_handle_t *handle)
{
    lftc_clt_ctx_t *ogx = &session->kernel->lftc_client_ctx;
    lftc_clt_task_t *task = NULL;
    uint32 task_id = OG_INVALID_ID32;
    bool32 task_exist = OG_FALSE;
    knl_session_t *lftc_session = session->kernel->sessions[SESSION_ID_LFTC_CLIENT];

    cm_spin_lock(&ogx->lock, NULL);
    lftc_clt_reuse_task(lftc_session, rst_id, asn, arch_name, &task_id, &task_exist);

    if (task_id == OG_INVALID_ID32) {
        if (ogx->hwm == LFTC_MAX_TASK) {
            OG_LOG_RUN_WAR("[Log Fetcher] failed to create task, %u tasks are running.", LFTC_MAX_TASK);
            cm_spin_unlock(&ogx->lock);
            return OG_SUCCESS;
        }

        task = ogx->tasks + ogx->hwm;
        errno_t err = memset_sp((void *)task, sizeof(lftc_clt_task_t), 0, sizeof(lftc_clt_task_t));
        knl_securec_check(err);
        /* the zero left by the memset is not the invalid-handle value used elsewhere in this file */
        task->handle = INVALID_FILE_HANDLE;
        task->is_running = OG_TRUE;
        task->canceled = OG_FALSE;
        task->rst_id = rst_id;
        task->asn = asn;
        task->id = ogx->hwm++;
        task->session = lftc_session;
        err = strcpy_sp(task->file_name, OG_FILE_NAME_BUFFER_SIZE, arch_name);
        knl_securec_check(err);
        task->tmp_file_name[0] = 0;
        task->origin_size = 0;
        task_id = task->id;

        if (lftc_clt_task_init(lftc_session, ogx, task_id) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[Log Fetcher] failed to init task");
            /*
             * The slot owns no thread, file, connection or buffer yet, so there is nothing
             * for lftc_clt_clean_task to release. Give the slot back by lowering the high
             * water mark, so that a slot without buffers can never be picked for reuse.
             */
            task->is_running = OG_FALSE;
            ogx->hwm--;
            cm_spin_unlock(&ogx->lock);
            return OG_ERROR;
        }
    }

    handle->task_id = task_id;
    handle->rst_id = rst_id;
    handle->asn = asn;

    task = ogx->tasks + (task_id % LFTC_MAX_TASK);
    if (!task_exist) {
        if (lftc_clt_task_start(session, ogx, task_id) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[Log Fetcher] failed to start task");
            lftc_clt_clean_task(task);
            cm_spin_unlock(&ogx->lock);
            return OG_ERROR;
        }
    }

    cm_spin_unlock(&ogx->lock);
    return OG_SUCCESS;
}
/*
 * Tell whether the task referred to by `handle` is still running. A slot whose id no
 * longer equals handle->task_id counts as not running.
 * @param[out] is_done - written only when the task is not running: whether the archive
 *                       file for (handle->rst_id, handle->asn) exists
 * return the running state
 */
bool32 lftc_clt_task_running(knl_session_t *session, lftc_task_handle_t *handle, bool32 *is_done)
{
    lftc_clt_ctx_t *ogx = &session->kernel->lftc_client_ctx;
    lftc_clt_task_t *slot = &ogx->tasks[handle->task_id % LFTC_MAX_TASK];
    bool32 running;

    cm_spin_lock(&slot->lock, NULL);
    running = (slot->id == handle->task_id) ? slot->is_running : OG_FALSE;
    cm_spin_unlock(&slot->lock);

    if (running) {
        return running;
    }

    /* Not running: the fetch is done if the final archive file is on disk. */
    char *file_name = (char *)cm_push(session->stack, OG_FILE_NAME_BUFFER_SIZE);
    arch_set_archive_log_name(session, handle->rst_id, handle->asn, ARCH_DEFAULT_DEST, file_name,
        OG_FILE_NAME_BUFFER_SIZE, session->kernel->id);
    *is_done = cm_file_exist((const char *)file_name);
    cm_pop(session->stack);

    return running;
}