* This file is part of the MindStudio project.
* Copyright (c) 2025 Huawei Technologies Co.,Ltd.
*
* MindStudio is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*/
#ifndef MS_SERVICE_PROFILER_MULTITHREADBUFFERMANAGER_H
#define MS_SERVICE_PROFILER_MULTITHREADBUFFERMANAGER_H
#include <functional>
#include <string>
#include <map>
#include <set>
#include <mutex>
#include <cmath>
#include <algorithm>
#include <memory>
#include "Log.h"
#include "DbBuffer.h"
namespace msServiceProfiler {
constexpr size_t MAX_POP_SIZE = 2000;
template <typename T>
class MultiThreadBufferManager {
public:
MultiThreadBufferManager(std::function<void(std::unique_ptr<T>)> popFunc, std::function<void()> batchEndFunc)
: popFunc_(popFunc), batchEndFunc_(batchEndFunc)
{
this->thread_ = std::thread(&MultiThreadBufferManager::DumpThread, this);
pPopMarkerBuffer = std::make_unique<std::unique_ptr<T>[]>(MAX_POP_SIZE);
};
~MultiThreadBufferManager()
{
PROF_LOGD("Multi Thread Buffer Manager free");
threadExitFlag_ = true;
if (this->thread_.joinable()) {
this->thread_.join();
}
PROF_LOGD("Multi Thread Buffer Manager free thread over");
std::lock_guard<std::mutex> lock(mtx_);
lifeEndFlag_ = true;
workingDbBuffers_.clear();
disableDbBuffers_.clear();
};
void DumpThread()
{
constexpr int SUITABLE_DUMP_SIZE = 1000;
constexpr int MAX_WAIT_US = 50000;
constexpr int MIN_WAIT_US = 50;
int waitUs = MIN_WAIT_US;
std::set<std::shared_ptr<DbBuffer<T>>> disableDbBuffers;
std::vector<std::shared_ptr<DbBuffer<T>>> workingDbBuffers;
while (!threadExitFlag_) {
std::this_thread::sleep_for(std::chrono::microseconds(waitUs));
{
std::lock_guard<std::mutex> lock(mtx_);
if (workingDbBuffers.size() != workingDbBuffers_.size() ||
disableDbBuffers.size() != disableDbBuffers_.size()) {
disableDbBuffers = disableDbBuffers_;
workingDbBuffers = workingDbBuffers_;
}
}
std::vector<DbBuffer<T> *> freeDbBuffers;
auto popCount = popFromBuffers(workingDbBuffers, disableDbBuffers, freeDbBuffers);
double diff = std::max(std::min((SUITABLE_DUMP_SIZE - popCount) / 400.0, 2.5), -2.5);
int diff_exp = static_cast<int>(exp(diff));
waitUs = std::min(std::max(waitUs * diff_exp, MIN_WAIT_US), MAX_WAIT_US);
{
std::lock_guard<std::mutex> lock(mtx_);
for (auto *pBuffer : freeDbBuffers) {
std::shared_ptr<DbBuffer<T>> pTempBuffer(pBuffer, [](DbBuffer<T> *) {});
workingDbBuffers_.erase(
std::remove(workingDbBuffers_.begin(), workingDbBuffers_.end(), pTempBuffer),
workingDbBuffers_.end());
disableDbBuffers_.erase(pTempBuffer);
}
workingDbBuffers = workingDbBuffers_;
disableDbBuffers = disableDbBuffers_;
}
if (popCount > 0) {
PROF_LOGD("buffer thread pop %d items", popCount);
}
}
}
int popFromBuffers(const std::vector<std::shared_ptr<DbBuffer<T>>> &workingDbBuffers,
std::set<std::shared_ptr<DbBuffer<T>>> &disableDbBuffers, std::vector<DbBuffer<T> *> &freeDbBuffers)
{
int popCount = 0;
std::unique_ptr<T> *pMarkers = pPopMarkerBuffer.get();
for (const auto &pBuffer : workingDbBuffers) {
size_t popSize = pBuffer->Pop(MAX_POP_SIZE, pMarkers);
for (size_t i = 0; i < popSize; ++i) {
if (pMarkers[i] == nullptr) {
continue;
}
popFunc_(std::move(pMarkers[i]));
pMarkers[i] = nullptr;
}
popCount += static_cast<int>(popSize);
if (popSize == 0 && disableDbBuffers.find(pBuffer) != disableDbBuffers.end()) {
freeDbBuffers.push_back(pBuffer.get());
}
}
batchEndFunc_();
return popCount;
}
std::shared_ptr<DbBuffer<T>> Register(uintptr_t pThreadIns)
{
std::lock_guard<std::mutex> lock(mtx_);
auto pBuffer = std::make_shared<DbBuffer<T>>();
mapBuffer_[pThreadIns] = pBuffer;
workingDbBuffers_.push_back(pBuffer);
return pBuffer;
}
void Unregister(uintptr_t pThreadIns)
{
std::lock_guard<std::mutex> lock(mtx_);
if (lifeEndFlag_) {
return;
}
if (mapBuffer_.find(pThreadIns) != mapBuffer_.end()) {
auto pBuffer = mapBuffer_.at(pThreadIns);
mapBuffer_.erase(pThreadIns);
disableDbBuffers_.insert(pBuffer);
}
}
private:
std::mutex mtx_;
std::thread thread_;
bool lifeEndFlag_ = false;
bool threadExitFlag_ = false;
std::map<uintptr_t, std::shared_ptr<DbBuffer<T>>> mapBuffer_{};
std::set<std::shared_ptr<DbBuffer<T>>> disableDbBuffers_{};
std::vector<std::shared_ptr<DbBuffer<T>>> workingDbBuffers_{};
std::unique_ptr<std::unique_ptr<T>[]> pPopMarkerBuffer{};
std::function<void(std::unique_ptr<T>)> popFunc_;
std::function<void()> batchEndFunc_;
};
}
#endif