* Copyright (C) 2021 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifdef HDC_SUPPORT_UART
#include "uart.h"
#include <memory>
#ifdef HOST_MAC
#include <fcntl.h>
#include <sys/ioctl.h>
#include <IOKit/serial/ioss.h>
#define B1500000 1500000
#define B921600 921600
#endif
using namespace std::chrono;
namespace Hdc {
// Out-of-class definition of the static HdcUARTBase::defaultInterface member (default-constructed).
ExternInterface HdcUARTBase::defaultInterface;
// Thin indirection over Base::SetTcpOptions for the given TCP handle.
void ExternInterface::SetTcpOptions(uv_tcp_t *tcpHandle)
{
    Base::SetTcpOptions(tcpHandle);
}
// Hands the buffer to Base::SendToStream and passes its result back to the caller.
int ExternInterface::SendToStream(uv_stream_t *handleStream, const uint8_t *buf, const int len)
{
    const int sendResult = Base::SendToStream(handleStream, buf, len);
    return sendResult;
}
// Hands the buffer to Base::SendToPollFd for the given descriptor and passes its result back.
int ExternInterface::SendToPollFd(int fd, const uint8_t *buf, const int len)
{
    const int sendResult = Base::SendToPollFd(fd, buf, len);
    return sendResult;
}
// Initializes `tcp` on `loop` and then opens it on the existing descriptor `socketFd`.
// Returns -1 if uv_tcp_init fails, otherwise whatever uv_tcp_open returns.
int ExternInterface::UvTcpInit(uv_loop_t *loop, uv_tcp_t *tcp, int socketFd)
{
    if (uv_tcp_init(loop, tcp) != 0) {
        return -1;
    }
    return uv_tcp_open(tcp, socketFd);
}
// Starts reading on `stream` through uv_read_start and returns its status code.
int ExternInterface::UvRead(uv_stream_t *stream, uv_alloc_cb allocCallBack, uv_read_cb readCallBack)
{
    const int startStatus = uv_read_start(stream, allocCallBack, readCallBack);
    return startStatus;
}
// Delegates to Base::StartWorkThread with the same arguments and returns its result.
int ExternInterface::StartWorkThread(uv_loop_t *loop, uv_work_cb pFuncWorkThread,
                                     uv_after_work_cb pFuncAfterThread, void *pThreadData)
{
    const int startStatus =
        Base::StartWorkThread(loop, pFuncWorkThread, pFuncAfterThread, pThreadData);
    return startStatus;
}
// Delegates closing of `handle` (with the given close callback) to Base::TryCloseHandle.
void ExternInterface::TryCloseHandle(const uv_handle_t *handle, uv_close_cb closeCallBack)
{
    Base::TryCloseHandle(handle, closeCallBack);
}
// Delegates to Base::TimerUvTask and returns its result.
bool ExternInterface::TimerUvTask(uv_loop_t *loop, void *data, uv_timer_cb cb)
{
    const bool taskResult = Base::TimerUvTask(loop, data, cb);
    return taskResult;
}
// Starts `handle` via uv_timer_start.
// Returns true when the timer was started, false otherwise.
bool ExternInterface::UvTimerStart(uv_timer_t *handle, uv_timer_cb cb, uint64_t timeout,
                                   uint64_t repeat)
{
    // uv_timer_start reports success as 0 and failure as a negative error code, so the
    // status must be compared explicitly; an implicit int -> bool conversion would invert it.
    return uv_timer_start(handle, cb, timeout, repeat) == 0;
}
// Delegates to Base::DelayDo with the same arguments and returns its result.
bool ExternInterface::DelayDo(uv_loop_t *loop, const int delayMs, const uint8_t flag, string msg,
                              void *data, DelayCB cb)
{
    const bool delayResult = Base::DelayDo(loop, delayMs, flag, msg, data, cb);
    return delayResult;
}
// Initializes externInterface and sessionBase from the given arguments and
// sets uartOpened to false.
HdcUARTBase::HdcUARTBase(HdcSessionBase &sessionBaseIn, ExternInterface &interfaceIn)
: externInterface(interfaceIn), sessionBase(sessionBaseIn)
{
uartOpened = false;
}
// Destructor with an empty body.
HdcUARTBase::~HdcUARTBase(void) {}
#ifndef _WIN32
// Maps a UART_SPEED* selector to the matching B* baud-rate constant.
// Any selector that is not listed maps to B921600.
int HdcUARTBase::GetUartSpeed(int speed)
{
    int baudFlag = B921600;
    switch (speed) {
        case UART_SPEED2400:
            baudFlag = B2400;
            break;
        case UART_SPEED4800:
            baudFlag = B4800;
            break;
        case UART_SPEED9600:
            baudFlag = B9600;
            break;
        case UART_SPEED115200:
            baudFlag = B115200;
            break;
        case UART_SPEED1500000:
            baudFlag = B1500000;
            break;
        case UART_SPEED921600:
        default:
            baudFlag = B921600;
            break;
    }
    return baudFlag;
}
// Maps a UART_BIT* selector to a character-size flag.
// Only UART_BIT1 selects CS7; UART_BIT2 and every other value select CS8.
int HdcUARTBase::GetUartBits(int bits)
{
    if (bits == UART_BIT1) {
        return CS7;
    }
    return CS8;
}
#if defined(HOST_MAC)
// HOST_MAC variant: puts the serial port `fd` into raw mode and applies `nSpeed` through the
// IOSSIOSPEED ioctl. `nEvent` and `nStop` are not used by this variant.
// Returns RET_SUCCESS, or ERR_GENERIC when the attributes cannot be read, copied or applied.
int HdcUARTBase::SetSerial(int fd, int nSpeed, int nBits, char nEvent, int nStop)
{
    WRITE_LOG(LOG_DEBUG, "mac SetSerial rate = %d", nSpeed);

    struct termios currentAttrs;
    if (tcgetattr(fd, &currentAttrs) != 0) {
        constexpr int errTextSize = 1024;
        char errText[errTextSize] = { 0 };
        strerror_r(errno, errText, errTextSize);
        WRITE_LOG(LOG_DEBUG, "tcgetattr failed with %s\n", errText);
        return ERR_GENERIC;
    }

    // Start from the current settings and switch the copy to raw mode.
    struct termios rawAttrs;
    if (memcpy_s(&rawAttrs, sizeof(rawAttrs), &currentAttrs, sizeof(rawAttrs)) != EOK) {
        return ERR_GENERIC;
    }
    cfmakeraw(&rawAttrs);
    rawAttrs.c_cc[VMIN] = 0;
    rawAttrs.c_cc[VTIME] = 10; // VTIME is expressed in tenths of a second
    cfsetspeed(&rawAttrs, B19200);
    rawAttrs.c_cflag |= GetUartBits(nBits);
    rawAttrs.c_cflag &= ~PARENB;

    // The requested rate is applied with IOSSIOSPEED both before and after tcsetattr;
    // a failure of either ioctl is only logged.
    speed_t requestedSpeed = nSpeed;
    if (ioctl(fd, IOSSIOSPEED, &requestedSpeed) == -1) {
        WRITE_LOG(LOG_DEBUG, "set speed errno %d", errno);
    }
    if (tcsetattr(fd, TCSANOW, &rawAttrs) != 0) {
        WRITE_LOG(LOG_DEBUG, "com set error errno = %d", errno);
        return ERR_GENERIC;
    }
    if (ioctl(fd, IOSSIOSPEED, &requestedSpeed) == -1) {
        WRITE_LOG(LOG_DEBUG, "set speed errno %d", errno);
    }

    WRITE_LOG(LOG_DEBUG, " SetSerial OK rate = %d", nSpeed);
    return RET_SUCCESS;
}
#else
// Configures the serial port `fd`: baud rate (nSpeed), data bits (nBits), parity
// (nEvent: 'O' odd, 'E' even, 'N' none) and stop bits (nStop), then flushes pending I/O and
// applies the settings immediately.
// Returns RET_SUCCESS, or ERR_GENERIC when tcgetattr, tcflush or tcsetattr fails.
int HdcUARTBase::SetSerial(int fd, int nSpeed, int nBits, char nEvent, int nStop)
{
    // The previous settings are only fetched to check that fd is a usable terminal.
    struct termios previousAttrs;
    if (tcgetattr(fd, &previousAttrs) != 0) {
        constexpr int errTextSize = 1024;
        char errText[errTextSize] = { 0 };
        strerror_r(errno, errText, errTextSize);
        WRITE_LOG(LOG_DEBUG, "tcgetattr failed with %s\n", errText);
        return ERR_GENERIC;
    }

    // Build the new settings from a zeroed structure.
    struct termios portAttrs;
    bzero(&portAttrs, sizeof(portAttrs));
    portAttrs.c_cflag = static_cast<tcflag_t>(GetUartSpeed(nSpeed));
    portAttrs.c_cflag |= (CLOCAL | CREAD);
    portAttrs.c_cflag &= ~CSIZE;
    portAttrs.c_lflag &= ~ICANON;
    portAttrs.c_cflag |= static_cast<tcflag_t>(GetUartBits(nBits));

    // Parity: odd and even both enable parity generation and input checking.
    if (nEvent == 'O' || nEvent == 'E') {
        portAttrs.c_cflag |= PARENB;
        portAttrs.c_iflag |= (INPCK | ISTRIP);
        if (nEvent == 'O') {
            portAttrs.c_cflag |= PARODD;
        } else {
            portAttrs.c_cflag &= ~PARODD;
        }
    } else if (nEvent == 'N') {
        portAttrs.c_cflag &= ~PARENB;
    }

    // Stop bits: any other nStop value leaves CSTOPB untouched.
    if (nStop == UART_STOP1) {
        portAttrs.c_cflag &= ~CSTOPB;
    } else if (nStop == UART_STOP2) {
        portAttrs.c_cflag |= CSTOPB;
    }

    portAttrs.c_cc[VTIME] = 0;
    portAttrs.c_cc[VMIN] = 0;

    if (tcflush(fd, TCIOFLUSH) != 0) {
        WRITE_LOG(LOG_DEBUG, " tcflush error.");
        return ERR_GENERIC;
    }
    if (tcsetattr(fd, TCSANOW, &portAttrs) != 0) {
        WRITE_LOG(LOG_DEBUG, "com set error errno = %d", errno);
        return ERR_GENERIC;
    }
    WRITE_LOG(LOG_DEBUG, " SetSerial OK rate = %d", nSpeed);
    return RET_SUCCESS;
}
#endif
#endif
// Reads from the UART device described by `uart` and appends the received bytes to `readBuf`.
//
// The loop keeps reading while readBuf.size() (which includes anything already stored in
// readBuf on entry) is below expectedSize, or while the last pass delivered no bytes.
// expectedSize == 0 means "wait for whatever arrives next".
//
// Returns the number of bytes appended by this call; a timeout or a cancelled read returns the
// bytes collected so far (possibly 0); -1 is returned on an I/O error.
ssize_t HdcUARTBase::ReadUartDev(std::vector<uint8_t> &readBuf, size_t expectedSize, HdcUART &uart)
{
ssize_t totalBytesRead = 0;
uint8_t uartReadBuffer[MAX_UART_SIZE_IOBUF];
#ifdef _WIN32
DWORD bytesRead = 0;
#else
ssize_t bytesRead = 0;
#endif
do {
bytesRead = 0;
#ifdef _WIN32
// Overlapped read: when it does not complete immediately, wait for the result below.
BOOL bReadStatus = ReadFile(uart.devUartHandle, uartReadBuffer, sizeof(uartReadBuffer),
&bytesRead, &uart.ovRead);
if (!bReadStatus) {
if (GetLastError() == ERROR_IO_PENDING) {
bytesRead = 0;
// With no expected size the wait is unbounded; otherwise give up after
// READ_GIVE_UP_TIME_OUT_TIME_MS.
DWORD dwMilliseconds = READ_GIVE_UP_TIME_OUT_TIME_MS;
if (expectedSize == 0) {
dwMilliseconds = INFINITE;
}
if (!GetOverlappedResultEx(uart.devUartHandle, &uart.ovRead, &bytesRead,
dwMilliseconds, FALSE)) {
DWORD error = GetLastError();
if (error == ERROR_OPERATION_ABORTED) {
// The pending read was cancelled: report what has been collected so far.
totalBytesRead += bytesRead;
WRITE_LOG(LOG_DEBUG, "%s error cancel read. %u %zd", __FUNCTION__,
bytesRead, totalBytesRead);
return totalBytesRead;
} else if (error == WAIT_TIMEOUT) {
// The wait timed out: report what has been collected so far.
totalBytesRead += bytesRead;
WRITE_LOG(LOG_DEBUG, "%s error timeout. %u %zd", __FUNCTION__, bytesRead,
totalBytesRead);
return totalBytesRead;
} else {
WRITE_LOG(LOG_DEBUG, "%s error wait io:%d.", __FUNCTION__, GetLastError());
}
return -1;
}
} else {
WRITE_LOG(LOG_DEBUG, "%s err:%d. ", __FUNCTION__, GetLastError());
return -1;
}
}
#else
int ret = 0;
fd_set readFds;
FD_ZERO(&readFds);
FD_SET(uart.devUartHandle, &readFds);
const constexpr int msTous = 1000;
const constexpr int sTous = 1000 * msTous;
struct timeval tv;
tv.tv_sec = 0;
if (expectedSize == 0) {
// Convert WAIT_RESPONSE_TIME_OUT_MS into the seconds / microseconds pair select() expects.
tv.tv_usec = WAIT_RESPONSE_TIME_OUT_MS * msTous;
tv.tv_sec = tv.tv_usec / sTous;
tv.tv_usec = tv.tv_usec % sTous;
// NOTE(review): tv_sec is logged twice here; the second argument was presumably meant to be
// tv_usec - confirm.
WRITE_LOG(LOG_DEBUG, "time = %d %d", tv.tv_sec, tv.tv_sec);
#ifdef HDC_HOST
ret = select(uart.devUartHandle + 1, &readFds, nullptr, nullptr, &tv);
#else
// Without HDC_HOST the select blocks with no timeout.
ret = select(uart.devUartHandle + 1, &readFds, nullptr, nullptr, nullptr);
#endif
} else {
// A specific amount is expected: give up after READ_GIVE_UP_TIME_OUT_TIME_MS without data.
tv.tv_usec = READ_GIVE_UP_TIME_OUT_TIME_MS * msTous;
tv.tv_sec = tv.tv_usec / sTous;
tv.tv_usec = tv.tv_usec % sTous;
ret = select(uart.devUartHandle + 1, &readFds, nullptr, nullptr, &tv);
}
if (ret == 0 and expectedSize == 0) {
// Timed out while waiting for "anything": stop only if cancellation was requested,
// otherwise go around again (bytesRead is 0, so the loop condition holds).
if (uart.ioCancel) {
WRITE_LOG(LOG_DEBUG, "%s:uart select time out and io cancel", __FUNCTION__);
uart.ioCancel = true;
return totalBytesRead;
} else {
continue;
}
} else if (ret == 0) {
WRITE_LOG(LOG_DEBUG, "%s:uart select time out!", __FUNCTION__);
return totalBytesRead;
} else if (ret < 0) {
WRITE_LOG(LOG_DEBUG, "%s:uart select error! %d", __FUNCTION__, errno);
return -1;
} else {
// select reported the descriptor readable; a read of 0 bytes or an error is treated as failure.
bytesRead = read(uart.devUartHandle, uartReadBuffer, sizeof(uartReadBuffer));
if (bytesRead <= 0) {
WRITE_LOG(LOG_WARN, "%s:read failed! %zd:%d", __FUNCTION__, bytesRead, errno);
return -1;
}
}
#endif
if (bytesRead > 0) {
readBuf.insert(readBuf.end(), uartReadBuffer, uartReadBuffer + bytesRead);
totalBytesRead += bytesRead;
}
} while (readBuf.size() < expectedSize or
bytesRead == 0);
return totalBytesRead;
}
// Writes `length` bytes from `data` to the UART device described by `uart`, looping until
// everything has been written.
// Returns the total number of bytes written, or -1 on a write error.
ssize_t HdcUARTBase::WriteUartDev(uint8_t *data, const size_t length, HdcUART &uart)
{
    ssize_t totalBytesWrite = 0;
    // The trace log shows the first two payload bytes. Only read them when the buffer really
    // extends past the header: header-only packets are exactly sizeof(UartHead) bytes long and
    // peeking behind them would be an out-of-bounds read.
    const unsigned int firstPayloadByte =
        (length > sizeof(UartHead)) ? data[sizeof(UartHead)] : 0;
    const unsigned int secondPayloadByte =
        (length > sizeof(UartHead) + 1) ? data[sizeof(UartHead) + 1] : 0;
    WRITE_LOG(LOG_ALL, "%s %zu data %x %x", __FUNCTION__, length, firstPayloadByte,
              secondPayloadByte);
    do {
#ifdef _WIN32
        DWORD bytesWrite = 0;
        BOOL bWriteStat = WriteFile(uart.devUartHandle, data + totalBytesWrite,
                                    length - totalBytesWrite, &bytesWrite, &uart.ovWrite);
        if (!bWriteStat) {
            if (GetLastError() == ERROR_IO_PENDING) {
                // Overlapped write still in flight: block until it completes.
                if (!GetOverlappedResult(uart.devUartHandle, &uart.ovWrite, &bytesWrite, TRUE)) {
                    WRITE_LOG(LOG_DEBUG, "%s error wait io:%d. bytesWrite %zu", __FUNCTION__,
                              GetLastError(), bytesWrite);
                    return -1;
                }
            } else {
                WRITE_LOG(LOG_DEBUG, "%s err:%d. bytesWrite %zu", __FUNCTION__, GetLastError(),
                          bytesWrite);
                return -1;
            }
        }
#else
        ssize_t bytesWrite = 0;
        bytesWrite = write(uart.devUartHandle, data + totalBytesWrite, length - totalBytesWrite);
        if (bytesWrite < 0) {
            if (errno == EINTR or errno == EAGAIN) {
                // Transient condition: retry without advancing the offset.
                WRITE_LOG(LOG_WARN, "EINTR/EAGAIN, try again");
                continue;
            } else {
                constexpr int bufSize = 1024;
                char buf[bufSize] = { 0 };
                strerror_r(errno, buf, bufSize);
                WRITE_LOG(LOG_FATAL, "write fatal errno %d:%s", errno, buf);
                return -1;
            }
        } else {
            // Wait until the written bytes have actually been transmitted.
            tcdrain(uart.devUartHandle);
        }
#endif
        totalBytesWrite += bytesWrite;
    } while (totalBytesWrite < static_cast<ssize_t>(length)); // compare at full width, not as int
    return totalBytesWrite;
}
// Pushes `dataSize` bytes from `data` into the session's STREAM_MAIN descriptor, waiting with
// select() for the descriptor to become writable before every send().
// Returns dataSize when everything was sent, otherwise ERR_IO_FAIL.
int HdcUARTBase::UartToHdcProtocol(uv_stream_t *stream, uint8_t *data, int dataSize)
{
    HSession hSession = static_cast<HSession>(stream->data);
    int fd = hSession->dataFd[STREAM_MAIN];

    fd_set writeSet;
    struct timeval waitLimit = {3, 0};
    FD_ZERO(&writeSet);
    FD_SET(fd, &writeSet);

    int sentTotal = 0;
    int stepResult = 0;
    while (sentTotal < dataSize) {
        // Wait until the descriptor accepts data; a timeout (0) or an error (< 0) aborts.
        stepResult = select(fd + 1, nullptr, &writeSet, nullptr, &waitLimit);
        if (stepResult <= 0) {
            constexpr int errTextSize = 1024;
            char errText[errTextSize] = { 0 };
#ifdef _WIN32
            strerror_s(errText, errTextSize, errno);
#else
            strerror_r(errno, errText, errTextSize);
#endif
            WRITE_LOG(LOG_FATAL, "%s select error:%d [%s][%d]", __FUNCTION__, errno,
                      errText, stepResult);
            break;
        }

        stepResult = send(fd, (const char *)data + sentTotal, dataSize - sentTotal, 0);
        if (stepResult < 0) {
            constexpr int errTextSize = 1024;
            char errText[errTextSize] = { 0 };
#ifdef _WIN32
            strerror_s(errText, errTextSize, errno);
#else
            strerror_r(errno, errText, errTextSize);
#endif
            WRITE_LOG(LOG_FATAL, "%s senddata err:%d [%s]", __FUNCTION__, errno, errText);
            break;
        }
        sentTotal += stepResult;
    }

    if (sentTotal == dataSize) {
        return sentTotal;
    }
    WRITE_LOG(LOG_FATAL, "%s partialsenddata err:%d [%d]", __FUNCTION__, sentTotal, dataSize);
    return ERR_IO_FAIL;
}
// Forwards a received package to the session stream through UartSendToHdcStream.
// Returns ERR_SESSION_NOFOUND for a null session, ERR_IO_FAIL when forwarding fails and
// RET_SUCCESS otherwise.
RetErrCode HdcUARTBase::DispatchToWorkThread(HSession hSession, uint8_t *readBuf, int readBytes)
{
    if (hSession == nullptr) {
        return ERR_SESSION_NOFOUND;
    }
    const bool forwarded = UartSendToHdcStream(hSession, readBuf, readBytes);
    return forwarded ? RET_SUCCESS : ERR_IO_FAIL;
}
// Hands one complete package to the session unless its index equals the last dispatched one
// (a duplicate). When dispatching fails, a PKG_OPTION_FREE package is queued for the peer.
void HdcUARTBase::DispatchPackageData(HSession hSession, std::vector<uint8_t> &data,
                                      size_t packetSize, uint32_t packageIndex)
{
    if (hSession->hUART->dispatchedPackageIndex == packageIndex) {
        WRITE_LOG(LOG_WARN, "dup package %u, skip send to session logic", packageIndex);
        return;
    }

    // Remember the index first so that a repeated copy of this package is skipped.
    hSession->hUART->dispatchedPackageIndex = packageIndex;
    const RetErrCode dispatchRet = DispatchToWorkThread(hSession, data.data(), packetSize);
    if (dispatchRet != RET_SUCCESS) {
        WRITE_LOG(LOG_FATAL, "%s DispatchToWorkThread fail %d. request free session in other side",
                  __FUNCTION__, dispatchRet);
        ResponseUartTrans(hSession->sessionId, ++hSession->hUART->packageIndex, PKG_OPTION_FREE);
        return;
    }
    WRITE_LOG(LOG_DEBUG, "%s DispatchToWorkThread successful", __FUNCTION__);
}
// Consumes as many complete packages as possible from the front of `data`.
//
// Each pass validates the package at the front of the buffer: an invalid package clears the
// whole buffer, a header-only package is dropped, and a complete package is dispatched to its
// session (looked up by session id when `hSession` is null).
//
// Returns the full size of a valid but still incomplete package, sizeof(UartHead) when more
// than one byte is left over, and 0 otherwise.
size_t HdcUARTBase::PackageProcess(vector<uint8_t> &data, HSession hSession)
{
    constexpr size_t headSize = sizeof(UartHead);
    while (data.size() >= headSize) {
        size_t packetSize = 0;
        uint32_t sessionId = 0;
        uint32_t packageIndex = 0;
        const RetErrCode checkRet = ValidateUartPacket(data, sessionId, packageIndex, packetSize);
        if (checkRet != RET_SUCCESS) {
            WRITE_LOG(LOG_WARN, "package error. clean the read buffer.");
            data.clear();
        } else if (packetSize == headSize) {
            WRITE_LOG(LOG_ALL, "headonly Package(%zu). dont send to session, erase it", packetSize);
        } else if (data.size() < packetSize) {
            // Keep the partial package in the buffer and tell the caller how much is expected.
            WRITE_LOG(LOG_DEBUG, "valid package, however size not enough. expect %zu", packetSize);
            return packetSize;
        } else {
            if (hSession == nullptr) {
#ifdef HDC_HOST
                hSession = GetSession(sessionId);
#else
                hSession = GetSession(sessionId, true);
#endif
            }
            if (hSession != nullptr) {
                DispatchPackageData(hSession, data, packetSize, packageIndex);
            } else {
                WRITE_LOG(LOG_WARN, "have not found session (%s). skip it",
                          Hdc::MaskSessionIdToString(sessionId).c_str());
            }
        }

        // Drop the handled package; nothing happens when the buffer is shorter than that.
        if (data.size() >= packetSize) {
            data.erase(data.begin(), data.begin() + packetSize);
        }
        WRITE_LOG(LOG_DEBUG, "PackageProcess data.size():%d left", data.size());
    }
    return data.size() > 1 ? headSize : 0;
}
// Writes one already-built package (`data`, `length` bytes, beginning with a UartHead) to the
// UART device.
// Returns true when more than zero bytes were written. Returns false when the session cannot
// be found or the write fails; a negative write result additionally triggers
// OnTransferErrorRaw for the session.
bool HdcUARTBase::SendUARTRaw(HSession hSession, uint8_t *data, const size_t length)
{
struct UartHead *uartHeader = (struct UartHead *)data;
#ifndef HDC_HOST
// Without HDC_HOST every write goes through a temporary HdcUART wrapping uartHandle.
HdcUART deamonUart;
deamonUart.devUartHandle = uartHandle;
if (uartHeader->IsResponsePackage()) {
// Response packages are written directly, without looking up a session.
ssize_t sendBytes = WriteUartDev(data, length, deamonUart);
return sendBytes > 0;
}
#endif
if (hSession == nullptr) {
// No session supplied: resolve it from the session id stored in the header.
hSession = GetSession(uartHeader->sessionId);
if (hSession == nullptr) {
WRITE_LOG(LOG_WARN, "%s hSession not found:%s", __FUNCTION__,
Hdc::MaskSessionIdToString(uartHeader->sessionId).c_str());
return false;
}
}
// The session's ref counter is raised for the duration of the write.
hSession->ref++;
#ifdef HDC_HOST
// With HDC_HOST the device handle comes from the session's own HdcUART.
ssize_t sendBytes = WriteUartDev(data, length, *hSession->hUART);
#else
ssize_t sendBytes = WriteUartDev(data, length, deamonUart);
#endif
WRITE_LOG(LOG_DEBUG, "%s length:%d, sendBytes %zu", __FUNCTION__, length, sendBytes);
if (sendBytes < 0) {
WRITE_LOG(LOG_DEBUG, "%s send fail. try to freesession", __FUNCTION__);
OnTransferErrorRaw(hSession);
}
hSession->ref--;
return sendBytes > 0;
}
// Forwards the payload of one received UART package (`data`, `size` bytes, beginning with a
// UartHead) to the session's STREAM_MAIN pipe.
//
// When the package's session id differs from the session's, a soft reset is sent once for a
// serverOrDaemon session; otherwise the package is ignored.
//
// Returns true on success (including an ignored mismatch). Returns false when the buffer is
// smaller than a header, when a soft reset was triggered, or when the stream write failed.
bool HdcUARTBase::UartSendToHdcStream(HSession hSession, uint8_t *data, size_t size)
{
    int ret = RET_SUCCESS;
    if (size < sizeof(UartHead)) {
        WRITE_LOG(LOG_FATAL, "%s buf size too small %zu", __FUNCTION__, size);
        // This function returns bool: returning an error code here would be converted to
        // bool and read as success by the caller when the code is nonzero.
        return false;
    }
    UartHead *head = reinterpret_cast<UartHead *>(data);
    // Only peek at payload bytes that are really inside the buffer; a package may carry fewer
    // than two payload bytes.
    const unsigned int firstPayloadByte = (size > sizeof(UartHead)) ? data[sizeof(UartHead)] : 0;
    const unsigned int secondPayloadByte =
        (size > sizeof(UartHead) + 1) ? data[sizeof(UartHead) + 1] : 0;
    WRITE_LOG(LOG_DEBUG, "%s uartHeader:%s data: %x %x", __FUNCTION__,
              head->ToDebugString().c_str(), firstPayloadByte, secondPayloadByte);
    if (head->sessionId != hSession->sessionId) {
        if (hSession->serverOrDaemon && !hSession->hUART->resetIO) {
            WRITE_LOG(LOG_FATAL, "%s sessionId not matched, reset sessionId:%s.", __FUNCTION__,
                      Hdc::MaskSessionIdToString(head->sessionId).c_str());
            SendUartSoftReset(hSession, head->sessionId);
            // resetIO guards against sending the soft reset more than once.
            hSession->hUART->resetIO = true;
            ret = ERR_IO_SOFT_RESET;
        }
    } else {
        // Account for the bytes about to be written so the reading side knows how much to expect.
        hSession->hUART->streamSize += head->dataSize;
        WRITE_LOG(LOG_ALL, "%s stream wait session read size: %zu", __FUNCTION__,
                  hSession->hUART->streamSize.load());
        if (UartToHdcProtocol(reinterpret_cast<uv_stream_t *>(&hSession->dataPipe[STREAM_MAIN]),
                              data + sizeof(UartHead), head->dataSize) < 0) {
            ret = ERR_IO_FAIL;
            WRITE_LOG(LOG_FATAL, "%s Error uart send to stream", __FUNCTION__);
        }
    }
    return ret == RET_SUCCESS;
}
// Logs the call and raises a request on the `transfer` state machine.
void HdcUARTBase::NotifyTransfer()
{
WRITE_LOG(LOG_DEBUG, "%s", __FUNCTION__);
transfer.Request();
}
Here we have a HandleOutputPkg vector.
It is used to maintain the data reliability of the link layer.
It consists of the following parts:
Record the data to send (caller thread) --> RequestSendPackage
Send the recorded data (loop sending thread) --> SendPkgInUARTOutMap
Process the returned reply data (loop reading thread) --> ProcessResponsePackage
Send a reply packet (loop reading thread) --> ResponseUartTrans
The key scenarios are as follows:
A package is sent from side A to side B.
Here we call the complete unit of data a package.
A package is divided into head and data.
The response information is in the head.
The data part contains binary data.
case 1: Normal process
package
A --> B
ACK
A <-- B
case 2: The packet is incorrect
At least one header must be received.
For this, the B side needs to have an accept timeout:
no new data within a certain period of time is treated as the end of the packet.
(This mechanism is not handled in the HandleOutputPkg retransmission.)
incorrect
A --> B
B sends a NAK and A resends the packet.
NAK
A <-- B
package resend
A --> B
case 3: The packet is completely lost
package (completely lost)
A -x-> B
The A side needs to resend the package after a certain timeout
A --> B
until the B side reports back (ACK or NAK), or the number of retransmissions reaches the upper
limit.
*/
// Queues one package (`data`, `length` bytes, beginning with a UartHead) in outPkgs and wakes
// the transfer state machine.
// When `queue` is set, the call first waits on the slot of the package's session.
// A package whose identity string is already queued is not added again; that is only logged.
void HdcUARTBase::RequestSendPackage(uint8_t *data, const size_t length, bool queue)
{
    auto *header = reinterpret_cast<UartHead *>(data);
    bool isResponse = header->IsResponsePackage();
    if (queue) {
        slots.Wait(header->sessionId);
    }

    std::lock_guard<std::mutex> guard(mapOutPkgsMutex);
    std::string identity = header->ToPkgIdentityString(isResponse);
    const auto queuedPos =
        std::find_if(outPkgs.begin(), outPkgs.end(), HandleOutputPkgKeyFinder(identity));
    if (queuedPos != outPkgs.end()) {
        WRITE_LOG(LOG_FATAL, "UartPackageManager: add pkg %s fail, %s has already been exist.",
                  header->ToDebugString().c_str(), identity.c_str());
    } else {
        // The checksum is refreshed right before the package is copied into the queue.
        header->UpdateCheckSum();
        outPkgs.emplace_back(identity, header->sessionId, data, length, isResponse,
                             header->option & PKG_OPTION_ACK);
        WRITE_LOG(LOG_DEBUG, "UartPackageManager: add pkg %s (pkgs size %zu)",
                  header->ToDebugString().c_str(), outPkgs.size());
    }
    NotifyTransfer();
}
// Handles a response header received from the peer.
// An ACK removes the matching queued package and frees its session slot; any other response
// marks the package for resending. A response that matches no queued package is only logged.
// The transfer state machine is notified in every case.
void HdcUARTBase::ProcessResponsePackage(const UartHead &head)
{
    std::lock_guard<std::mutex> lock(mapOutPkgsMutex);
    bool ack = head.option & PKG_OPTION_ACK;
    std::string pkgId = head.ToPkgIdentityString();
    WRITE_LOG(LOG_ALL, "UartPackageManager: got response pkgId:%s ack:%d.", pkgId.c_str(), ack);
    auto it = std::find_if(outPkgs.begin(), outPkgs.end(), HandleOutputPkgKeyFinder(pkgId));
    if (it != outPkgs.end()) {
        if (ack) {
            // Free the slot before erasing: `it` is invalid after the erase.
            slots.Free(it->sessionId);
            outPkgs.erase(it);
            WRITE_LOG(LOG_DEBUG, "UartPackageManager: erase pkgId:%s.", pkgId.c_str());
        } else {
            it->pkgStatus = PKG_WAIT_SEND;
            WRITE_LOG(LOG_WARN, "UartPackageManager: resend pkgId:%s.", pkgId.c_str());
        }
    } else {
        WRITE_LOG(LOG_FATAL, "UartPackageManager: hasn't found pkg for pkgId:%s.", pkgId.c_str());
        // Iterate by const reference: copying every queued package (send buffer included)
        // just to log its key is wasted work while the lock is held.
        for (const auto &pkg : outPkgs) {
            WRITE_LOG(LOG_ALL, "UartPackageManager: pkgId:%s.", pkg.key.c_str());
        }
    }
    NotifyTransfer();
}
// Walks outPkgs once, under mapOutPkgsMutex, and advances every package:
//  - PKG_WAIT_SEND packages are written to the UART. Response packages are erased right after
//    sending; other packages switch to PKG_WAIT_RESPONSE and record their send time.
//  - PKG_WAIT_RESPONSE packages that have waited at least WAIT_RESPONSE_TIME_OUT_MS are either
//    scheduled for a resend (while retryChance is left) or cause a transfer error for the
//    session.
// A non-response package is held back while an earlier package of the same session is still
// waiting for its response.
void HdcUARTBase::SendPkgInUARTOutMap()
{
std::lock_guard<std::mutex> lock(mapOutPkgsMutex);
if (outPkgs.empty()) {
WRITE_LOG(LOG_ALL, "UartPackageManager: No pkgs needs to be sent.");
return;
}
// Session ids that already have a package waiting for a response in this pass.
std::unordered_set<uint32_t> hasWaitPkg;
auto it = outPkgs.begin();
while (it != outPkgs.end()) {
if (it->pkgStatus == PKG_WAIT_SEND) {
// Response packages are always sent; other packages wait their turn per session.
if (!it->response and hasWaitPkg.find(it->sessionId) != hasWaitPkg.end()) {
it++;
continue;
}
WRITE_LOG(LOG_DEBUG, "UartPackageManager: send pkg %s", it->ToDebugString().c_str());
if (!SendUARTRaw(nullptr, it->msgSendBuf.data(), it->msgSendBuf.size())) {
// Stop the pass on a send failure.
// NOTE(review): SendUARTRaw may call OnTransferErrorRaw, which presumably can modify
// outPkgs and invalidate `it` - confirm before touching `it` after a failure.
WRITE_LOG(LOG_WARN, "SendUARTRaw failed!");
break;
}
if (it->response) {
// A response package is done once it has been sent.
WRITE_LOG(LOG_DEBUG, "UartPackageManager: erase pkg %s",
it->ToDebugString().c_str());
it = outPkgs.erase(it);
continue;
} else {
it->pkgStatus = PKG_WAIT_RESPONSE;
it->sendTimePoint = steady_clock::now();
hasWaitPkg.emplace(it->sessionId);
transfer.Sent();
}
} else if (it->pkgStatus == PKG_WAIT_RESPONSE) {
auto elapsedTime = duration_cast<milliseconds>(steady_clock::now() - it->sendTimePoint);
WRITE_LOG(LOG_DEBUG, "UartPackageManager: pkg:%s is wait ACK. elapsedTime %lld",
it->ToDebugString().c_str(), (long long)elapsedTime.count());
if (elapsedTime.count() >= WAIT_RESPONSE_TIME_OUT_MS) {
if (it->retryChance > 0) {
// Timed out with retries left: mark for resend and end this pass after requesting
// another one.
WRITE_LOG(LOG_WARN, "UartPackageManager: pkg:%s try resend it.",
it->ToDebugString().c_str());
it->pkgStatus = PKG_WAIT_SEND;
it->retryChance--;
NotifyTransfer();
break;
} else {
// Timed out with no retries left: report a transfer error for the session and stop.
WRITE_LOG(LOG_WARN, "UartPackageManager: reach max retry ,free the session %s",
Hdc::MaskSessionIdToString(it->sessionId).c_str());
OnTransferErrorRaw(GetSession(it->sessionId));
break;
}
}
// Still within the response window: block later packages of this session.
hasWaitPkg.emplace(it->sessionId);
}
it++;
}
WRITE_LOG(LOG_DEBUG, "UartPackageManager: send finish, have %zu pkgs", outPkgs.size());
}
// Removes every queued package that belongs to `sessionId`, frees the session slot once for
// each removed non-response package, and notifies the transfer state machine.
// This function does not take mapOutPkgsMutex itself.
void HdcUARTBase::ClearUARTOutMapRaw(uint32_t sessionId)
{
    size_t removedCount = 0;
    for (auto pkg = outPkgs.begin(); pkg != outPkgs.end();) {
        if (pkg->sessionId != sessionId) {
            pkg++;
            continue;
        }
        if (!pkg->response) {
            slots.Free(pkg->sessionId);
        }
        pkg = outPkgs.erase(pkg);
        removedCount++;
    }
    WRITE_LOG(LOG_DEBUG, "%s UartPackageManager clean for sessionId %s, erased %zu",
              __FUNCTION__, Hdc::MaskSessionIdToString(sessionId).c_str(), removedCount);
    NotifyTransfer();
}
void HdcUARTBase::ClearUARTOutMap(uint32_t sessionId)
{
std::lock_guard<std::mutex> lock(mapOutPkgsMutex);
ClearUARTOutMapRaw(sessionId);
}
// Waits until all slots are free, then sleeps for one more second if outPkgs still holds
// packages.
void HdcUARTBase::EnsureAllPkgsSent()
{
    slots.WaitFree();
    const bool stillQueued = !outPkgs.empty();
    if (stillQueued) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
    WRITE_LOG(LOG_DEBUG, "%s done.", __FUNCTION__);
}
// Validates the package at the front of `data` and reports its session id, package index and
// total size (header plus payload) through the reference parameters.
//
// Side effects: a PKG_OPTION_RESET package resets the old session; a PKG_OPTION_FREE package
// restarts the session; a complete package with a valid data checksum is either processed as
// a response or acknowledged with PKG_OPTION_ACK.
//
// Returns RET_SUCCESS, ERR_BUF_CHECK (bad flag or checksum), ERR_BUF_OVERFLOW (payload too
// large) or ERR_IO_SOFT_RESET (reset requested by the other side).
RetErrCode HdcUARTBase::ValidateUartPacket(vector<uint8_t> &data, uint32_t &sessionId,
                                           uint32_t &packageIndex, size_t &packetSize)
{
    constexpr auto maxBufFactor = 1;
    auto *header = reinterpret_cast<UartHead *>(data.data());
    WRITE_LOG(LOG_DEBUG, "%s %s", __FUNCTION__, header->ToDebugString().c_str());

    const bool flagMatches =
        memcmp(header->flag, PACKET_FLAG.c_str(), PACKET_FLAG.size()) == 0;
    if (!flagMatches) {
        WRITE_LOG(LOG_FATAL, "%s,PACKET_FLAG not correct %x %x", __FUNCTION__, header->flag[0],
                  header->flag[1]);
        return ERR_BUF_CHECK;
    }
    if (!header->ValidateHead()) {
        WRITE_LOG(LOG_FATAL, "%s head checksum not correct", __FUNCTION__);
        return ERR_BUF_CHECK;
    }
    if (header->dataSize > MAX_UART_SIZE_IOBUF * maxBufFactor - sizeof(UartHead)) {
        WRITE_LOG(LOG_FATAL, "%s dataSize too larger:%d", __FUNCTION__, header->dataSize);
        return ERR_BUF_OVERFLOW;
    }

    // The header is trustworthy from here on: publish its fields to the caller.
    sessionId = header->sessionId;
    packetSize = header->dataSize + sizeof(UartHead);
    packageIndex = header->packageIndex;

    if (header->option & PKG_OPTION_RESET) {
        WRITE_LOG(LOG_WARN, "%s host side want restart daemon, restart old sessionId:%s",
                  __FUNCTION__, Hdc::MaskSessionIdToString(header->sessionId).c_str());
        ResetOldSession(header->sessionId);
        return ERR_IO_SOFT_RESET;
    }
    if (header->option & PKG_OPTION_FREE) {
        WRITE_LOG(LOG_WARN, "%s other side tell us the session need free:%s", __FUNCTION__,
                  Hdc::MaskSessionIdToString(header->sessionId).c_str());
        Restartession(GetSession(header->sessionId));
    }

    // The payload can only be checked once the whole package has arrived.
    if (data.size() < packetSize) {
        return RET_SUCCESS;
    }
    if (!header->ValidateData()) {
        WRITE_LOG(LOG_FATAL, "%s data checksum not correct", __FUNCTION__);
        return ERR_BUF_CHECK;
    }
    if (header->IsResponsePackage()) {
        ProcessResponsePackage(*header);
    } else {
        ResponseUartTrans(header->sessionId, header->packageIndex, PKG_OPTION_ACK);
    }
    return RET_SUCCESS;
}
void HdcUARTBase::ResponseUartTrans(uint32_t sessionId, uint32_t packageIndex,
UartProtocolOption option)
{
UartHead uartHeader(sessionId, option, 0, packageIndex);
WRITE_LOG(LOG_DEBUG, "%s option:%u", __FUNCTION__, option);
RequestSendPackage(reinterpret_cast<uint8_t *>(&uartHeader), sizeof(UartHead), false);
}
// Splits `data` (`length` bytes) into UART packages of at most MAX_UART_SIZE_IOBUF bytes
// (header included) and queues each one through RequestSendPackage. The last package carries
// PKG_OPTION_TAIL.
// Returns the number of payload bytes queued, or ERR_BUF_ALLOC / ERR_BUF_RESET / ERR_BUF_COPY
// when the scratch buffer cannot be allocated or prepared.
int HdcUARTBase::SendUARTData(HSession hSession, uint8_t *data, const size_t length)
{
    constexpr size_t maxIOSize = MAX_UART_SIZE_IOBUF;
    constexpr size_t packageDataMaxSize = maxIOSize - sizeof(UartHead);
    WRITE_LOG(LOG_DEBUG, "SendUARTData hSession:%s, total length:%zu",
              Hdc::MaskSessionIdToString(hSession->sessionId).c_str(), length);

    // The scratch buffer is owned by a unique_ptr so every exit path releases it.
    std::unique_ptr<uint8_t[]> sendDataBuf(new (std::nothrow) uint8_t[maxIOSize]);
    if (sendDataBuf == nullptr) {
        WRITE_LOG(LOG_DEBUG, "Failed to allocate memory for sendDataBuf");
        return ERR_BUF_ALLOC;
    }

    size_t offset = 0;
    do {
        // Each round reuses the buffer: fresh header first, payload right behind it.
        UartHead *head = reinterpret_cast<UartHead *>(sendDataBuf.get());
        if (memset_s(head, sizeof(UartHead), 0, sizeof(UartHead)) != EOK) {
            return ERR_BUF_RESET;
        }
        if (memcpy_s(head->flag, sizeof(head->flag), PACKET_FLAG.c_str(), PACKET_FLAG.size()) !=
            EOK) {
            return ERR_BUF_COPY;
        }
        head->sessionId = hSession->sessionId;
        head->packageIndex = ++hSession->hUART->packageIndex;

        // Kept as size_t so that large inputs are not narrowed to int.
        const size_t remainingDataSize = length - offset;
        if (remainingDataSize > packageDataMaxSize) {
            head->dataSize = static_cast<uint16_t>(packageDataMaxSize);
        } else {
            // Everything left fits: this is the last package of the message.
            head->dataSize = static_cast<uint16_t>(remainingDataSize);
            head->option = head->option | PKG_OPTION_TAIL;
        }
#ifdef UART_FULL_LOG
        WRITE_LOG(LOG_FULL, "offset %zu length %zu", offset, length);
#endif
        uint8_t *payload = sendDataBuf.get() + sizeof(UartHead);
        if (EOK != memcpy_s(payload, packageDataMaxSize, data + offset, head->dataSize)) {
            WRITE_LOG(LOG_FATAL, "memcpy_s failed max %zu , need %zu",
                      packageDataMaxSize, static_cast<size_t>(head->dataSize));
            return ERR_BUF_COPY;
        }
        offset += head->dataSize;
        uint32_t packageFullSize = sizeof(UartHead) + head->dataSize;
        WRITE_LOG(LOG_ALL, "SendUARTData =============> %s", head->ToDebugString().c_str());
        RequestSendPackage(sendDataBuf.get(), packageFullSize);
    } while (offset != length);
    return static_cast<int>(offset);
}
// libuv read callback for the session's work stream.
// Reads that are empty, failed, or larger than the outstanding streamSize are ignored.
// Otherwise the bytes go to FetchIOBuf; when that fails, a PKG_OPTION_FREE package is queued
// for the peer and the local session is freed. streamSize is then reduced by nread.
void HdcUARTBase::ReadDataFromUARTStream(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
{
    HSession hSession = static_cast<HSession>(stream->data);
    HdcUARTBase *uartBase = (HdcUARTBase *)hSession->classModule;
    std::lock_guard<std::mutex> guard(uartBase->workThreadProcessingData);

    // Only filled in for the log line when libuv reported an error.
    constexpr int errNameSize = 1024;
    char errName[errNameSize] = { 0 };
    if (nread < 0) {
        uv_err_name_r(nread, errName, errNameSize);
    }
    WRITE_LOG(LOG_DEBUG, "%s sessionId:%s, nread:%zd %s streamSize %zu", __FUNCTION__,
              Hdc::MaskSessionIdToString(hSession->sessionId).c_str(), nread, errName,
              hSession->hUART->streamSize.load());

    HdcSessionBase *sessionOwner = (HdcSessionBase *)hSession->classInstance;
    const bool nothingToFetch = nread <= 0 or nread > signed(hSession->hUART->streamSize);
    if (nothingToFetch) {
        WRITE_LOG(LOG_FATAL, "%s nothing need to do ! because no data here", __FUNCTION__);
        return;
    }

    const bool fetchFailed = sessionOwner->FetchIOBuf(hSession, hSession->ioBuf, nread) < 0;
    if (fetchFailed) {
        WRITE_LOG(LOG_FATAL, "%s FetchIOBuf failed , free the other side session", __FUNCTION__);
        uartBase->ResponseUartTrans(hSession->sessionId, ++hSession->hUART->packageIndex,
                                    PKG_OPTION_FREE);
        WRITE_LOG(LOG_FATAL, "%s FetchIOBuf failed , free the session", __FUNCTION__);
        sessionOwner->FreeSession(hSession->sessionId);
    }
    hSession->hUART->streamSize -= nread;
}
// Prepares the session's STREAM_WORK pipe on the child loop: opens it on the session's work
// descriptor, applies the TCP options and starts reading with ReadDataFromUARTStream.
// Returns false when the pipe cannot be initialized or reading cannot be started.
bool HdcUARTBase::ReadyForWorkThread(HSession hSession)
{
    const int initRet = externInterface.UvTcpInit(
        &hSession->childLoop, &hSession->dataPipe[STREAM_WORK], hSession->dataFd[STREAM_WORK]);
    if (initRet != 0) {
        WRITE_LOG(LOG_FATAL, "%s init child TCP failed", __FUNCTION__);
        return false;
    }

    // The read callback recovers the session from the handle's data pointer.
    hSession->dataPipe[STREAM_WORK].data = hSession;
    HdcSessionBase *sessionOwner = (HdcSessionBase *)hSession->classInstance;
    externInterface.SetTcpOptions(&hSession->dataPipe[STREAM_WORK]);

    const int readRet = externInterface.UvRead(
        (uv_stream_t *)&hSession->dataPipe[STREAM_WORK], sessionOwner->AllocCallback,
        &HdcUARTBase::ReadDataFromUARTStream);
    if (readRet != 0) {
        WRITE_LOG(LOG_FATAL, "%s child TCP read failed", __FUNCTION__);
        return false;
    }
    WRITE_LOG(LOG_DEBUG, "%s finish", __FUNCTION__);
    return true;
}
void HdcUARTBase::Restartession(const HSession session)
{
if (session != nullptr) {
WRITE_LOG(LOG_FATAL, "%s:%s", __FUNCTION__, session->ToDebugString().c_str());
ClearUARTOutMap(session->sessionId);
sessionBase.FreeSession(session->sessionId);
}
}
// Drops the queued packages of `hSession`; a null session is only logged.
void HdcUARTBase::StopSession(HSession hSession)
{
    if (hSession == nullptr) {
        WRITE_LOG(LOG_FATAL, "%s: clean null session", __FUNCTION__);
        return;
    }
    WRITE_LOG(LOG_WARN, "%s:%s", __FUNCTION__, hSession->ToDebugString().c_str());
    ClearUARTOutMap(hSession->sessionId);
}
void HdcUARTBase::TransferStateMachine::Wait()
{
std::unique_lock<std::mutex> lock(mutex);
WRITE_LOG(LOG_ALL, "%s", __FUNCTION__);
if (timeout) {
auto waitTimeout = std::chrono::duration_cast<std::chrono::milliseconds>(
timeoutPoint - std::chrono::steady_clock::now());
WRITE_LOG(LOG_ALL, "wait timeout %lld", waitTimeout.count());
if (cv.wait_for(lock, waitTimeout, [=] { return requested; }) == false) {
timeout = false;
WRITE_LOG(LOG_ALL, "timeout");
}
} else {
cv.wait(lock, [=] { return requested; });
}
requested = false;
}
// On _WIN32, zeroes both OVERLAPPED structures and creates an unnamed, auto-reset,
// initially non-signaled event for each of them. Nothing is done on other platforms.
HdcUART::HdcUART()
{
#ifdef _WIN32
    (void)memset_s(&ovWrite, sizeof(ovWrite), 0, sizeof(ovWrite));
    (void)memset_s(&ovRead, sizeof(ovRead), 0, sizeof(ovRead));
    ovWrite.hEvent = CreateEvent(nullptr, FALSE, FALSE, nullptr);
    ovRead.hEvent = CreateEvent(nullptr, FALSE, FALSE, nullptr);
#endif
}
// On _WIN32, closes the two event handles created by the constructor and clears them.
HdcUART::~HdcUART()
{
#ifdef _WIN32
    CloseHandle(ovWrite.hEvent);
    CloseHandle(ovRead.hEvent);
    ovWrite.hEvent = nullptr;
    ovRead.hEvent = nullptr;
#endif
}
}
#endif