/*
 * -------------------------------------------------------------------------
* This file is part of the Vision SDK project.
* Copyright (c) 2025 Huawei Technologies Co.,Ltd.
*
* Vision SDK is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
* Description: Determines the plug-in set in the pipeline and creates an inference flow.
* Author: MindX SDK
* Create: 2020
* History: NA
*/
#ifndef MXSM_STREAM_H
#define MXSM_STREAM_H
#include <nlohmann/json.hpp>
#include <gst/gst.h>
#include <gst/app/gstappsrc.h>
#include <map>
#include <string>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include "MxStream/StreamManager/MxsmElement.h"
#include "MxBase/ErrorCode/ErrorCode.h"
#include "MxBase/BlockingQueue/BlockingQueue.h"
#include "MxStream/StreamManager/MxsmDataType.h"
#include "MxTools/PluginToolkit/buffer/MxpiBufferManager.h"
#include "MxStream/DataType/StateInfo.h"
namespace MxStream {
/**
 * Transmission mode of a stream.
 *
 * MxsmStream keeps one value of this type in transMode_ (its constructor starts
 * with MXST_TRANSMISSION_NORMAL) and passes a "target" mode to
 * CheckSendFuncTransMode() / CheckGetFuncTransMode().
 *
 * NOTE(review): the per-mode meaning below is inferred from the enumerator and
 * API names only; confirm against the implementation.
 */
enum MXST_TRANSMISSION_MODE {
    MXST_TRANSMISSION_DEFAULT = 0,         // presumably "not chosen yet"
    MXST_TRANSMISSION_NORMAL = 1,          // presumably the SendData / GetResult path
    MXST_TRANSMISSION_UNIQUE_ID = 2,       // presumably the *WithUniqueId path
    MXST_TRANSMISSION_UNIQUE_ID_MULTI = 3  // presumably the *Multi*WithUniqueId path
};
/**
 * Status recorded per unique id; used as the mapped type of
 * MxsmStream::resultStatusMap_ (keyed by uint64_t).
 *
 * NOTE(review): MXST_RESULT_STATUS_DROP is presumably what DropUniqueId()
 * records for an id whose result should be discarded; confirm in the .cpp.
 */
enum MXST_RESULT_STATUS {
    MXST_RESULT_STATUS_DEFAULT = 0,
    MXST_RESULT_STATUS_DROP = 1
};
/**
 * Selects whether an element is treated as an input or an output element.
 * Passed as the `status` argument of MxsmStream::IsInOutElementNameCorrect().
 */
enum INPUT_OUTPUT_ELEMENT {
    INPUT_ELEMENT = 0,
    OUTPUT_ELEMENT = 1
};
/**
 * Bundle of non-owning pointers handed to the static appsink callbacks
 * (PullSinkSaveNormalResult / PullSinkSaveUniqueResult /
 * PullSinkSaveUniqueResultMulti take a CallbackData&).
 *
 * The member names mirror MxsmStream members (transMode_, outputMap_,
 * multiOutputMap_, resultStatusMap_, ...), so each pointer presumably refers to
 * the matching member of the owning stream; confirm in the .cpp.
 *
 * Every pointer starts as nullptr so that a freshly created instance never
 * carries an indeterminate address before it is populated. Member order and
 * layout are unchanged.
 */
struct CallbackData {
    // Transmission mode of the owning stream.
    MXST_TRANSMISSION_MODE* transMode = nullptr;

    // Result containers keyed by uint64_t (presumably the request's unique id).
    std::map<uint64_t, MxstProtobufAndBuffer *>* outputMap = nullptr;
    std::map<uint64_t, std::vector<MxstProtobufAndBuffer *>>* multiOutputMap = nullptr;
    std::map<uint64_t, MXST_RESULT_STATUS>* resultStatusMap = nullptr;

    // Queue of results for one appsink.
    MxBase::BlockingQueue<MxstProtobufAndBuffer *>* outputQueue = nullptr;

    // Synchronisation objects. NOTE(review): which container each mutex
    // guards is not visible in this header.
    std::mutex* mutex = nullptr;
    std::mutex* resultStatusMapMutex = nullptr;
    std::condition_variable* outputMapCond = nullptr;

    // "appsrc has enough data" signalling: mutex + condition variable + flag.
    std::mutex* appsrcEnoughDataMutex = nullptr;
    std::condition_variable* appsrcEnoughDataCond = nullptr;
    bool* appsrcEnoughDataFlag = nullptr;

    // Appsink elements used by the multi-output path.
    std::vector<GstElement *>* multiAppsinkVec = nullptr;

    // One condition variable per uint64_t key (presumably per unique id).
    std::map<uint64_t, std::shared_ptr<std::condition_variable>>* uniqueIdCvMap = nullptr;
};
/**
 * An element name paired with an integer order.
 *
 * Used as the mapped half of the (string, ElementAndOrder) pairs that
 * MxsmStream::ReorderElements() and MxsmStream::BuildPreviousElementPair()
 * operate on. Kept as a plain aggregate so `{name, order}` initialisation
 * keeps working.
 */
struct ElementAndOrder {
    std::string elementName;  // name of the element
    int order;                // order value; not initialised by default
};
* @description: create or destroy a Stream
*/
class MxsmStream {
public:
MxsmStream() : transMode_(MXST_TRANSMISSION_NORMAL), gstStream_(nullptr), isBusWatched(false), uniqueId_(0),
isTransModeInitialized_(false){};
~MxsmStream();
* @description: create and run a single Stream from json object
* @param StreamName: the name of the Stream
* @param StreamObject: json object which descripes a Stream
* @return: APP_ERROR
*/
APP_ERROR CreateStream(const std::string& streamName, const nlohmann::json& streamObject);
*/
* @description: stop and destroy a single Stream from json object
* @param: void
* @return: APP_ERROR
*/
APP_ERROR DestroyStream();
* @description: send data to the input plugin of the Stream
* @param inPluginId: the index of the input plugin
* @param dataBuffer: the data to be sent
* @return: APP_ERROR
*/
APP_ERROR SendData(int inPluginId, MxstDataInput& dataBuffer);
APP_ERROR SendData(const std::string elementName, MxstDataInput& dataBuffer);
* @description: get result from the output plugin of the Stream
* @param outPluginId: the index of the output plugin
* @return: MxstDataOutput
*/
MxstDataOutput* GetResult(int outPluginId, const uint32_t& msTimeOut = DELAY_TIME);
* @description: send data to the input plugin of the Stream
* @param inPluginId: the index of the input plugin
* @param dataBuffer: the data to be sent
* @return: APP_ERROR
*/
APP_ERROR SendDataWithUniqueId(int inPluginId, MxstDataInput& dataBuffer, uint64_t& uniqueId);
APP_ERROR SendDataWithUniqueId(const std::string elementName, MxstDataInput& dataBuffer, uint64_t& uniqueId);
APP_ERROR SendMultiDataWithUniqueId(std::vector<int> inPluginIdVec, std::vector<MxstDataInput>& dataBufferVec,
uint64_t& uniqueId);
* @description: get result from the output plugin of the Stream
* @param outPluginId: the index of the output plugin
* @return: MxstDataOutput
*/
MxstDataOutput* GetResultWithUniqueId(uint64_t uniqueId, unsigned int timeOutMs = DELAY_TIME);
std::vector<MxstDataOutput*> GetMultiResultWithUniqueId(uint64_t uniqueId, unsigned int timeOutMs = DELAY_TIME);
void DropUniqueId(uint64_t uniqueId);
APP_ERROR SendProtobuf(int inPluginId, std::vector<MxstProtobufIn>& protoVec);
APP_ERROR SendProtobuf(const std::string elementName, std::vector<MxstProtobufIn>& protoVec);
std::vector<MxstProtobufOut> GetProtobuf(int outPluginId, const std::vector<std::string>& keyVec);
APP_ERROR SendData(const std::string& elementName,
std::vector<MxstMetadataInput>& metadataVec, MxstBufferInput& dataBuffer);
MxstBufferAndMetadataOutput GetResult(const std::string& elementName,
const std::vector<std::string>& dataSourceVec, const uint32_t& msTimeOut = DELAY_TIME);
void DestroyAppSinkBufQueVec();
APP_ERROR SetElementProperty(const std::string& elementName,
const std::string& propertyName,
const std::string& propertyValue,
bool dataSourceMeta = false);
std::shared_ptr<MxstDataOutput> GetResultSP(int outPluginId, const uint32_t& msTimeOut = DELAY_TIME);
std::shared_ptr<MxstDataOutput> GetResultWithUniqueIdSP(uint64_t uniqueId, uint32_t timeOutMs = DELAY_TIME);
std::vector<std::shared_ptr<MxstDataOutput>> GetMultiResultWithUniqueIdSP(uint64_t uniqueId,
uint32_t timeOutMs = DELAY_TIME);
public:
std::map<uint64_t, MxStream::MxstProtobufAndBuffer *> outputMap_;
std::map<uint64_t, std::vector<MxstProtobufAndBuffer *>> multiOutputMap_;
std::map<uint64_t, MXST_RESULT_STATUS> resultStatusMap_;
std::map<std::string, std::vector<std::pair<std::string, int>>> dataSourceMap_;
MXST_TRANSMISSION_MODE transMode_;
private:
* @description: gstreamer callback function for handling bus message
* @param bus: the bus of the Stream
* @param msg: the message sent from the bus
* @param userData: user data sent to the callback function
* @return: APP_ERROR
*/
static APP_ERROR BusWatchCallbackFunc(GstBus* bus, GstMessage* msg, gpointer userData);
* @description: create a element and add to the Stream
* @param elementName: the name of the element
* @param elementObject: the json object of a element
* @return: APP_ERROR
*/
APP_ERROR AddElementToStream(const std::string& elementName, nlohmann::json elementObject);
* @description: link elements defined in the Stream
* @param: void
* @return: APP_ERROR
*/
APP_ERROR LinkElements();
* @description: check exsitence of a given element
* @param elementName: the name of the element
* @return: APP_ERROR
*/
APP_ERROR HaveElement(const std::string& elementName);
* @description: find unlinked elments which are not allowed
* @param: void
* @return: APP_ERROR
*/
APP_ERROR FindUnlinkedElements();
* @description: pull appsink sample
* @param: sink the pointer of appsink element
* @param: data the pointer of appsinkBufQueVec element
* @return: APP_ERROR
*/
static APP_ERROR PullSinkSampleCallback(GstElement *sink, void *data);
static APP_ERROR PullSinkSaveNormalResult(CallbackData& callbackData, MxstProtobufAndBuffer& mxstOutput);
static APP_ERROR PullSinkSaveUniqueResult(CallbackData& callbackData,
MxstProtobufAndBuffer& mxstOutput, MxTools::MxpiBuffer& mxpiBuffer);
static APP_ERROR PullSinkSaveUniqueResultMulti(CallbackData& callbackData, MxstProtobufAndBuffer& mxstOutput,
MxTools::MxpiBuffer& mxpiBuffer);
static APP_ERROR GenerateOutputData(GstBuffer& buffer, MxstDataOutput& mxstDataOutput, GstSample& sample);
APP_ERROR GetStreamDeviceId(const nlohmann::json& streamObject);
std::string GetNextElementName(const std::string& elementName);
APP_ERROR DealApp(std::unique_ptr<MxsmElement>& newElement, const std::string& elementName);
APP_ERROR DealAppsink(std::unique_ptr<MxsmElement>& newElement, const std::string& elementName);
APP_ERROR CheckSendFuncTransMode(MXST_TRANSMISSION_MODE targetTransMode, const std::string& funcName);
APP_ERROR CheckGetFuncTransMode(MXST_TRANSMISSION_MODE targetTransMode, const std::string& funcName);
APP_ERROR DropDataWhenDestroyStream(GstPad* element);
APP_ERROR RemoveAppSinkAndDropData(const GstElement* appsink);
APP_ERROR SendEosAndFlushEvent();
static MxstProtobufAndBuffer* CreateOutputBuffer();
APP_ERROR AddElement(const nlohmann::json& streamObject);
APP_ERROR ReorderElements(std::vector<std::pair<std::string, ElementAndOrder>>& previousElement);
APP_ERROR BuildPreviousElementPair(std::vector<std::pair<std::string, ElementAndOrder>>& previousElement,
const std::string& srcElementName, const int& currentOrder, const std::string& destElementName);
bool IsInOutElementNameCorrect(const std::string& elementName, INPUT_OUTPUT_ELEMENT status);
APP_ERROR SendProtobufComm(std::vector<MxstProtobufIn>& protoVec, MxTools::MxpiBuffer* &mxpiBuffer);
APP_ERROR SendDataComm(MxstDataInput& dataBuffer, MxTools::MxpiBuffer* &mxpiBuffer);
APP_ERROR SendDataWithUniqueIdComm(MxstDataInput& dataBuffer, uint64_t& uniqueId,
MxTools::MxpiBuffer* &mxpiBuffer);
APP_ERROR SendDataWithUniqueIdCommMulti(MxstDataInput& dataBuffer,
uint64_t& uniqueId, MxTools::MxpiBuffer* &mxpiBuffer, bool setUniqueIdFlag);
APP_ERROR SendProtoAndBufferComm(MxTools::MxpiBuffer* &mxpiBuffer,
std::vector<MxstMetadataInput>& protoVec, MxstBufferInput& dataBuffer);
MxstBufferAndMetadataOutput& GenerateMetaAndBufferOutput(const std::vector<std::string> &dataSourceVec,
MxstBufferAndMetadataOutput &bufferAndMetaOut,
MxstProtobufAndBuffer *protobufAndBuffer) const;
MxstBufferAndMetadataOutput& CheckRequirementsForGetResult(
const std::string& elementName, MxstBufferAndMetadataOutput& bufferAndMetaOut);
bool IsAppsrcEnoughData();
std::shared_ptr<MxstFrameExternalInfo> CreateExternalInfo(const MxstDataInput &dataBuffer, uint64_t &uniqueId);
std::shared_ptr<MxstFrameExternalInfo> CreateExternalInfoMulti(const MxstDataInput &dataBuffer, uint64_t &uniqueId,
bool setUniqueIdFlag);
void EraseElements();
void LoopRun();
APP_ERROR IsInPluginIdVecValid(std::vector<int> inPluginIdVec);
APP_ERROR CreateStreamCore(const std::string& streamName, const nlohmann::json& streamObject);
APP_ERROR DestroyStreamCore();
bool IsValidNumber(const std::string& text);
APP_ERROR GetElementAndOrder(std::unique_ptr<MxsmElement>& newElement);
APP_ERROR AppendElementToSourceMap(std::unique_ptr<MxsmElement> &newElement, std::string elementName,
int elementOrder);
APP_ERROR SetDataSourceProperty();
private:
GstElement* gstStream_;
bool isBusWatched;
std::map<std::string, std::unique_ptr<MxsmElement>> elementMap_;
std::vector<GstAppSrc *> appsrcVec_;
std::map<std::string, GstAppSrc *> appsrcMap_;
std::vector<GstElement *> appsinkVec_ = {};
std::vector<GstElement *> multiAppsinkVec_ = {};
std::map<std::string, uint32_t> appsinkIndexMap_;
std::vector<MxBase::BlockingQueue<MxstProtobufAndBuffer *> *> appsinkBufQueVec_ = {};
std::timed_mutex sendDataMutex_;
uint64_t uniqueId_;
std::string streamDeviceId_;
bool isTransModeInitialized_;
std::mutex dataMapMutex_;
std::string streamName_;
std::mutex resultStatusMapMutex_;
std::condition_variable outputMapCond_;
std::mutex appsrcEnoughDataMutex_;
std::condition_variable appsrcEnoughDataCond_;
bool appsrcEnoughDataFlag_ = false;
std::map<uint64_t, std::shared_ptr<std::condition_variable>> uniqueIdCvMap_;
StreamState streamState_ = STREAM_STATE_NEW;
std::thread threadLoop_;
GMainLoop *loop_ = nullptr;
std::vector<CallbackData *> callbackDataVec_;
private:
MxsmStream(const MxsmStream &) = delete;
MxsmStream(const MxsmStream &&) = delete;
MxsmStream& operator=(const MxsmStream &) = delete;
MxsmStream& operator=(const MxsmStream &&) = delete;
};
}
#endif