* Copyright (c) Huawei Technologies Co., Ltd. 2026. All rights reserved.
* ubs-comm is licensed under the Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#include <iostream>
#include "common/ubsocket_common_includes.h"
#include "common/ubsocket_global_setting.h"
#include "common/ubsocket_signal_handler.h"
#include "common/ubsocket_version.h"
#include "core/ubsocket_event_epoll.h"
#include "core/umq/umq_backend.h"
#include "core/umq/umq_setting.h"
#include "include/ubsocket.h"
#include "profiling/probe/probe_manager.h"
#include "profiling/statistics/statistics.h"
#include "profiling/statistics/ubsocket_print_stats_mgr.h"
#include "ubsocket_struct_helper.h"
#include "under_api/dl_api.h"
using namespace ock::ubs;
/**
 * @brief Fill @p options with the library's default initialization values.
 *
 * Defaults written: TCP as the allowed protocol, zero async acceptor and
 * connector threads, one async epoll thread, and no user-supplied lock,
 * rw-lock or semaphore operations.
 *
 * @param options Structure to populate; must not be null.
 * @return UBS_OK on success; UBS_ERROR with errno set to EINVAL when
 *         @p options is null.
 */
UBS_API int ubsocket_init_options(u_init_options_t *options)
{
    if (options == nullptr) {
        errno = EINVAL;
        return UBS_ERROR;
    }

    options->allowed_protocol = UBS_PROTOCOL_TCP;
    options->async_acceptor_thread_count = 0;
    options->async_connector_thread_count = 0;
    options->async_epoll_thread_count = 1;
    options->lock_ops = nullptr;
    options->rw_lock_ops = nullptr;
    options->sem_ops = nullptr;

    // Dump the structure only after the defaults are in place: on entry the
    // caller's struct may be uninitialized, so logging it earlier would read
    // indeterminate values.
    UBS_SLOG_DEBUG(*options);
    return UBS_OK;
}
/**
 * @brief Create the zero-copy allocator and hook the brpc allocator symbols.
 *
 * Only the UB RM/RC RTP protocols are handled; for any other allowed protocol
 * a warning is logged and nothing else happens. When the allocator cannot be
 * created or the brpc allocate/deallocate symbols cannot be intercepted,
 * GlobalSetting::UBS_NATIVE_TCP_MODE is switched on and the function returns
 * without reporting success.
 */
void ZeroCopyPrepare()
{
    const uint32_t type = GlobalSetting::UBS_ALLOWED_PROTOCOL;
    if (type != UBS_PROTOCOL_UB_RM_RTP && type != UBS_PROTOCOL_UB_RC_RTP) {
        UBS_VLOG_WARN("unknown zcopy allocator type");
        return;
    }

    // ubsocket_iobuf_allocate() may already have created the allocator lazily;
    // reuse it instead of overwriting (and leaking) the existing instance.
    if (g_zcopy_allocator == nullptr) {
        g_zcopy_allocator = new (std::nothrow) umq::UmqZeroCopyAllocator();
    }
    if (g_zcopy_allocator == nullptr) {
        // nothrow new reports failure with a null pointer; do not install the
        // hook when there is no allocator behind it.
        UBS_VLOG_WARN("Failed to create zcopy allocator, fallback to TCP mode");
        GlobalSetting::UBS_NATIVE_TCP_MODE = true;
        return;
    }

    UbsZcopyAdapter adapter;
    if (!adapter.Intercept(GlobalSetting::UBS_BRPC_ALLOC_SYM_STR.c_str(),
                           GlobalSetting::UBS_BRPC_DEALLOC_SYM_STR.c_str())) {
        UBS_VLOG_WARN("Failed to hook brpc allocator, fallback to TCP mode");
        GlobalSetting::UBS_NATIVE_TCP_MODE = true;
        // Return here so the success message below is not logged for a failed hook.
        return;
    }
    UBS_VLOG_INFO("Successfully hooked brpc zero-copy allocator");
}
/**
 * @brief Initialize the library once, using the values in @p options.
 *
 * The whole sequence runs under GlobalSetting::MUTEX. If
 * GlobalSetting::UBS_INITED is already set the call returns UBS_OK without
 * doing anything. Otherwise it loads rules and environment settings, applies
 * the options, verifies the settings, loads the underlying APIs, registers
 * lock/semaphore operations, initializes the socket and event-poll sets and
 * the UMQ backend, installs the SIGUSR2 handler and starts the optional
 * profiling, statistics, probe and trace services.
 *
 * @param options Initialization options; must not be null. Thread counts are
 *                applied only when greater than zero; lock, rw-lock and
 *                semaphore operations only when non-null.
 * @return UBS_OK on success or when already initialized; UBS_ERROR otherwise.
 *         errno is EINVAL for a null @p options or settings that fail
 *         verification, and EBADF when loading the underlying APIs or
 *         registering user operations fails.
 *
 * NOTE(review): the UMQ backend failure path returns UBS_ERROR without
 * assigning errno, unlike every other failure path here - confirm whether
 * that is intended.
 */
UBS_API int ubsocket_init(u_init_options_t *options)
{
    // Common failure exit: record the errno value and report UBS_ERROR.
    const auto failWith = [](int err) -> int {
        errno = err;
        return UBS_ERROR;
    };

    if (options == nullptr) {
        return failWith(EINVAL);
    }

    std::lock_guard<std::mutex> guard(GlobalSetting::MUTEX);
    if (GlobalSetting::UBS_INITED) {
        return UBS_OK;
    }

    UBS_VLOG_DEBUG("init global setting");
    GlobalSetting::AddRules();
    GlobalSetting::LoadEnv();

    // Explicit options are applied after LoadEnv(); non-positive thread counts
    // leave the current setting untouched.
    GlobalSetting::UBS_ALLOWED_PROTOCOL = options->allowed_protocol;
    if (options->async_acceptor_thread_count > 0) {
        GlobalSetting::UBS_ACCEPTOR_ASYNC_THREAD_COUNT = options->async_acceptor_thread_count;
    }
    if (options->async_connector_thread_count > 0) {
        GlobalSetting::UBS_CONNECTOR_ASYNC_THREAD_COUNT = options->async_connector_thread_count;
    }
    if (options->async_epoll_thread_count > 0) {
        GlobalSetting::UBS_EPOLL_ASYNC_THREAD_COUNT = options->async_epoll_thread_count;
    }

    auto rc = GlobalSetting::VerifySetting();
    if (rc != UBS_OK) {
        UBS_VLOG_ERR("initialize failed as options are invalid");
        return failWith(EINVAL);
    }

    UBS_VLOG_DEBUG("load under api");
    rc = DlApi::Load(LOAD_LIBC | LOAD_UMQ);
    if (rc != UBS_OK) {
        return failWith(EBADF);
    }

    UBS_VLOG_DEBUG("register mutex and sem ops");
    // Defaults are registered first; any user-supplied operations are
    // registered afterwards.
    (void)LockRegistry::RegisterDefaultOps();
    if (options->lock_ops != nullptr) {
        rc = LockRegistry::RegisterLockOps(options->lock_ops);
        if (rc != UBS_OK) {
            return failWith(EBADF);
        }
    }
    if (options->rw_lock_ops != nullptr) {
        rc = LockRegistry::RegisterRwLockOps(options->rw_lock_ops);
        if (rc != UBS_OK) {
            return failWith(EBADF);
        }
    }
    if (options->sem_ops != nullptr) {
        rc = LockRegistry::RegisterSemOps(options->sem_ops);
        if (rc != UBS_OK) {
            return failWith(EBADF);
        }
    }

    ArraySet<Socket>::GetInstance().Init();
    g_socket_epoll_lock = LockRegistry::RW_LOCK_OPS.create();
    ArraySet<EventPoll>::GetInstance().Init();

    if (GlobalSetting::USE_BRPC_ZCOPY) {
        ZeroCopyPrepare();
    }

    rc = umq::UmqBackend::Init();
    if (rc != UBS_OK) {
        UBS_VLOG_ERR("umq backend init failed, ret: %d\n", rc);
        return UBS_ERROR;
    }

    std::signal(SIGUSR2, ubsocket_handle_signal);

    if (GlobalSetting::UBS_PROF_ENABLE) {
        rc = Profiling::Init(ProfilingTPId::UBSOCKET_PROF_COUNT, GlobalSetting::UBS_PROF_DUMP_PATH.c_str(),
                             GlobalSetting::UBS_PROF_DUMP_INTERVAL_MIN);
        // A profiling failure is only reported; initialization continues.
        if (rc != UBS_OK) {
            UBS_VLOG_WARN("Profiling is not initialize \n");
        }
    }
    if (GlobalSetting::UBS_CLI_ENABLED) {
        (void)Statistics::GlobalStatsMgr::GetGlobalStatsMgr(umq::UmqSetting::UMQ_TRANS_MODE);
    }
    if (GlobalSetting::UBS_PROBE_ENABLED) {
        (void)Statistics::ProbeManager::GetInstance().Start(GlobalSetting::UBS_PROBE_MS,
                                                            GlobalSetting::UBS_PROBE_BATCH, -1);
    }

    GlobalSetting::UBS_INITED = true;

    if (GlobalSetting::UBS_TRACE_ENABLED) {
        umq_trans_mode_t traceMode = umq::UmqSetting::UMQ_TRANS_MODE;
        Statistics::PrintStatsMgr::StartStatsCollection(GlobalSetting::UBS_TRACE_TIME,
                                                        GlobalSetting::UBS_TRACE_FILE_PATH,
                                                        GlobalSetting::UBS_TRACE_FILE_SIZE, traceMode);
    }
    return UBS_OK;
}
/**
 * @brief Stop the optional services and un-initialize the UMQ backend.
 *
 * Profiling is un-initialized when GlobalSetting::UBS_PROF_ENABLE is set,
 * trace statistics collection is stopped when GlobalSetting::UBS_TRACE_ENABLED
 * is set, and umq::UmqBackend::UnInit() is always called last.
 *
 * NOTE(review): GlobalSetting::UBS_INITED is not cleared and
 * GlobalSetting::MUTEX is not taken here, so a later ubsocket_init() returns
 * UBS_OK without re-initializing - confirm whether re-initialization after
 * uninit is meant to be supported.
 */
void ubsocket_uninit()
{
    if (GlobalSetting::UBS_PROF_ENABLE) {
        Profiling::Uninit();
    }

    if (GlobalSetting::UBS_TRACE_ENABLED) {
        Statistics::PrintStatsMgr::StopStatsCollection();
    }

    umq::UmqBackend::UnInit();
}
/**
 * @brief Report the library version.
 *
 * Writes the full version string to standard output (followed by a newline
 * and a flush) on every call, then returns the short version string.
 *
 * @return UBS_LIB_VERSION.
 */
UBS_API const char *ubsocket_version()
{
    // Newline plus explicit flush: the same effect as std::endl.
    std::cout << "full version: " << UBS_LIB_VERSION_FULL << '\n' << std::flush;
    return UBS_LIB_VERSION;
}
void UmqLogger(int level, char *log_msg)
{
auto new_level = umq_log_level::UMQ_LOG_LEVEL_DEBUG - level;
if (new_level <= LogLevel::LEVEL_ERR) {
UBS_LOG_STREAM_RAW(new_level, log_msg);
} else {
static const char *OTHER_LEVEL[] = {"EMERG", "ALERT", "CRIT"};
UBS_LOG_STREAM_RAW(LogLevel::LEVEL_ERR, OTHER_LEVEL[level % (sizeof(OTHER_LEVEL))] << ", " << log_msg);
}
}
void UmqExtLogger(int level, const char *file, const char *function, int line, char *log_msg)
{
auto new_level = umq_log_level::UMQ_LOG_LEVEL_DEBUG - level;
if (new_level <= LogLevel::LEVEL_ERR) {
UBS_LOG_STREAM_EXT_RAW(new_level, file, function, line, log_msg);
} else {
static const char *OTHER_LEVEL[] = {"EMERG", "ALERT", "CRIT"};
UBS_LOG_STREAM_EXT_RAW(LogLevel::LEVEL_ERR, file, function, line,
OTHER_LEVEL[level % (sizeof(OTHER_LEVEL))] << ", " << log_msg);
}
}
/**
 * @brief Install an external log callback and route UMQ logs through
 *        UmqExtLogger.
 *
 * The callback is handed to the ubsocket Logger. UMQ is then configured with
 * both the level and the extended-function flags, UmqExtLogger as its
 * extended log function, and a level derived from the Logger's current level.
 *
 * @param func External log callback passed to Logger::SetExternalLogFunction.
 * @return The value returned by umq_log_config_set().
 */
UBS_API int ubsocket_set_logger(void (*func)(int level, const char *msg, const char *filename, int line))
{
    Logger::Instance().SetExternalLogFunction(func);

    umq_log_config_t log_cfg = {};
    log_cfg.log_flag = UMQ_LOG_FLAG_LEVEL | UMQ_LOG_FLAG_EXT_FUNC;
    log_cfg.func = nullptr;
    log_cfg.ext_func = UmqExtLogger;
    log_cfg.level =
        static_cast<umq_log_level_t>(umq_log_level::UMQ_LOG_LEVEL_DEBUG - Logger::Instance().GetLogLevel());

    return umq_log_config_set(&log_cfg);
}
/**
 * @brief Set the log level for the ubsocket Logger and for UMQ.
 *
 * The level is stored in the Logger as given. For UMQ it is converted by
 * subtracting it from UMQ_LOG_LEVEL_DEBUG, and only the level flag is set in
 * the configuration passed to umq_log_config_set().
 *
 * NOTE(review): @p level is not range-checked in this function before the
 * conversion - confirm that callers or Logger::SetLogLevel validate it.
 *
 * @param level New ubsocket log level.
 * @return The value returned by umq_log_config_set().
 */
UBS_API int ubsocket_set_log_level(int level)
{
    Logger::Instance().SetLogLevel(level);

    umq_log_config_t log_cfg = {};
    log_cfg.log_flag = UMQ_LOG_FLAG_LEVEL;
    log_cfg.func = nullptr;
    log_cfg.ext_func = nullptr;
    log_cfg.level = static_cast<umq_log_level_t>(umq_log_level::UMQ_LOG_LEVEL_DEBUG - level);

    return umq_log_config_set(&log_cfg);
}
/**
 * @brief Allocate a zero-copy I/O buffer of @p size bytes.
 *
 * Only the UB RM/RC RTP protocols are supported. The zero-copy allocator is
 * created on first use if it does not exist yet, and the request is then
 * passed to blockmem_allocate_zero_copy().
 *
 * @param size Requested buffer size, forwarded unchanged.
 * @return The pointer returned by blockmem_allocate_zero_copy(); nullptr when
 *         the allowed protocol is not supported, or when the allocator cannot
 *         be created (errno is then set to ENOMEM).
 */
UBS_API void *ubsocket_iobuf_allocate(size_t size)
{
    const uint32_t type = GlobalSetting::UBS_ALLOWED_PROTOCOL;
    if (type != UBS_PROTOCOL_UB_RM_RTP && type != UBS_PROTOCOL_UB_RC_RTP) {
        UBS_VLOG_WARN("unknown zcopy allocator type");
        return nullptr;
    }

    if (g_zcopy_allocator == nullptr) {
        g_zcopy_allocator = new (std::nothrow) umq::UmqZeroCopyAllocator();
        if (g_zcopy_allocator == nullptr) {
            // nothrow new reports failure with a null pointer; report it as an
            // allocation failure instead of continuing without an allocator.
            UBS_VLOG_ERR("failed to create zcopy allocator");
            errno = ENOMEM;
            return nullptr;
        }
    }
    return blockmem_allocate_zero_copy(size);
}
/**
 * @brief Release a buffer by passing it to blockmem_deallocate_zero_copy().
 *
 * @param addr Buffer to release; a null pointer is ignored.
 */
UBS_API void ubsocket_iobuf_deallocate(void *addr)
{
    // Like free(nullptr), releasing a null buffer is a no-op.
    if (addr == nullptr) {
        return;
    }
    blockmem_deallocate_zero_copy(addr);
}