* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* ubs-hcom is licensed under the Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/tcp.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>
#include "hcom_def.h"
#include "hcom_log.h"
#include "net_oob.h"
namespace ock {
namespace hcom {
NResult OOBTCPServer::EnableAutoPortSelection(uint16_t minPort, uint16_t maxPort)
{
if (mStarted) {
NN_LOG_ERROR("Failed to enable auto port selection! oob server already start.");
return NN_ERROR;
}
if (mOobType != NET_OOB_TCP) {
NN_LOG_ERROR("Failed to enable auto port selection! OOB_TYPE is not TCP.");
return NN_ERROR;
}
if (minPort == 0 || maxPort == 0) {
NN_LOG_ERROR("Failed to enable auto port selection!, port range is invalid!");
return NN_ERROR;
}
if (minPort < NN_NO1024) {
NN_LOG_ERROR("Failed to enable auto port selection! minPort is less than 1024.");
return NN_ERROR;
}
if (maxPort < NN_NO1024) {
NN_LOG_ERROR("Failed to enable auto port selection! maxPort is less than 1024.");
return NN_ERROR;
}
if (minPort > maxPort) {
NN_LOG_ERROR("Failed to enable auto port selection! minPort is bigger than maxPort.");
return NN_ERROR;
}
if (mListenPort != 0) {
NN_LOG_WARN("oobPort will be selected automatically!");
}
mMinListenPort = minPort;
mMaxListenPort = maxPort;
mListenPort = mMinListenPort;
mIsAutoPortSelectionEnabled = true;
return NN_OK;
}
NResult OOBTCPServer::GetListenPort(uint16_t &port)
{
if (!mStarted) {
NN_LOG_ERROR("Failed to get listen port, oob server is not start");
return NN_ERROR;
}
port = mListenPort;
return NN_OK;
}
NResult OOBTCPServer::GetListenIp(std::string &ip)
{
if (!mStarted) {
NN_LOG_ERROR("Failed to get listen ip, oob server is not start");
return NN_ERROR;
}
ip = mListenIP;
return NN_OK;
}
NResult OOBTCPServer::GetUdsName(std::string &udsName)
{
if (!mStarted) {
NN_LOG_ERROR("Failed to get uds name, oob server is not start");
return NN_ERROR;
}
if (mOobType != NET_OOB_UDS) {
NN_LOG_ERROR("Failed to get uds name, oob server is not uds");
return NN_ERROR;
}
udsName = mUdsName;
return NN_OK;
}
bool BuildSockAddr(const std::string &ip, uint16_t port, sockaddr_storage &addrStorage, socklen_t &addrLen, int &family)
{
addrStorage = {};
addrLen = 0;
family = AF_UNSPEC;
sockaddr_in addr4{};
if (inet_pton(AF_INET, ip.c_str(), &addr4.sin_addr) == 1) {
addr4.sin_family = AF_INET;
addr4.sin_port = htons(port);
std::memcpy(&addrStorage, &addr4, sizeof(addr4));
addrLen = sizeof(addr4);
family = AF_INET;
return true;
}
sockaddr_in6 addr6{};
if (inet_pton(AF_INET6, ip.c_str(), &addr6.sin6_addr) == 1) {
addr6.sin6_family = AF_INET6;
addr6.sin6_port = htons(port);
std::memcpy(&addrStorage, &addr6, sizeof(addr6));
addrLen = sizeof(addr6);
family = AF_INET6;
return true;
}
return false;
}
NResult OOBTCPServer::BindAndListenCommon(int socketFD)
{
sockaddr_storage addrStorage{};
socklen_t addrLen = 0;
int family = AF_UNSPEC;
if (!BuildSockAddr(mListenIP, mListenPort, addrStorage, addrLen, family)) {
NN_LOG_ERROR("Invalid listen ip: " << mListenIP);
NetFunc::NN_SafeCloseFd(socketFD);
return NN_INVALID_IP;
}
auto ret = ::bind(socketFD, reinterpret_cast<struct sockaddr *>(&addrStorage), addrLen);
if (NN_UNLIKELY(ret < 0)) {
char buf[NET_STR_ERROR_BUF_SIZE] = {0};
NN_LOG_ERROR("Failed to bind on " << mListenIP << ":" << mListenPort << ", error "
<< NetFunc::NN_GetStrError(errno, buf, NET_STR_ERROR_BUF_SIZE));
NetFunc::NN_SafeCloseFd(socketFD);
return NN_OOB_LISTEN_SOCKET_ERROR;
}
if (NN_UNLIKELY(::listen(socketFD, OOB_DEFAULT_LISTEN_BACKLOG) < 0)) {
char buf[NET_STR_ERROR_BUF_SIZE] = {0};
NN_LOG_ERROR("Failed to listen on " << mListenIP << ":" << mListenPort << ", error "
<< NetFunc::NN_GetStrError(errno, buf, NET_STR_ERROR_BUF_SIZE));
NetFunc::NN_SafeCloseFd(socketFD);
return NN_OOB_LISTEN_SOCKET_ERROR;
}
return NN_OK;
}
NResult OOBTCPServer::BindAndListenAuto(int &socketFD)
{
bool isBindAndListenSuccess = false;
auto tmpPort = mListenPort;
while (tmpPort <= mMaxListenPort) {
sockaddr_storage addrStorage{};
socklen_t addrLen = 0;
int family = AF_UNSPEC;
if (!BuildSockAddr(mListenIP, tmpPort, addrStorage, addrLen, family)) {
NN_LOG_ERROR("Invalid listen ip: " << mListenIP);
NetFunc::NN_SafeCloseFd(socketFD);
return NN_INVALID_IP;
}
auto ret = ::bind(socketFD, reinterpret_cast<struct sockaddr *>(&addrStorage), addrLen);
if (NN_UNLIKELY(ret < 0)) {
char buf[NET_STR_ERROR_BUF_SIZE] = {0};
NN_LOG_DEBUG("Try to bind on " << mListenIP << ":" << tmpPort << " failed, error "
<< NetFunc::NN_GetStrError(errno, buf, NET_STR_ERROR_BUF_SIZE));
++tmpPort;
continue;
}
ret = ::listen(socketFD, OOB_DEFAULT_LISTEN_BACKLOG);
if (NN_LIKELY(ret == 0)) {
isBindAndListenSuccess = true;
break;
}
char buf[NET_STR_ERROR_BUF_SIZE] = {0};
NN_LOG_DEBUG("Try to listen on " << mListenIP << ":" << tmpPort << " failed, error "
<< NetFunc::NN_GetStrError(errno, buf, NET_STR_ERROR_BUF_SIZE));
NetFunc::NN_SafeCloseFd(socketFD);
ret = CreateAndConfigSocket(socketFD);
if (NN_UNLIKELY(ret != NN_OK)) {
NN_LOG_ERROR("Recreate socket fd failed");
return ret;
}
++tmpPort;
}
if (!isBindAndListenSuccess) {
NN_LOG_ERROR("Failed to bind and listen on port range [" << mMinListenPort << ", " << mMaxListenPort << "].");
NetFunc::NN_SafeCloseFd(socketFD);
return NN_OOB_LISTEN_SOCKET_ERROR;
}
mListenPort = tmpPort;
return NN_OK;
}
NResult OOBTCPServer::CreateAndConfigSocket(int &socketFD)
{
sockaddr_storage addrStorage{};
socklen_t addrLen = 0;
int family = AF_UNSPEC;
if (!BuildSockAddr(mListenIP, mListenPort, addrStorage, addrLen, family)) {
NN_LOG_ERROR("Invalid listen ip: " << mListenIP);
NetFunc::NN_SafeCloseFd(socketFD);
return NN_INVALID_IP;
}
auto tmpFD = ::socket(family, SOCK_STREAM, 0);
if (tmpFD < 0) {
char buf[NET_STR_ERROR_BUF_SIZE] = {0};
NN_LOG_ERROR("Failed to create listen socket, error "
<< NetFunc::NN_GetStrError(errno, buf, NET_STR_ERROR_BUF_SIZE)
<< ", please check if running of fd limit");
return NN_OOB_LISTEN_SOCKET_ERROR;
}
int value = 1;
if (NN_UNLIKELY((value = fcntl(tmpFD, F_GETFL, 0)) == -1)) {
NetFunc::NN_SafeCloseFd(tmpFD);
char buf[NET_STR_ERROR_BUF_SIZE] = {0};
NN_LOG_ERROR("Failed to get control value for sock "
<< mIndex.oobSvrIdx << ", error " << NetFunc::NN_GetStrError(errno, buf, NET_STR_ERROR_BUF_SIZE));
return NN_OOB_LISTEN_SOCKET_ERROR;
}
if (NN_UNLIKELY((value = fcntl(tmpFD, F_SETFL, uint32_t(value) | O_NONBLOCK)) == -1)) {
NetFunc::NN_SafeCloseFd(tmpFD);
char buf[NET_STR_ERROR_BUF_SIZE] = {0};
NN_LOG_ERROR("Failed to set control value for sock "
<< mIndex.oobSvrIdx << ", error " << NetFunc::NN_GetStrError(errno, buf, NET_STR_ERROR_BUF_SIZE));
return NN_OOB_LISTEN_SOCKET_ERROR;
}
int flags = 1;
int ret = ::setsockopt(tmpFD, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<void *>(&flags), sizeof(flags));
if (NN_UNLIKELY(ret < 0)) {
NetFunc::NN_SafeCloseFd(tmpFD);
char buf[NET_STR_ERROR_BUF_SIZE] = {0};
NN_LOG_ERROR("Failed to set option, error " << NetFunc::NN_GetStrError(errno, buf, NET_STR_ERROR_BUF_SIZE));
return NN_OOB_LISTEN_SOCKET_ERROR;
}
socketFD = tmpFD;
return NN_OK;
}
NResult OOBTCPServer::CreateAndStartSocket()
{
int socketFD = 0;
int ret = NN_OK;
ret = CreateAndConfigSocket(socketFD);
if (NN_UNLIKELY(ret != NN_OK)) {
return ret;
}
if (mIsAutoPortSelectionEnabled) {
ret = BindAndListenAuto(socketFD);
} else {
ret = BindAndListenCommon(socketFD);
}
if (NN_LIKELY(ret == NN_OK)) {
mListenFD = socketFD;
}
return ret;
}
NResult OOBTCPServer::Start()
{
if (mStarted) {
return NN_OK;
}
if (mNewConnectionHandler == nullptr) {
NN_LOG_ERROR("Failed to start oob server as new connection callback is not set");
return NN_OOB_CONN_CB_NOT_SET;
}
if ((!enableMultiRail) && (mWorkerLb == nullptr)) {
NN_LOG_ERROR("Failed to start oob server as load balancer is not set");
return NN_INVALID_PARAM;
}
if (mOobType == NET_OOB_UDS) {
return StartForUds();
}
if (mOobType != NET_OOB_TCP || mListenIP.empty() || mListenPort < NN_NO1024) {
NN_LOG_ERROR("Failed to start oob server as invalid type or listen ip "
<< mListenIP << " or port " << mListenPort << ", port range is 1024 ~ 65535)");
return NN_INVALID_PARAM;
}
auto ret = CreateAndStartSocket();
if (NN_UNLIKELY(ret != NN_OK)) {
NN_LOG_ERROR("Failed to create and start oob tcp socket");
return ret;
}
mEs = NetExecutorService::Create(mNewConnCbThreadNum, mNewConnCbQueueCap);
if (NN_UNLIKELY(mEs == nullptr)) {
NetFunc::NN_SafeCloseFd(mListenFD);
char buf[NET_STR_ERROR_BUF_SIZE] = {0};
NN_LOG_ERROR("Failed to create oob connection cb thread in oob server, as "
<< NetFunc::NN_GetStrError(errno, buf, NET_STR_ERROR_BUF_SIZE));
return NN_ERROR;
}
mEs->SetThreadName("OOBTcpConnHdl");
if (NN_UNLIKELY(!mEs->Start())) {
NetFunc::NN_SafeCloseFd(mListenFD);
char buf[NET_STR_ERROR_BUF_SIZE] = {0};
NN_LOG_ERROR("Failed to start oob connection cb thread in oob server, as "
<< NetFunc::NN_GetStrError(errno, buf, NET_STR_ERROR_BUF_SIZE));
return NN_ERROR;
}
mThreadStarted.store(false);
std::thread tmpThread(&OOBTCPServer::RunInThread, this);
mAcceptThread = std::move(tmpThread);
std::string thrName = "OOBTcpSvr" + mIndex.ToString();
if (pthread_setname_np(mAcceptThread.native_handle(), thrName.c_str()) != 0) {
NN_LOG_WARN("Invalid to set thread name of oob tcp server");
}
cpu_set_t cpuSet;
if (mCpuId != -1) {
CPU_ZERO(&cpuSet);
CPU_SET(mCpuId, &cpuSet);
if (pthread_setaffinity_np(mAcceptThread.native_handle(), sizeof(cpuSet), &cpuSet) != 0) {
NN_LOG_WARN("Unable to bind net oob server " << thrName << " to cpu " << mCpuId);
}
NN_LOG_INFO("Bind net oob server " << thrName << " to cpu " << mCpuId);
}
while (!mThreadStarted.load()) {
usleep(NN_NO128);
}
mStarted = true;
return NN_OK;
}
NResult OOBTCPServer::Stop()
{
if (!mStarted) {
return NN_OK;
}
mNeedStop = true;
if (mAcceptThread.joinable()) {
mAcceptThread.join();
}
if (mOobType == NET_OOB_UDS && mUdsPerm != 0) {
if (!CanonicalPath(mUdsName)) {
NN_LOG_ERROR("Uds oob file path is invalid");
return NN_INVALID_PARAM;
}
if (!NetFunc::NN_CheckFilePrefix(mUdsName)) {
NN_LOG_ERROR("Uds oob file path is invalid as prefix invalid");
return NN_INVALID_PARAM;
}
unlink(mUdsName.c_str());
}
NetFunc::NN_SafeCloseFd(mListenFD);
mStarted = false;
return NN_OK;
}
NResult OOBTCPServer::AssignUdsAddress(sockaddr_un &address, socklen_t &addressLen)
{
if (mUdsPerm == 0) {
address.sun_path[0] = '\0';
if (strcpy_s(address.sun_path + 1, sizeof(address.sun_path) - 1, mUdsName.c_str()) != EOK) {
NN_LOG_ERROR("strcpy_s uds name error.");
return NN_ERROR;
}
addressLen = sizeof(address.sun_family) + 1 + mUdsName.length();
} else {
size_t index = mUdsName.find_last_of('/');
if (NN_UNLIKELY(index == std::string::npos)) {
NN_LOG_ERROR("Uds oob file path is invalid");
return NN_INVALID_PARAM;
}
std::string udsFilePrefix = mUdsName.substr(0, index + 1);
std::string udsFileName = mUdsName.substr(index + 1, mUdsName.length());
if (!NetFunc::NN_CheckFilePrefix(udsFilePrefix)) {
NN_LOG_ERROR("Uds oob file path is invalid as prefix invalid");
return NN_INVALID_PARAM;
}
if (!CanonicalPath(udsFilePrefix)) {
NN_LOG_ERROR("Uds oob file path is invalid");
return NN_INVALID_PARAM;
}
mUdsName = udsFilePrefix + "/" + udsFileName;
if (::access(mUdsName.c_str(), 0) == 0) {
if (unlink(mUdsName.c_str()) == -1) {
NN_LOG_ERROR("Failed to unlink uds oob file");
return NN_INVALID_PARAM;
}
}
int result = 0;
if ((result = strcpy_s(address.sun_path, sizeof(address.sun_path), mUdsName.c_str())) != EOK) {
NN_LOG_ERROR("strcpy_s uds name error. result :" << result);
return NN_ERROR;
}
addressLen = sizeof(address);
}
return NN_OK;
}
NResult OOBTCPServer::StartForUds()
{
if (mUdsName.empty()) {
NN_LOG_ERROR("Failed to start oob server as invalid UDS file path");
return NN_INVALID_PARAM;
}
if (mUdsName[0] == '/' && mUdsPerm == 0) {
NN_LOG_ERROR(
"Failed to start oob server as invalid UDS file path, first char cannot be '/' for abstract namespace");
return NN_INVALID_PARAM;
}
struct sockaddr_un address {
};
socklen_t addressLen = 0;
NN_ASSERT_LOG_RETURN(sizeof(address.sun_path) - 1 > mUdsName.length(), NN_INVALID_PARAM);
bzero(&address, sizeof(address));
address.sun_family = AF_UNIX;
auto result = AssignUdsAddress(address, addressLen);
if (NN_UNLIKELY(result != NN_OK)) {
return result;
}
auto listenFd = ::socket(AF_UNIX, SOCK_STREAM, 0);
if (listenFd < 0) {
char buf[NET_STR_ERROR_BUF_SIZE] = {0};
NN_LOG_ERROR("Failed to create listen socket, error "
<< NetFunc::NN_GetStrError(errno, buf, NET_STR_ERROR_BUF_SIZE)
<< ", please check if fd is out of limit");
return NN_OOB_LISTEN_SOCKET_ERROR;
}
if (::bind(listenFd, reinterpret_cast<struct sockaddr *>(&address), addressLen) < 0) {
char buf[NET_STR_ERROR_BUF_SIZE] = {0};
NN_LOG_ERROR("Failed to bind uds, error " << NetFunc::NN_GetStrError(errno, buf, NET_STR_ERROR_BUF_SIZE));
NetFunc::NN_SafeCloseFd(listenFd);
return NN_OOB_LISTEN_SOCKET_ERROR;
}
if (NN_UNLIKELY(mCheckUdsPerm && (mUdsPerm != NN_NO0600) && (mUdsPerm != NN_NO0))) {
NN_LOG_WARN("File permission is incorrect, The file permission must be set to 0600.");
mUdsPerm = NN_NO0600;
}
chmod(mUdsName.c_str(), mUdsPerm);
if (::listen(listenFd, NN_NO1024) < 0) {
char buf[NET_STR_ERROR_BUF_SIZE] = {0};
NN_LOG_ERROR("Failed to listen uds, error " << NetFunc::NN_GetStrError(errno, buf, NET_STR_ERROR_BUF_SIZE));
NetFunc::NN_SafeCloseFd(listenFd);
return NN_OOB_LISTEN_SOCKET_ERROR;
}
mListenFD = listenFd;
mThreadStarted.store(false);
std::thread tmpThread(&OOBTCPServer::RunInThread, this);
mAcceptThread = std::move(tmpThread);
std::string thrName = "OOBUdsSvr" + mIndex.ToString();
if (pthread_setname_np(mAcceptThread.native_handle(), thrName.c_str()) != 0) {
NN_LOG_WARN("Invalid to set thread name of oob uds server");
}
while (!mThreadStarted.load()) {
usleep(NN_NO128);
}
mEs = NetExecutorService::Create(mNewConnCbThreadNum, mNewConnCbQueueCap);
if (NN_UNLIKELY(mEs == nullptr)) {
return NN_ERROR;
}
mEs->SetThreadName("OOBUdsConnHdl");
if (NN_UNLIKELY(!mEs->Start())) {
return NN_ERROR;
}
mStarted = true;
return NN_OK;
}
void OOBTCPServer::DealConnectInThread(int fd, const sockaddr_storage &peerAddr, socklen_t peerLen)
{
ConnectResp resp = ConnectResp::OK;
char ipStr[INET6_ADDRSTRLEN] = {0};
uint16_t peerPort = 0;
int family = peerAddr.ss_family;
if (family == AF_INET6) {
const auto *a6 = reinterpret_cast<const sockaddr_in6 *>(&peerAddr);
if (inet_ntop(AF_INET6, &(a6->sin6_addr), ipStr, sizeof(ipStr)) == nullptr) {
NN_LOG_ERROR("Failed to convert ipv6 number to string");
resp = SERVER_INTERNAL_ERROR;
} else {
peerPort = ntohs(a6->sin6_port);
}
} else {
const auto *a4 = reinterpret_cast<const sockaddr_in *>(&peerAddr);
if (inet_ntop(AF_INET, &(a4->sin_addr), ipStr, sizeof(ipStr)) == nullptr) {
NN_LOG_ERROR("Failed to convert ipv4 number to string");
resp = SERVER_INTERNAL_ERROR;
} else {
peerPort = ntohs(a4->sin_port);
}
}
ConnectCbTask *newConnTask = nullptr;
if (resp == ConnectResp::OK) {
newConnTask = new (std::nothrow) ConnectCbTask(mNewConnectionHandler, fd, mWorkerLb);
if (NN_UNLIKELY(newConnTask == nullptr)) {
resp = ConnectResp::CONN_ACCEPT_NEW_TASK_FAIL;
}
}
if (resp == ConnectResp::OK) {
newConnTask->SetIpPort(std::string(ipStr), peerPort, mListenPort);
if (mOobType == NET_OOB_UDS) {
newConnTask->SetUdsName(mUdsName);
}
if (NN_UNLIKELY(!mEs->Execute(newConnTask))) {
delete newConnTask;
resp = ConnectResp::CONN_ACCEPT_QUEUE_FULL;
NN_LOG_WARN("Failed to execute task, queue may be full, please retry");
}
}
if (resp != ConnectResp::OK) {
if (::send(fd, &resp, sizeof(ConnectResp), 0) <= 0) {
char buf[NET_STR_ERROR_BUF_SIZE] = {0};
NN_LOG_ERROR("Failed to send connect resp to peer on oob @ "
<< ipStr << ":" << peerPort << ", errno:" << errno
<< " error:" << NetFunc::NN_GetStrError(errno, buf, NET_STR_ERROR_BUF_SIZE));
}
}
}
void OOBTCPServer::RunInThread()
{
if (mOobType == NET_OOB_TCP) {
NN_LOG_INFO("OOB server accept thread for " << mListenIP << ":" << mListenPort << " started, load balancer "
<< (mWorkerLb == nullptr ? "null" : mWorkerLb->ToString()));
} else if (mOobType == NET_OOB_UDS) {
NN_LOG_TRACE_INFO("OOB server accept thread for " << mUdsName << " started, load balancer "
<< (mWorkerLb == nullptr ? "null" : mWorkerLb->ToString()));
} else {
NN_LOG_ERROR("Un-reachable path");
}
mThreadStarted.store(true);
int flags = 1;
auto maxRecvTimeout = NetFunc::NN_GetLongEnv("HCOM_CONNECTION_RECV_TIMEOUT_SEC", NN_NO1, NN_NO7200, NN_NO0);
auto maxSendTimeout = NetFunc::NN_GetLongEnv("HCOM_CONNECTION_SEND_TIMEOUT_SEC", NN_NO1, NN_NO7200, NN_NO0);
while (NN_UNLIKELY(mEs == nullptr || !mEs->IsStart())) {
usleep(NN_NO100);
}
while (true) {
try {
if (NN_UNLIKELY(mNeedStop)) {
NN_LOG_INFO("Got stop signal, stop listening");
break;
}
struct pollfd pollEventFd = {};
pollEventFd.fd = mListenFD;
pollEventFd.events = POLLIN;
pollEventFd.revents = 0;
int rc = poll(&pollEventFd, 1, NN_NO500);
if (rc < 0 && errno != EINTR) {
char buf[NET_STR_ERROR_BUF_SIZE] = {0};
NN_LOG_ERROR("Get poll event failed, errno "
<< NetFunc::NN_GetStrError(errno, buf, NET_STR_ERROR_BUF_SIZE));
break;
}
if (rc == 0) {
continue;
}
sockaddr_storage peerAddr{};
socklen_t peerLen = sizeof(peerAddr);
int fd = ::accept(mListenFD, reinterpret_cast<sockaddr *>(&peerAddr), &peerLen);
if (fd < 0) {
char buf[NET_STR_ERROR_BUF_SIZE] = {0};
NN_LOG_WARN("Invalid to accept on new socket with "
<< NetFunc::NN_GetStrError(errno, buf, NET_STR_ERROR_BUF_SIZE) << ", ignore and continue");
continue;
}
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast<void *>(&flags), sizeof(flags));
if (maxRecvTimeout != NN_NO0) {
struct timeval recvTimeout = {maxRecvTimeout, 0};
setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &recvTimeout, sizeof(recvTimeout));
}
if (maxSendTimeout != NN_NO0) {
struct timeval sendTimeout = {maxSendTimeout, 0};
setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &sendTimeout, sizeof(sendTimeout));
}
DealConnectInThread(fd, peerAddr, peerLen);
} catch (std::exception &ex) {
NN_LOG_WARN("Got exception in OOBTCPServer::RunInThread, exception " << ex.what()
<< ", ignore and continue");
} catch (...) {
NN_LOG_WARN("Got unknown exception in OOBTCPServer::RunInThread, ignore and continue");
}
}
NN_LOG_INFO("Working thread for OOBTCPServer at " << mListenIP << ":" << mListenPort << " exiting");
}
OOBTCPConnection::~OOBTCPConnection()
{
NetFunc::NN_SafeCloseFd(mFD);
}
NResult OOBTCPConnection::Send(void *buf, uint32_t size) const
{
if (NN_UNLIKELY(buf == nullptr)) {
NN_LOG_ERROR("Failed to send as buf is nullptr");
return NN_PARAM_INVALID;
}
const unsigned char *p = static_cast<const unsigned char *>(buf);
while (size > 0) {
const ssize_t result = ::send(mFD, p, size, 0);
if (result == -1) {
if (errno == EINTR) {
continue;
} else {
char errBuf[NET_STR_ERROR_BUF_SIZE] = {0};
NN_LOG_ERROR("Failed to send data to peer on oob @ "
<< mIpAndPort << ", as errno:" << errno
<< " error:" << NetFunc::NN_GetStrError(errno, errBuf, NET_STR_ERROR_BUF_SIZE));
return NN_OOB_CONN_SEND_ERROR;
}
} else if (result == 0) {
NN_LOG_ERROR("Failed to send data to peer on oob @ " << mIpAndPort << ", reset by peer");
return NN_OOB_CONN_SEND_ERROR;
}
p += result;
size -= static_cast<uint32_t>(result);
}
return NN_OK;
}
NResult OOBTCPConnection::Receive(void *buf, uint32_t size) const
{
if (NN_UNLIKELY(buf == nullptr)) {
NN_LOG_ERROR("Failed to recv as buf is nullptr");
return NN_PARAM_INVALID;
}
unsigned char *p = static_cast<unsigned char *>(buf);
while (size > 0) {
const ssize_t result = ::recv(mFD, p, size, 0);
if (result == -1) {
if (errno == EINTR) {
continue;
} else {
char errBuf[NET_STR_ERROR_BUF_SIZE] = {0};
NN_LOG_ERROR("Failed to receive data from peer on oob @ "
<< mIpAndPort << ", as errno:" << errno
<< " error:" << NetFunc::NN_GetStrError(errno, errBuf, NET_STR_ERROR_BUF_SIZE));
return NN_OOB_CONN_RECEIVE_ERROR;
}
} else if (result == 0) {
NN_LOG_ERROR("Failed to receive data from peer on oob @ " << mIpAndPort << ", peer fd closed");
return NN_OOB_CONN_RECEIVE_ERROR;
}
p += result;
size -= static_cast<uint32_t>(result);
}
return NN_OK;
}
NResult OOBTCPConnection::SendMsg(msghdr msg, uint32_t size) const
{
auto result = ::sendmsg(mFD, &msg, 0);
if (NN_LIKELY(result == size)) {
return NN_OK;
} else if (result <= 0) {
char buf[NET_STR_ERROR_BUF_SIZE] = {0};
NN_LOG_ERROR("Failed to send msg to peer " << mIpAndPort << " result:" << result << ", as "
<< NetFunc::NN_GetStrError(errno, buf, NET_STR_ERROR_BUF_SIZE));
return NN_ERROR;
} else {
char buf[NET_STR_ERROR_BUF_SIZE] = {0};
NN_LOG_ERROR("Failed send msg to pee, the size is un-matched required size "
<< sizeof(msg) << ", send size " << result << ", or connection error, errno "
<< NetFunc::NN_GetStrError(errno, buf, NET_STR_ERROR_BUF_SIZE));
return NN_ERROR;
}
return NN_OK;
}
NResult OOBTCPConnection::ReceiveMsg(msghdr msg, uint32_t size) const
{
auto result = ::recvmsg(mFD, &msg, 0);
if (NN_LIKELY(result == size)) {
return NN_OK;
} else if (result <= 0) {
char buf[NET_STR_ERROR_BUF_SIZE] = {0};
NN_LOG_ERROR("Failed to receive msg from peer on oob"
<< mIpAndPort << ", as " << NetFunc::NN_GetStrError(errno, buf, NET_STR_ERROR_BUF_SIZE));
return NN_ERROR;
} else {
char buf[NET_STR_ERROR_BUF_SIZE] = {0};
NN_LOG_ERROR("Failed to receive data from peer, the size is un-matched required size "
<< sizeof(msg) << ", recv size " << result << ", or connection error, errno "
<< NetFunc::NN_GetStrError(errno, buf, NET_STR_ERROR_BUF_SIZE));
return NN_ERROR;
}
return NN_OK;
}
NResult OOBTCPClient::Connect(const std::string &ip, uint32_t port, OOBTCPConnection *&conn)
{
int fd = -1;
auto result = ConnectWithFd(ip, port, fd);
if (result != NN_OK) {
return result;
}
conn = new (std::nothrow) OOBTCPConnection(fd);
if (NN_UNLIKELY(conn == nullptr)) {
NN_LOG_ERROR("Failed to new oob connection, probably out of memory");
NetFunc::NN_SafeCloseFd(fd);
return NN_NEW_OBJECT_FAILED;
}
conn->ListenPort(port);
return NN_OK;
}
NResult OOBTCPClient::ConnectWithFd(const std::string &ip, uint32_t port, int &fd)
{
sockaddr_storage addrStorage{};
socklen_t addrLen = 0;
int family = AF_UNSPEC;
{
sockaddr_in addr4{};
if (inet_pton(AF_INET, ip.c_str(), &addr4.sin_addr) == 1) {
addr4.sin_family = AF_INET;
addr4.sin_port = htons(port);
std::memcpy(&addrStorage, &addr4, sizeof(addr4));
addrLen = sizeof(addr4);
family = AF_INET;
}
}
if (family == AF_UNSPEC) {
sockaddr_in6 addr6{};
if (inet_pton(AF_INET6, ip.c_str(), &addr6.sin6_addr) == 1) {
addr6.sin6_family = AF_INET6;
addr6.sin6_port = htons(port);
std::memcpy(&addrStorage, &addr6, sizeof(addr6));
addrLen = sizeof(addr6);
family = AF_INET6;
}
}
if (family == AF_UNSPEC) {
NN_LOG_ERROR("Failed to connect because ip is invalid: " << ip);
return NN_INVALID_IP;
}
auto tmpFD = ::socket(family, SOCK_STREAM, 0);
if (tmpFD < 0) {
char errBuf[NET_STR_ERROR_BUF_SIZE] = {0};
NN_LOG_ERROR("Failed to create socket, errno:" << errno << " error:"
<< NetFunc::NN_GetStrError(errno, errBuf, NET_STR_ERROR_BUF_SIZE)
<< ", please check if fd is out of limit");
return NN_OOB_CLIENT_SOCKET_ERROR;
}
int flags = 1;
setsockopt(tmpFD, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast<void *>(&flags), sizeof(flags));
int synCnt = 1;
setsockopt(tmpFD, IPPROTO_TCP, TCP_SYNCNT, &synCnt, sizeof(synCnt));
uint32_t timesRetried = 0;
long maxConnRetryTimes = NN_NO5;
long maxConnRetryInterval = NN_NO20;
ConfigureSocketTimeouts(tmpFD, maxConnRetryTimes, maxConnRetryInterval);
ssize_t result = -1;
ConnectState state = ConnectState::DISCONNECTED;
ConnectResp connectStatus = ConnectResp::OK;
if (family == AF_INET6) {
sockaddr_storage localAddrStorage{};
socklen_t localAddrLen = 0;
sockaddr_in6 localAddr6{};
inet_pton(AF_INET6, mLocalEid.c_str(), &localAddr6.sin6_addr);
localAddr6.sin6_family = AF_INET6;
std::memcpy(&localAddrStorage, &localAddr6, sizeof(localAddr6));
localAddrLen = sizeof(localAddr6);
auto ret = ::bind(tmpFD, reinterpret_cast<struct sockaddr *>(&localAddrStorage), localAddrLen);
}
while (timesRetried < maxConnRetryTimes) {
switch (state) {
case ConnectState::DISCONNECTED:
NN_LOG_INFO("Trying to connect to " << ip << ":" << port);
if (timesRetried != 0) {
sleep((1 << timesRetried) > maxConnRetryInterval ? maxConnRetryInterval : (1 << timesRetried));
}
if (::connect(tmpFD, reinterpret_cast<struct sockaddr *>(&addrStorage), addrLen) == 0) {
state = ConnectState::CONNECTED;
continue;
} else {
char errBuf[NET_STR_ERROR_BUF_SIZE] = {0};
NN_LOG_ERROR("Trying to connect to "
<< ip << ":" << port << " errno:" << errno
<< " error:" << NetFunc::NN_GetStrError(errno, errBuf, NET_STR_ERROR_BUF_SIZE)
<< " retry times:" << timesRetried);
}
break;
case ConnectState::CONNECTED:
result = ::recv(tmpFD, &connectStatus, sizeof(ConnectResp), 0);
if (result <= 0 || connectStatus != ConnectResp::OK) {
char errBuf[NET_STR_ERROR_BUF_SIZE] = {0};
NN_LOG_ERROR("Failed to receive connection status from peer on oob, as result:"
<< result << " errno:" << errno
<< " error:" << NetFunc::NN_GetStrError(errno, errBuf, NET_STR_ERROR_BUF_SIZE)
<< " connTaskStatus:" << connectStatus);
} else {
fd = tmpFD;
NN_LOG_INFO("Connect to " << ip << ":" << port << " successfully");
return NN_OK;
}
break;
}
timesRetried++;
}
NetFunc::NN_SafeCloseFd(tmpFD);
NN_LOG_ERROR("Failed to connect to " << ip << ":" << port << " after tried " << timesRetried << " times");
return NN_OOB_CLIENT_SOCKET_ERROR;
}
NResult OOBTCPClient::Connect(const std::string &udsName, OOBTCPConnection *&conn)
{
int fd = -1;
auto result = ConnectWithFd(udsName, fd);
if (result != NN_OK) {
return result;
}
conn = new (std::nothrow) OOBTCPConnection(fd);
if (NN_UNLIKELY(conn == nullptr)) {
NN_LOG_ERROR("Failed to new oob connection, probably out of memory");
NetFunc::NN_SafeCloseFd(fd);
return NN_NEW_OBJECT_FAILED;
}
conn->mIsUds = true;
return NN_OK;
}
NResult OOBTCPClient::ConnectWithFd(const std::string &filename, int &fd)
{
if (filename.empty()) {
NN_LOG_ERROR("Invalid name or file path to connect for uds, which is empty");
return NN_OOB_CLIENT_SOCKET_ERROR;
}
struct sockaddr_un address {
};
socklen_t addressLen = 0;
NN_ASSERT_LOG_RETURN(sizeof(address.sun_path) - 1 > filename.length(), NN_INVALID_PARAM);
bzero(&address, sizeof(address));
address.sun_family = AF_UNIX;
bool abstractNs = (filename[0] != '/');
if (abstractNs) {
address.sun_path[0] = '\0';
if (strcpy_s(address.sun_path + 1, sizeof(address.sun_path) - 1, filename.c_str()) != EOK) {
NN_LOG_ERROR("strcpy_s filename error.");
return NN_ERROR;
}
addressLen = sizeof(address.sun_family) + 1 + filename.length();
} else {
if (!CanonicalPath(const_cast<std::string &>(filename))) {
NN_LOG_ERROR("Uds oob file path is invalid");
return NN_INVALID_PARAM;
}
if (strcpy_s(address.sun_path, sizeof(address.sun_path), filename.c_str()) != EOK) {
NN_LOG_ERROR("strcpy_s filename error.");
return NN_ERROR;
}
addressLen = sizeof(address);
}
auto tmpFD = ::socket(AF_UNIX, SOCK_STREAM, 0);
if (tmpFD < 0) {
char errBuf[NET_STR_ERROR_BUF_SIZE] = {0};
NN_LOG_ERROR("Failed to create listen socket, errno:"
<< errno << " error:" << NetFunc::NN_GetStrError(errno, errBuf, NET_STR_ERROR_BUF_SIZE)
<< ", please check if fd is out of limit");
return NN_OOB_CLIENT_SOCKET_ERROR;
}
int synCnt = 1;
setsockopt(tmpFD, IPPROTO_TCP, TCP_SYNCNT, &synCnt, sizeof(synCnt));
uint32_t timesRetried = 0;
long maxConnRetryTimes = NN_NO5;
long maxConnRetryInterval = NN_NO20;
ConfigureSocketTimeouts(tmpFD, maxConnRetryTimes, maxConnRetryInterval);
while (timesRetried < maxConnRetryTimes) {
NN_LOG_INFO("Trying to connect to " << filename);
if (::connect(tmpFD, reinterpret_cast<struct sockaddr *>(&address), addressLen) == 0) {
ConnectResp connectStatus = ConnectResp::OK;
ssize_t result = ::recv(tmpFD, &connectStatus, sizeof(ConnectResp), 0);
if (result <= 0 || connectStatus != ConnectResp::OK) {
char errBuf[NET_STR_ERROR_BUF_SIZE] = {0};
NN_LOG_ERROR("Failed to receive connection status from peer on oob, as result:"
<< result << " errno:" << errno
<< " error:" << NetFunc::NN_GetStrError(errno, errBuf, NET_STR_ERROR_BUF_SIZE)
<< " connTaskStatus:" << connectStatus);
} else {
fd = tmpFD;
NN_LOG_INFO("Connect to " << filename << " successfully");
return NN_OK;
}
}
if (errno == EINTR) {
continue;
}
sleep(1 << timesRetried > maxConnRetryInterval ? maxConnRetryInterval : 1 << timesRetried);
timesRetried++;
char errBuf[NET_STR_ERROR_BUF_SIZE] = {0};
NN_LOG_ERROR("Trying to connect to " << filename << " errno:" << errno << " error:"
<< NetFunc::NN_GetStrError(errno, errBuf, NET_STR_ERROR_BUF_SIZE)
<< " retry times:" << timesRetried);
}
NetFunc::NN_SafeCloseFd(tmpFD);
NN_LOG_ERROR("Failed to connect to " << filename << " after tried " << timesRetried << " times");
return NN_OOB_CLIENT_SOCKET_ERROR;
}
void OOBTCPClient::ConfigureSocketTimeouts(int &tmpFD, long &maxConnRetryTimes, long &maxConnRetryInterval)
{
maxConnRetryTimes = NetFunc::NN_GetLongEnv("HCOM_CONNECTION_RETRY_TIMES", NN_NO1, NN_NO10, NN_NO5);
maxConnRetryInterval = NetFunc::NN_GetLongEnv("HCOM_CONNECTION_RETRY_INTERVAL_SEC", NN_NO1, NN_NO60, NN_NO20);
auto maxRecvTimeout = NetFunc::NN_GetLongEnv("HCOM_CONNECTION_RECV_TIMEOUT_SEC", NN_NO1, NN_NO7200, NN_NO0);
auto maxSendTimeout = NetFunc::NN_GetLongEnv("HCOM_CONNECTION_SEND_TIMEOUT_SEC", NN_NO1, NN_NO7200, NN_NO0);
if (maxRecvTimeout != NN_NO0) {
struct timeval recvTimeout = {maxRecvTimeout, 0};
setsockopt(tmpFD, SOL_SOCKET, SO_RCVTIMEO, &recvTimeout, sizeof(timeval));
}
if (maxSendTimeout != NN_NO0) {
struct timeval sendTimeout = {maxSendTimeout, 0};
setsockopt(tmpFD, SOL_SOCKET, SO_SNDTIMEO, &sendTimeout, sizeof(timeval));
}
}
}
}