* -------------------------------------------------------------------------
* This file is part of the Vision SDK project.
* Copyright (c) 2025 Huawei Technologies Co.,Ltd.
*
* Vision SDK is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
* Description: Automatic sending of empty buffer upon timeout.
* Author: MindX SDK
* Create: 2021
* History: NA
*/
#include "MxPlugins/MxpiBufferStablizer/MxpiBufferStablizer.h"
#include <unistd.h>
#include <sys/time.h>
#include <functional>
#include "MxBase/Log/Log.h"
#include "MxBase/DeviceManager/DeviceManager.h"
#include "MxPlugins/MxpiPluginsUtils/MxpiPluginsUtils.h"
namespace {
const int INIT_DEVICEID = -1;
const int INIT_DATASIZE = 1;
const int CHECKFREQUENCY = 10;
const float SEC2MS = 1000.0;
const uint32_t MAX_INTERVAL = 100000;
}
using namespace MxBase;
using namespace MxTools;
using namespace MxPlugins;
APP_ERROR MxpiBufferStablizer::Init(std::map<std::string, std::shared_ptr<void>> &configParamMap)
{
LogInfo << "Begin to initialize MxpiBufferStablizer(" << elementName_ << ").";
configParamMap_ = &configParamMap;
auto ret = InitAndRefreshProps();
if (ret != APP_ERR_OK) {
LogError << "MxpiBufferStablizer init and refresh props failed." << GetErrorInfo(ret);
return ret;
}
CreateThread();
LogInfo << "End to initialize MxpiBufferStablizer(" << elementName_ << ").";
return APP_ERR_OK;
}
APP_ERROR MxpiBufferStablizer::DeInit()
{
LogInfo << "Begin to deinitialize MxpiBufferStablizer(" << elementName_ << ").";
runningFlag_ = false;
realCount_ = 0;
condition_.notify_all();
if (thread_.joinable()) {
thread_.join();
}
LogInfo << "End to deinitialize MxpiBufferStablizer(" << elementName_ << ").";
return APP_ERR_OK;
}
APP_ERROR MxpiBufferStablizer::Process(std::vector<MxpiBuffer *> &mxpiBuffer)
{
APP_ERROR ret = APP_ERR_OK;
LogDebug << "Begin to process MxpiBufferStablizer(" << elementName_ << ").";
ret = CheckMxpiBufferIsValid(mxpiBuffer);
if (ret != APP_ERR_OK) {
return ret;
}
if (dataSourceKeys_.size() < 1) {
LogError << "Invalid dataSourceKeys_, size must not be equal to 0!" << GetErrorInfo(APP_ERR_COMM_INVALID_PARAM);
return APP_ERR_COMM_INVALID_PARAM;
}
ret = InitAndRefreshProps();
if (ret != APP_ERR_OK) {
LogError << "MxpiBufferStablizer init and refresh props failed." << GetErrorInfo(ret);
return ret;
}
realCount_++;
mtx_.lock();
MxpiFrame inputMxpiFrame = MxpiBufferManager::GetDeviceDataInfo(*mxpiBuffer[0]);
frameId_ = inputMxpiFrame.frameinfo().frameid();
channelId_ = inputMxpiFrame.frameinfo().channelid();
LogDebug << "Current buffer with channelId(" << channelId_ << ") frameId(" << frameId_
<< ") is sent to next plugin in element(" << elementName_ << ").";
MxpiMetadataManager manager(*mxpiBuffer[0]);
if (!dataSourceKeys_[0].empty()) {
auto lastMetadata = manager.GetMetadata(dataSourceKeys_[0]);
if (lastMetadata != nullptr) {
ret = manager.AddProtoMetadata(elementName_, lastMetadata);
if (ret != APP_ERR_OK) {
errorInfo_ << "Add proto metadata failed in Process." << GetErrorInfo(ret);
SendMxpiErrorInfo(*mxpiBuffer[0], elementName_, ret, errorInfo_.str());
mtx_.unlock();
condition_.notify_all();
return ret;
}
}
}
if (allowDelay_) {
SendData(0, *mxpiBuffer[0]);
} else {
if (numSent_ > 0) {
MxpiBufferManager::DestroyBuffer(mxpiBuffer[0]);
numSent_--;
} else {
SendData(0, *mxpiBuffer[0]);
}
}
mtx_.unlock();
condition_.notify_all();
LogDebug << "End to process MxpiBufferStablizer(" << elementName_ << ").";
return ret;
}
std::vector<std::shared_ptr<void>> MxpiBufferStablizer::DefineProperties()
{
std::vector<std::shared_ptr<void>> properties;
auto tolerance = std::make_shared<ElementProperty<uint>>(ElementProperty<uint> {
UINT, "tolerance", "tolerance", "tolerance time for input buffer, unit:ms", 10000, 1, UINT32_MAX
});
auto frequency = std::make_shared<ElementProperty<uint>>(ElementProperty<uint> {
UINT, "frequency", "frequency", "time interval of auto buffer after tolerance, unit:ms", 100, 1, UINT32_MAX
});
auto allowDelay = std::make_shared<ElementProperty<uint>>(ElementProperty<uint> {
UINT, "allowDelay", "allowDelay", "delayed buffer is allowed to be sent, 0:false, 1:true", 0, 0, 1
});
properties = { tolerance, frequency, allowDelay };
return properties;
}
MxpiPortInfo MxpiBufferStablizer::DefineInputPorts()
{
MxpiPortInfo inputPortInfo;
std::vector<std::vector<std::string>> value = {{"ANY"}};
GenerateStaticInputPortsInfo(value, inputPortInfo);
return inputPortInfo;
}
MxpiPortInfo MxpiBufferStablizer::DefineOutputPorts()
{
MxpiPortInfo outputPortInfo;
std::vector<std::vector<std::string>> value = {{"ANY"}};
GenerateStaticOutputPortsInfo(value, outputPortInfo);
return outputPortInfo;
}
APP_ERROR MxpiBufferStablizer::InitAndRefreshProps()
{
ConfigParamLock();
std::vector<std::string> parameterNamesPtr = {"tolerance", "frequency", "allowDelay"};
auto ret = CheckConfigParamMapIsValid(parameterNamesPtr, *configParamMap_);
if (ret != APP_ERR_OK) {
LogError << "Config parameter map is invalid." << GetErrorInfo(ret);
ConfigParamUnlock();
return ret;
}
tolerance_ = *std::static_pointer_cast<uint32_t>((*configParamMap_)["tolerance"]);
frequency_ = *std::static_pointer_cast<uint32_t>((*configParamMap_)["frequency"]);
allowDelay_ = *std::static_pointer_cast<bool>((*configParamMap_)["allowDelay"]);
ConfigParamUnlock();
return APP_ERR_OK;
}
void MxpiBufferStablizer::CreateThread()
{
std::function<void()> timeCall = std::bind(&MxpiBufferStablizer::TimeCall, this);
thread_ = std::thread(timeCall);
}
void MxpiBufferStablizer::TimeCall(void)
{
MxBase::DeviceManager *deviceManager = MxBase::DeviceManager::GetInstance();
MxBase::DeviceContext deviceContext;
deviceContext.devId = deviceId_;
while (runningFlag_) {
std::unique_lock<std::mutex> lock(mtx_);
condition_.wait(lock);
lock.unlock();
auto interval = std::min(MAX_INTERVAL, tolerance_ / CHECKFREQUENCY);
uint64_t lastCount = realCount_;
auto startTime = std::chrono::high_resolution_clock::now();
while (runningFlag_) {
usleep(interval);
auto endTime = std::chrono::high_resolution_clock::now();
if (std::chrono::duration<double, std::milli>(endTime - startTime).count() <= tolerance_) {
continue;
}
if (realCount_ == lastCount) {
break;
}
lastCount = realCount_;
startTime = std::chrono::high_resolution_clock::now();
}
while (realCount_ == lastCount && runningFlag_) {
LogWarn << "Null buffer with channelId(" << channelId_ << ") frameId(" << frameId_
<< ") is sent to next plugin in element(" << elementName_ << ").";
usleep(static_cast<int>(frequency_ * SEC2MS));
mtx_.lock();
APP_ERROR ret = deviceManager->SetDevice(deviceContext);
if (ret != APP_ERR_OK) {
LogError << "Element(" << elementName_ << ") Failed to set deviceId."
<< GetErrorInfo(ret);
}
numSent_++;
InputParam bufferParam = {};
bufferParam.key = "";
bufferParam.deviceId = INIT_DEVICEID;
bufferParam.dataSize = INIT_DATASIZE;
MxpiFrameInfo *outputFrameInfo = &(bufferParam.mxpiFrameInfo);
outputFrameInfo->set_frameid(++frameId_);
outputFrameInfo->set_channelid(channelId_);
MxpiBuffer* outputMxpiBuffer = MxpiBufferManager::CreateHostBuffer(bufferParam);
if (outputMxpiBuffer == nullptr) {
LogError << "Create null buffer failed." << GetErrorInfo(APP_ERR_COMM_INVALID_POINTER);
mtx_.unlock();
break;
}
mtx_.unlock();
SendData(0, *outputMxpiBuffer);
}
}
}
namespace {
MX_PLUGIN_GENERATE(MxpiBufferStablizer)
}