Bbowenliu1030 CM patch
7ab5b2eb创建于 2022年11月7日历史提交
/*
 * Copyright (c) 2022 Huawei Technologies Co.,Ltd.
 *
 * CM is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * -------------------------------------------------------------------------
 *
 * cm_msg_buf_pool.cpp
 *
 *
 * IDENTIFICATION
 *    src/cm_common/cm_msg_buf_pool.cpp
 *
 * -------------------------------------------------------------------------
 */

#include <pthread.h>
#include "cm/cm_elog.h"
#include "cm_c.h"
#include "cm_msg_buf_pool.h"

typedef struct CmMsgBufSt {
    uint32 size;
    CmMsgBufSt *next;
    char data[0];
} CmMsgBuf;

typedef struct CmMsgBufTypeSt {
    uint32 bufSize;
    uint32 freeCount;
    uint32 allocCount;
    CmMsgBuf *freeList;
    CmMsgBuf *allocList;
    CmMsgBufTypeSt *next;
} CmMsgBufType;

typedef struct CmMsgPoolSt {
    uint32 curBufCount;
    uint32 maxBufCount;
    uint32 curPoolSize;
    uint32 maxPoolSize;
    CmMsgBufType *list;
    CmMsgBuf *tmpList;
    pthread_rwlock_t rwlock;
} CmMsgPool;

#ifndef ENABLE_UT
#define static
#endif

static CmMsgPool g_msgPool;

static void PrintMsgPoolInfoCore(int logLevel);

void MsgPoolInit(uint32 maxPoolSize, uint32 maxBufCount)
{
    static volatile bool isInit = false;
    if (isInit) {
        write_runlog(LOG, "[MsgPool] msg pool has init, can't do init again.\n");
        return;
    }
    g_msgPool.list = NULL;
    g_msgPool.tmpList = NULL;
    g_msgPool.curPoolSize = 0;
    g_msgPool.curBufCount = 0;
    g_msgPool.maxBufCount = maxBufCount;
    g_msgPool.maxPoolSize = maxPoolSize;
    (void)pthread_rwlock_init(&g_msgPool.rwlock, NULL);
    isInit = true;
    write_runlog(LOG, "[MsgPool] init msg pool success, maxPoolSize=%u, maxBufCount=%u.\n", maxPoolSize, maxBufCount);
}

static inline void CleanFreeBuf(CmMsgBuf *freePtr, uint32 bufSize)
{
    while (freePtr != NULL) {
        char *tmpPtr = (char*)freePtr;
        freePtr = freePtr->next;
        free(tmpPtr);
        --g_msgPool.curBufCount;
        g_msgPool.curPoolSize -= bufSize;
    }
}

static void CleanBufType(CmMsgBufType *&typePrePtr, CmMsgBufType *&typePtr)
{
    CmMsgBufType *tmpPtr = typePtr;
    if (typePtr == typePrePtr) {
        g_msgPool.list = typePtr->next;
        typePrePtr = typePtr->next;
    } else {
        typePrePtr->next = typePtr->next;
    }
    typePtr = typePtr->next;
    free(tmpPtr);
}

static void DestroyFreeBufCore(uint32 leaveBufCount)
{
    CmMsgBufType *typePtr = g_msgPool.list;
    CmMsgBufType *typePrePtr = typePtr;
    while (typePtr != NULL) {
        if (typePtr->freeCount <= leaveBufCount) {
            typePrePtr = typePtr;
            typePtr = typePtr->next;
            continue;
        }
        typePtr->freeCount = leaveBufCount;
        uint32 count = leaveBufCount;
        CmMsgBuf *freePtr = typePtr->freeList;
        CmMsgBuf *prePtr = freePtr;
        while (freePtr != NULL) {
            if (count > 0) {
                --count;
                prePtr = freePtr;
                freePtr = freePtr->next;
                continue;
            }
            if (prePtr == freePtr) {
                typePtr->freeList = NULL;
            } else {
                prePtr->next = NULL;
            }
            CleanFreeBuf(freePtr, typePtr->bufSize);
            break;
        }
        if (typePtr->freeList == NULL && typePtr->allocList == NULL) {
            CleanBufType(typePrePtr, typePtr);
            continue;
        }
        typePrePtr = typePtr;
        typePtr = typePtr->next;
    }
}

void MsgPoolClean(uint32 leaveBufCount)
{
    (void)pthread_rwlock_wrlock(&g_msgPool.rwlock);
    DestroyFreeBufCore(leaveBufCount);
    (void)pthread_rwlock_unlock(&g_msgPool.rwlock);
}

static bool IsMsgPoolExtraSpace(uint32 bufSize)
{
    return ((g_msgPool.maxBufCount != 0) && ((g_msgPool.curBufCount + 1) > g_msgPool.maxBufCount)) ||
           ((g_msgPool.curPoolSize + bufSize) > g_msgPool.maxPoolSize);
}

static bool CanCreateNewBuf(uint32 bufSize)
{
    if (IsMsgPoolExtraSpace(bufSize)) {
        write_runlog(LOG, "[MsgPool] pool is full, try to destroy free buf.\n");
        DestroyFreeBufCore(0);
    } else {
        return true;
    }

    if (IsMsgPoolExtraSpace(bufSize)) {
        write_runlog(ERROR, "[MsgPool] newBufSize=(%u), msg pool have no space.\n", bufSize);
        return false;
    }

    return true;
}

static CmMsgBuf *CreateNewBuf(uint32 bufSize)
{
    void *tmpPtr = malloc(sizeof(CmMsgBuf) + sizeof(char) * bufSize);
    if (tmpPtr == NULL) {
        write_runlog(ERROR, "[MsgPool] out of memory, CreateNewBuf.\n");
        return NULL;
    }

    CmMsgBuf *newBuf = (CmMsgBuf*)tmpPtr;
    newBuf->size = bufSize;
    newBuf->next = NULL;

    return newBuf;
}

static char *CreateNewTmpBuf(uint32 bufSize)
{
    CmMsgBuf *newTmpBuf = CreateNewBuf(bufSize);
    if (newTmpBuf == NULL) {
        write_runlog(ERROR, "[MsgPool] create tmp buf failed.\n");
        return NULL;
    }

    if (g_msgPool.tmpList != NULL) {
        newTmpBuf->next = g_msgPool.tmpList;
    }
    g_msgPool.tmpList = newTmpBuf;

    write_runlog(DEBUG1, "[MsgPool] create tmp buf (%u) success.\n", bufSize);

    return newTmpBuf->data;
}

static CmMsgBufType *CreateNewBufType(uint32 bufSize)
{
    CmMsgBufType *typePtr = (CmMsgBufType*)malloc(sizeof(CmMsgBufType));
    if (typePtr == NULL) {
        write_runlog(ERROR, "[MsgPool] out of memory, CreateNewBufType.\n");
        return NULL;
    }
    typePtr->bufSize = bufSize;
    typePtr->freeCount = 0;
    typePtr->allocCount = 0;
    typePtr->freeList = NULL;
    typePtr->allocList = NULL;
    typePtr->next = NULL;

    return typePtr;
}

static char *GetBufFromTypeList(CmMsgBufType *typePtr, uint32 bufSize)
{
    CmMsgBuf *freePtr = typePtr->freeList;
    if (freePtr != NULL) {
        typePtr->freeList = freePtr->next;
        freePtr->next = typePtr->allocList;
        typePtr->allocList = freePtr;
        --typePtr->freeCount;
        ++typePtr->allocCount;
        return freePtr->data;
    }
    write_runlog(DEBUG1, "[MsgPool] have no free buf(size=%u), need create new buf.\n", bufSize);

    if (!CanCreateNewBuf(bufSize)) {
        write_runlog(ERROR, "[MsgPool] can't find free buf, and can't create new buf.\n");
        return NULL;
    }

    CmMsgBuf *newBuf = CreateNewBuf(bufSize);
    if (newBuf == NULL) {
        write_runlog(ERROR, "[MsgPool] GetBufFromTypeList, can't create new buf.\n");
        return NULL;
    }
    newBuf->next = typePtr->allocList;
    typePtr->allocList = newBuf;
    ++typePtr->allocCount;

    ++g_msgPool.curBufCount;
    g_msgPool.curPoolSize += bufSize;

    return newBuf->data;
}

static CmMsgBufType *GetBufTypeFromPool(uint32 bufSize)
{
    CmMsgBufType *typePtr = g_msgPool.list;
    CmMsgBufType *perPtr = typePtr;
    while (typePtr != NULL) {
        if (typePtr->bufSize == bufSize) {
            return typePtr;
        }
        if (typePtr->bufSize > bufSize) {
            break;
        }
        perPtr = typePtr;
        typePtr = typePtr->next;
    }
    write_runlog(DEBUG1, "[MsgPool] can't find msg type, need create new msg type, bufSize=%u.\n", bufSize);

    if (!CanCreateNewBuf(bufSize)) {
        write_runlog(ERROR, "[MsgPool] can't find suit type list, and can't create new type list.\n");
        return NULL;
    }

    CmMsgBufType *newType = CreateNewBufType(bufSize);
    if (newType == NULL) {
        write_runlog(ERROR, "[MsgPool] GetBufTypeFromPool, can't create new buf type.\n");
        return NULL;
    }
    if (perPtr == typePtr) {
        g_msgPool.list = newType;
    } else {
        perPtr->next = newType;
    }
    newType->next = typePtr;

    return newType;
}

void *AllocBufFromMsgPool(uint32 msgSize)
{
#ifndef ENABLE_UT
    char *tmp = (char *)malloc(msgSize);
    if (tmp == NULL) {
        return NULL;
    }
    errno_t rc = memset_s(tmp, msgSize, 0, msgSize);
    securec_check_errno(rc, (void)rc);
    return tmp;
#else
    if (msgSize > g_msgPool.maxPoolSize) {
        write_runlog(ERROR, "[MsgPool] alloc buf size (%u) is big, max size (%u).\n", msgSize, g_msgPool.maxPoolSize);
        return NULL;
    }
    if (msgSize == 0) {
        write_runlog(ERROR, "[MsgPool] alloc invalid buf size (%u).\n", msgSize);
        return NULL;
    }

    (void)pthread_rwlock_wrlock(&g_msgPool.rwlock);
    CmMsgBufType *typePtr = GetBufTypeFromPool(msgSize);
    if (typePtr == NULL) {
        write_runlog(ERROR, "[MsgPool] can't get buf type from pool, msgSize=%u.\n", msgSize);
        PrintMsgPoolInfoCore(LOG);
        char *tmpBuf = CreateNewTmpBuf(msgSize);
        (void)pthread_rwlock_unlock(&g_msgPool.rwlock);
        return tmpBuf;
    }
    char *bufPtr = GetBufFromTypeList(typePtr, msgSize);
    if (bufPtr == NULL) {
        write_runlog(ERROR, "[MsgPool] can't get free buf from type list, msgSize=%u.\n", msgSize);
        PrintMsgPoolInfoCore(LOG);
        char *tmpBuf = CreateNewTmpBuf(msgSize);
        (void)pthread_rwlock_unlock(&g_msgPool.rwlock);
        return tmpBuf;
    }
    (void)pthread_rwlock_unlock(&g_msgPool.rwlock);

    write_runlog(DEBUG5, "[MsgPool] AllocBufFromMsgPool, alloc buf success.\n");

    return (void*)bufPtr;
#endif
}

status_t FreeBufFromAllocList(CmMsgBufType *typePtr, const char *bufPtr)
{
    CmMsgBuf *allocPtr = typePtr->allocList;
    CmMsgBuf *perPtr = allocPtr;
    while (allocPtr != NULL) {
        if (allocPtr->data != bufPtr) {
            perPtr = allocPtr;
            allocPtr = allocPtr->next;
            continue;
        }
        errno_t rc = memset_s(allocPtr->data, typePtr->bufSize, 0, typePtr->bufSize);
        securec_check_errno(rc, (void)rc);
        if (perPtr == allocPtr) {
            typePtr->allocList = allocPtr->next;
        } else {
            perPtr->next = allocPtr->next;
        }
        allocPtr->next = typePtr->freeList;
        typePtr->freeList = allocPtr;
        --typePtr->allocCount;
        ++typePtr->freeCount;
        return CM_SUCCESS;
    }
    return CM_ERROR;
}

bool IsBufInFreeList(CmMsgBuf *freePtr, const char *buf)
{
    while (freePtr != NULL) {
        if (freePtr->data == buf) {
            return true;
        }
        freePtr = freePtr->next;
    }
    return false;
}

static status_t FreeBufFromTmpList(const char *buf)
{
    CmMsgBuf *tmpBufPtr = g_msgPool.tmpList;
    CmMsgBuf *prePtr = g_msgPool.tmpList;
    while (tmpBufPtr != NULL) {
        if (tmpBufPtr->data != buf) {
            prePtr = tmpBufPtr;
            tmpBufPtr = tmpBufPtr->next;
            continue;
        }
        if (prePtr == tmpBufPtr) {
            g_msgPool.tmpList = tmpBufPtr->next;
        } else {
            prePtr->next = tmpBufPtr->next;
        }
        free(tmpBufPtr);
        return CM_SUCCESS;
    }
    return CM_ERROR;
}

void FreeBufFromMsgPool(void *buf)
{
    if (buf == NULL) {
        write_runlog(ERROR, "[MsgPool] buf is NULL, can't do free.\n");
        return;
    }
#ifndef ENABLE_UT
    FREE_AND_RESET(buf);
    return;
#else
    CmMsgBuf *bufPtr = (CmMsgBuf*)((char*)buf - sizeof(CmMsgBuf));
    (void)pthread_rwlock_wrlock(&g_msgPool.rwlock);
    CmMsgBufType *typePtr = g_msgPool.list;
    while (typePtr != NULL) {
        if (typePtr->bufSize < bufPtr->size) {
            typePtr = typePtr->next;
            continue;
        }
        if (typePtr->bufSize > bufPtr->size) {
            break;
        }
        if (FreeBufFromAllocList(typePtr, bufPtr->data) == CM_SUCCESS) {
            (void)pthread_rwlock_unlock(&g_msgPool.rwlock);
            write_runlog(DEBUG5, "[MsgPool] buf free success.\n");
            return;
        }
        if (IsBufInFreeList(typePtr->freeList, bufPtr->data)) {
            (void)pthread_rwlock_unlock(&g_msgPool.rwlock);
            write_runlog(FATAL, "[MsgPool] buf has been free, can't do free again.\n");
            Assert(false);
            exit(1);
        }
        break;
    }
    if (FreeBufFromTmpList((char*)buf) == CM_SUCCESS) {
        (void)pthread_rwlock_unlock(&g_msgPool.rwlock);
        write_runlog(DEBUG1, "[MsgPool] tmp buf free success.\n");
        return;
    }
    (void)pthread_rwlock_unlock(&g_msgPool.rwlock);
    write_runlog(FATAL, "[MsgPool] buf not find buf in msg pool.\n");
    Assert(false);
    exit(1);
#endif
}

static void PrintMsgPoolInfoCore(int logLevel)
{
    write_runlog(logLevel, "[MsgPool] total info: curBufCount=%u, maxBufCount=%u, curPoolSize=%u, maxPoolSize=%u.\n",
        g_msgPool.curBufCount, g_msgPool.maxBufCount, g_msgPool.curPoolSize, g_msgPool.maxPoolSize);
    CmMsgBufType *typePtr = g_msgPool.list;
    while (typePtr != NULL) {
        write_runlog(logLevel, "[MsgPool] buf type info: bufSize=%u, freeCount=%u, allocCount=%u, toolCount=%u.\n",
            typePtr->bufSize, typePtr->freeCount, typePtr->allocCount, (typePtr->freeCount + typePtr->allocCount));
        typePtr = typePtr->next;
    }
}

void PrintMsgPoolInfo(int logLevel)
{
    (void)pthread_rwlock_rdlock(&g_msgPool.rwlock);
    PrintMsgPoolInfoCore(logLevel);
    (void)pthread_rwlock_unlock(&g_msgPool.rwlock);
}

void PrintMsgBufPoolUsage(int logLevel)
{
    uint32 freeCount = 0;
    uint32 allocCount = 0;
    uint32 freeSize = 0;
    uint32 allocSize = 0;
    (void)pthread_rwlock_rdlock(&g_msgPool.rwlock);
    CmMsgBufType *typePtr = g_msgPool.list;
    while (typePtr != NULL) {
        freeCount += typePtr->freeCount;
        allocCount += typePtr->allocCount;
        freeSize = (typePtr->freeCount * typePtr->bufSize);
        allocSize = (typePtr->allocCount * typePtr->bufSize);
        typePtr = typePtr->next;
    }
    (void)pthread_rwlock_unlock(&g_msgPool.rwlock);
    uint32 totalCount = freeCount + allocCount;
    uint32 totalSize = freeSize + allocSize;
    uint32 usageRate = 0;
    if (totalSize != 0) {
        usageRate = (allocSize * 100) / totalSize;
    }
    write_runlog(logLevel, "[MsgPool] usage: total_buf_count=%u, freeCount=%u, allocCount=%u; "
        "total_buf_size=%u, freeSize=%u, allocSize=%u, usage_rate=%u%%.\n",
        totalCount, freeCount, allocCount, totalSize, freeSize, allocSize, usageRate);
}

void GetTotalBufInfo(uint32 *freeCount, uint32 *allocCount, uint32 *typeCount)
{
    uint32 getFreeCount = 0;
    uint32 getAllocCount = 0;
    uint32 getTypeCount = 0;
    (void)pthread_rwlock_rdlock(&g_msgPool.rwlock);
    CmMsgBufType *typePtr = g_msgPool.list;
    while (typePtr != NULL) {
        getFreeCount += typePtr->freeCount;
        getAllocCount += typePtr->allocCount;
        ++getTypeCount;
        typePtr = typePtr->next;
    }
    (void)pthread_rwlock_unlock(&g_msgPool.rwlock);
    if (freeCount != NULL) {
        *freeCount = getFreeCount;
    }
    if (allocCount != NULL) {
        *allocCount = getAllocCount;
    }
    if (typeCount != NULL) {
        *typeCount = getTypeCount;
    }
}

void GetTypeBufInfo(uint32 bufSize, uint32 *freeCount, uint32 *allocCount)
{
    (void)pthread_rwlock_rdlock(&g_msgPool.rwlock);
    CmMsgBufType *typePtr = g_msgPool.list;
    while (typePtr != NULL) {
        if (typePtr->bufSize == bufSize) {
            if (freeCount != NULL) {
                *freeCount = typePtr->freeCount;
            }
            if (allocCount != NULL) {
                *allocCount = typePtr->allocCount;
            }
            (void)pthread_rwlock_unlock(&g_msgPool.rwlock);
            return;
        }
        if (typePtr->bufSize > bufSize) {
            break;
        }
        typePtr = typePtr->next;
    }
    (void)pthread_rwlock_unlock(&g_msgPool.rwlock);
    write_runlog(LOG, "[MsgPool] buf size(%u) not found.\n", bufSize);
}

void GetTmpBufInfo(uint32 *tmpBufCount)
{
    uint32 getTmpBufCount = 0;
    (void)pthread_rwlock_rdlock(&g_msgPool.rwlock);
    CmMsgBuf *tmpPtr = g_msgPool.tmpList;
    while (tmpPtr != NULL) {
        ++getTmpBufCount;
        tmpPtr = tmpPtr->next;
    }
    (void)pthread_rwlock_unlock(&g_msgPool.rwlock);
    if (tmpBufCount != NULL) {
        *tmpBufCount = getTmpBufCount;
    }
}