* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* ss_aio.cpp
* aio implementation
*
* IDENTIFICATION
* src/gausskernel/ddes/adapter/ss_aio.cpp
*
* ---------------------------------------------------------------------------------------
*/
#include "ddes/dms/ss_aio.h"
#include "utils/elog.h"
#ifndef ENABLE_LITE_MODE
static void WaitDSSAioComplete(DSSAioCxt *aio_cxt, int index)
{
AioUtil *aio = &aio_cxt->aio[index];
errno_t ret;
struct timespec timeout = { 0, 200 };
int event_num = aio->iocount;
while (event_num > 0) {
ret = memset_s(aio->events, sizeof(struct io_event) * event_num,
0, sizeof(struct io_event) * event_num);
securec_check_c(ret, "\0", "\0");
int num = io_getevents(aio->handle, 1, event_num, aio->events, &timeout);
if (num < 0) {
if (errno == EINTR || num == -EINTR) {
continue;
}
ereport(WARNING, (errmsg("failed to getevent by aio, errno = %d, retCode = %d", errno, num)));
_exit(0);
}
aio_cxt->aiocb(aio->events, num);
event_num -= num;
}
aio->iocount = 0;
ret = memset_s(aio->iocbs_ptr, sizeof(struct iocb *) * DSS_AIO_BATCH_SIZE,
0, sizeof(struct iocb *) * DSS_AIO_BATCH_SIZE);
securec_check_c(ret, "\0", "\0");
ret = memset_s(aio->events, sizeof(struct io_event) * DSS_AIO_BATCH_SIZE,
0, sizeof(struct io_event) * DSS_AIO_BATCH_SIZE);
securec_check_c(ret, "\0", "\0");
}
#endif
void DSSAioFlush(DSSAioCxt *aio_cxt)
{
#ifndef ENABLE_LITE_MODE
bool need_wait = false;
AioUtil *aio = &aio_cxt->aio[aio_cxt->index];
if (aio->iocount > 0) {
if (io_submit(aio->handle, aio->iocount, aio->iocbs_ptr) != aio->iocount) {
ereport(WARNING, (errmsg("io_submit failed, errno = %d", errno)));
_exit(0);
}
need_wait = true;
}
aio = &aio_cxt->aio[1 - aio_cxt->index];
if (aio->iocount > 0) {
WaitDSSAioComplete(aio_cxt, 1 - aio_cxt->index);
}
if (need_wait) {
WaitDSSAioComplete(aio_cxt, aio_cxt->index);
}
#endif
}
void DSSAioAppendIOCB(DSSAioCxt *aio_cxt, struct iocb *iocb_ptr)
{
#ifndef ENABLE_LITE_MODE
AioUtil *aio = &aio_cxt->aio[aio_cxt->index];
aio->iocbs_ptr[aio->iocount] = iocb_ptr;
aio->iocount++;
if (aio->iocount >= DSS_AIO_BATCH_SIZE) {
if (io_submit(aio->handle, aio->iocount, aio->iocbs_ptr) != aio->iocount) {
ereport(PANIC, (errmsg("io_submit failed, errno = %d", errno)));
}
aio_cxt->index = 1 - aio_cxt->index;
aio = &aio_cxt->aio[aio_cxt->index];
if (aio->iocount > 0) {
WaitDSSAioComplete(aio_cxt, aio_cxt->index);
}
}
#endif
}
struct iocb* DSSAioGetIOCB(DSSAioCxt *aio_cxt)
{
AioUtil *aio = &aio_cxt->aio[aio_cxt->index];
return &aio->iocbs[aio->iocount];
}
int DSSAioGetIOCBIndex(DSSAioCxt *aio_cxt)
{
AioUtil *aio = &aio_cxt->aio[aio_cxt->index];
return (aio_cxt->index * DSS_AIO_BATCH_SIZE + aio->iocount);
}
void DSSAioInitialize(DSSAioCxt *aio_cxt, aio_callback callback)
{
#ifndef ENABLE_LITE_MODE
errno_t err = memset_s(aio_cxt, sizeof(DSSAioCxt), 0, sizeof(DSSAioCxt));
securec_check_ss(err, "\0", "\0");
if (io_setup(DSS_AIO_BATCH_SIZE, &aio_cxt->aio[0].handle) < 0) {
ereport(WARNING, (errmsg("io_setup failed for DSS AIO, errno=%d", errno)));
_exit(0);
}
if (io_setup(DSS_AIO_BATCH_SIZE, &aio_cxt->aio[1].handle) < 0) {
ereport(WARNING, (errmsg("io_setup failed for DSS AIO, errno=%d", errno)));
_exit(0);
}
aio_cxt->initialized = true;
aio_cxt->aiocb = callback;
aio_cxt->index = 0;
#endif
}
void DSSAioDestroy(DSSAioCxt *aio_cxt)
{
#ifndef ENABLE_LITE_MODE
if (aio_cxt->initialized) {
(void)io_destroy(aio_cxt->aio[0].handle);
(void)io_destroy(aio_cxt->aio[1].handle);
aio_cxt->initialized = false;
errno_t err = memset_s(aio_cxt, sizeof(DSSAioCxt), 0, sizeof(DSSAioCxt));
securec_check_ss(err, "\0", "\0");
}
#endif
}