#include "datasystem/transfer_engine/transfer_engine.h"
#include <chrono>
#include <functional>
#include <thread>
#include <unordered_map>
#include <glog/logging.h>
#include "internal/connection/connection_manager.h"
#include "internal/control_plane/control_plane.h"
#include "internal/backend/mock_data_plane_backend.h"
#include "internal/control_plane/transfer_control_service.h"
#include "internal/log/logging.h"
#include "internal/log/environment_dump.h"
#include "internal/runtime/acl_runtime_helper.h"
#ifdef TRANSFER_ENGINE_ENABLE_P2P_THIRD_PARTY
#include "internal/backend/ascend/p2p_transfer_backend.h"
#endif
#include "internal/memory/registered_memory_table.h"
#include "datasystem/transfer_engine/status_helper.h"
namespace datasystem {
namespace {
constexpr int32_t kRpcOkCode = 0;
constexpr int32_t kConnReadyRetryCount = 50;
constexpr int32_t kConnReadyRetryIntervalMs = 10;
constexpr uint64_t kDefaultRecvWaitTimeoutMs = 10000;
constexpr int32_t kDefaultRpcThreads = 8;
Result RpcCodeToStatus(int32_t code, const std::string &msg)
{
if (code == kRpcOkCode) {
return Result::OK();
}
switch (static_cast<ErrorCode>(code)) {
case ErrorCode::kInvalid:
case ErrorCode::kNotFound:
case ErrorCode::kRuntimeError:
case ErrorCode::kNotReady:
case ErrorCode::kNotAuthorized:
case ErrorCode::kNotSupported:
return TE_MAKE_STATUS(static_cast<ErrorCode>(code), msg);
default:
return TE_MAKE_STATUS(ErrorCode::kRuntimeError, msg);
}
}
Result ParseTargetHostname(const std::string &targetHostname, std::string *peerHost, uint16_t *peerPort)
{
TE_CHECK_PTR_OR_RETURN(peerHost);
TE_CHECK_PTR_OR_RETURN(peerPort);
TE_CHECK_OR_RETURN(!targetHostname.empty(), ErrorCode::kInvalid, "targetHostname is empty");
std::string host;
std::string portStr;
if (targetHostname.front() == '[') {
const size_t close = targetHostname.find(']');
TE_CHECK_OR_RETURN(close != std::string::npos, ErrorCode::kInvalid, "invalid targetHostname");
TE_CHECK_OR_RETURN(close + 2 <= targetHostname.size() && targetHostname[close + 1] == ':',
ErrorCode::kInvalid, "targetHostname missing port");
host = targetHostname.substr(1, close - 1);
portStr = targetHostname.substr(close + 2);
} else {
const size_t sep = targetHostname.rfind(':');
TE_CHECK_OR_RETURN(sep != std::string::npos, ErrorCode::kInvalid, "targetHostname missing host:port");
host = targetHostname.substr(0, sep);
portStr = targetHostname.substr(sep + 1);
}
TE_CHECK_OR_RETURN(!host.empty(), ErrorCode::kInvalid, "targetHostname host is empty");
TE_CHECK_OR_RETURN(!portStr.empty(), ErrorCode::kInvalid, "targetHostname port is empty");
uint64_t portValue = 0;
for (char c : portStr) {
TE_CHECK_OR_RETURN(c >= '0' && c <= '9', ErrorCode::kInvalid, "targetHostname port is invalid");
portValue = portValue * 10 + static_cast<uint64_t>(c - '0');
TE_CHECK_OR_RETURN(portValue <= 65535, ErrorCode::kInvalid, "targetHostname port is invalid");
}
TE_CHECK_OR_RETURN(portValue > 0, ErrorCode::kInvalid, "targetHostname port should be positive");
*peerHost = host;
*peerPort = static_cast<uint16_t>(portValue);
return Result::OK();
}
Result ParseDeviceId(const std::string &deviceName, int32_t *deviceId)
{
TE_CHECK_PTR_OR_RETURN(deviceId);
constexpr char kDevicePrefix[] = "npu:";
constexpr size_t kDevicePrefixLen = sizeof(kDevicePrefix) - 1;
TE_CHECK_OR_RETURN(deviceName.size() > kDevicePrefixLen, ErrorCode::kInvalid, "device_name is invalid");
TE_CHECK_OR_RETURN(deviceName.compare(0, kDevicePrefixLen, kDevicePrefix) == 0,
ErrorCode::kInvalid, "device_name should match npu:${device_id}");
int64_t parsedDeviceId = 0;
for (size_t i = kDevicePrefixLen; i < deviceName.size(); ++i) {
const char c = deviceName[i];
TE_CHECK_OR_RETURN(c >= '0' && c <= '9', ErrorCode::kInvalid,
"device_name should match npu:${device_id}");
parsedDeviceId = parsedDeviceId * 10 + static_cast<int64_t>(c - '0');
TE_CHECK_OR_RETURN(parsedDeviceId <= std::numeric_limits<int32_t>::max(), ErrorCode::kInvalid,
"device_id is out of range");
}
*deviceId = static_cast<int32_t>(parsedDeviceId);
return Result::OK();
}
}
class TransferEngineState {
public:
std::unordered_map<std::string, int32_t> endpointOwnerDeviceCache;
};
TransferEngine::TransferEngine()
#ifdef TRANSFER_ENGINE_ENABLE_P2P_THIRD_PARTY
: TransferEngine(std::make_shared<P2PTransferBackend>())
#else
: TransferEngine(std::make_shared<MockDataPlaneBackend>())
#endif
{
}
TransferEngine::TransferEngine(std::shared_ptr<IDataPlaneBackend> backend)
: connMgr_(std::make_shared<ConnectionManager>()),
registeredMemory_(std::make_shared<RegisteredMemoryTable>()),
backend_(std::move(backend)),
controlService_(nullptr),
controlClient_(std::make_shared<SocketControlClient>()),
controlServer_(std::make_shared<SocketControlServer>()),
state_(std::make_unique<TransferEngineState>())
{
}
TransferEngine::~TransferEngine()
{
(void)Finalize();
}
Result TransferEngine::Initialize(const std::string &localHostname, const std::string &protocol,
const std::string &deviceName)
{
internal::EnsureGlogInitialized();
std::string localHost;
uint16_t localPort = 0;
int32_t deviceId = -1;
TE_RETURN_IF_ERROR(ParseTargetHostname(localHostname, &localHost, &localPort));
TE_RETURN_IF_ERROR(ParseDeviceId(deviceName, &deviceId));
std::lock_guard<std::mutex> lock(apiMutex_);
TE_CHECK_OR_RETURN(!initialized_, ErrorCode::kInvalid, "transfer engine already initialized");
TE_CHECK_OR_RETURN(backend_ != nullptr, ErrorCode::kInvalid, "backend is null");
LOG(INFO) << "transfer engine initialize start"
<< ", local_hostname=" << localHostname << ", protocol=" << protocol
<< ", device_name=" << deviceName << ", local_host=" << localHost << ", local_port=" << localPort
<< ", device_id=" << deviceId
<< ", rpc_threads=" << kDefaultRpcThreads;
localHost_ = localHost;
localPort_ = localPort;
deviceId_ = deviceId;
rpcThreads_ = kDefaultRpcThreads;
controlService_ = CreateTransferControlService(localHost_, localPort_, deviceId_, connMgr_, registeredMemory_,
backend_);
Result startRc = controlServer_->Start(localHost_, localPort_, controlService_, rpcThreads_);
if (startRc.IsError()) {
LOG(ERROR) << "control server start failed"
<< ", local_host=" << localHost_ << ", local_port=" << localPort_
<< ", device_id=" << deviceId_ << ", reason=" << startRc.ToString();
return startRc;
}
initialized_ = true;
LOG(INFO) << "transfer engine initialize success"
<< ", local_host=" << localHost_ << ", local_port=" << localPort_
<< ", device_id=" << deviceId_;
return Result::OK();
}
int32_t TransferEngine::GetRpcPort()
{
std::lock_guard<std::mutex> lock(apiMutex_);
if (!initialized_) {
return -1;
}
return static_cast<int32_t>(localPort_);
}
Result TransferEngine::RegisterMemory(uintptr_t bufferAddrRegisrterch, size_t length)
{
return BatchRegisterMemory({bufferAddrRegisrterch}, {length});
}
Result TransferEngine::BatchRegisterMemory(const std::vector<uintptr_t> &bufferAddrs, const std::vector<size_t> &lengths)
{
TE_CHECK_OR_RETURN(!bufferAddrs.empty(), ErrorCode::kInvalid, "bufferAddrs is empty");
TE_CHECK_OR_RETURN(bufferAddrs.size() == lengths.size(), ErrorCode::kInvalid, "bufferAddrs/lengths size mismatch");
std::lock_guard<std::mutex> lock(apiMutex_);
TE_CHECK_OR_RETURN(initialized_, ErrorCode::kNotReady, "transfer engine not initialized");
TE_CHECK_OR_RETURN(deviceId_ >= 0, ErrorCode::kNotReady, "device_id is invalid");
std::vector<uint64_t> addedBaseAddrs;
addedBaseAddrs.reserve(bufferAddrs.size());
for (size_t i = 0; i < bufferAddrs.size(); ++i) {
TE_CHECK_OR_RETURN(bufferAddrs[i] > 0, ErrorCode::kInvalid, "bufferAddr should be positive");
TE_CHECK_OR_RETURN(lengths[i] > 0, ErrorCode::kInvalid, "length should be positive");
RegisteredRegion region{ static_cast<uint64_t>(bufferAddrs[i]), static_cast<uint64_t>(lengths[i]), deviceId_ };
if (!registeredMemory_->AddRegion(region)) {
for (uint64_t addr : addedBaseAddrs) {
(void)registeredMemory_->RemoveByBaseAddr(addr);
}
return Result(ErrorCode::kInvalid, "failed to add registered region");
}
addedBaseAddrs.push_back(region.baseAddr);
}
LOG(INFO) << "batch register memory success"
<< ", device_id=" << deviceId_ << ", count=" << bufferAddrs.size();
return Result::OK();
}
Result TransferEngine::UnregisterMemory(uintptr_t bufferAddrRegisrterch)
{
return BatchUnregisterMemory({bufferAddrRegisrterch});
}
Result TransferEngine::BatchUnregisterMemory(const std::vector<uintptr_t> &bufferAddrs)
{
std::lock_guard<std::mutex> lock(apiMutex_);
TE_CHECK_OR_RETURN(initialized_, ErrorCode::kNotReady, "transfer engine not initialized");
TE_CHECK_OR_RETURN(!bufferAddrs.empty(), ErrorCode::kInvalid, "bufferAddrs is empty");
for (size_t i = 0; i < bufferAddrs.size(); ++i) {
TE_CHECK_OR_RETURN(bufferAddrs[i] > 0, ErrorCode::kInvalid, "bufferAddr should be positive");
TE_CHECK_OR_RETURN(registeredMemory_->RemoveByBaseAddr(static_cast<uint64_t>(bufferAddrs[i])),
ErrorCode::kNotFound, "region is not registered");
}
LOG(INFO) << "batch unregister memory success, count=" << bufferAddrs.size();
return Result::OK();
}
Result TransferEngine::TransferSyncRead(const std::string &targetHostname, uintptr_t buffer, uintptr_t peerBufferAddress,
size_t length)
{
return BatchTransferSyncRead(targetHostname, {buffer}, {peerBufferAddress}, {length});
}
Result TransferEngine::BatchTransferSyncRead(const std::string &targetHostname, const std::vector<uintptr_t> &buffers,
const std::vector<uintptr_t> &peerBufferAddresses,
const std::vector<size_t> &lengths)
{
internal::DumpProcessEnvironment("batch_transfer_sync_read_begin");
TE_CHECK_OR_RETURN(!buffers.empty(), ErrorCode::kInvalid, "buffers is empty");
TE_CHECK_OR_RETURN(buffers.size() == peerBufferAddresses.size() && buffers.size() == lengths.size(),
ErrorCode::kInvalid, "buffers/peerBufferAddresses/lengths size mismatch");
std::string peerHost;
uint16_t peerPort = 0;
TE_RETURN_IF_ERROR(ParseTargetHostname(targetHostname, &peerHost, &peerPort));
for (size_t i = 0; i < buffers.size(); ++i) {
TE_CHECK_OR_RETURN(buffers[i] > 0, ErrorCode::kInvalid, "buffer should be positive");
TE_CHECK_OR_RETURN(peerBufferAddresses[i] > 0, ErrorCode::kInvalid, "peerBufferAddress should be positive");
TE_CHECK_OR_RETURN(lengths[i] > 0, ErrorCode::kInvalid, "length should be positive");
}
std::string localHostSnapshot;
uint16_t localPortSnapshot = 0;
int32_t deviceIdSnapshot = -1;
uint64_t requestIdStart = 0;
{
std::lock_guard<std::mutex> lock(apiMutex_);
TE_CHECK_OR_RETURN(initialized_, ErrorCode::kNotReady, "transfer engine not initialized");
TE_CHECK_OR_RETURN(!finalizing_, ErrorCode::kNotReady, "transfer engine is finalizing");
localHostSnapshot = localHost_;
localPortSnapshot = localPort_;
deviceIdSnapshot = deviceId_;
requestIdStart = nextRequestId_;
nextRequestId_ += static_cast<uint64_t>(buffers.size());
++inFlightSyncReads_;
}
auto finishGuard = std::unique_ptr<void, std::function<void(void *)>>(
reinterpret_cast<void *>(1), [this](void *) {
std::lock_guard<std::mutex> lock(apiMutex_);
if (inFlightSyncReads_ > 0) {
--inFlightSyncReads_;
}
if (finalizing_ && inFlightSyncReads_ == 0) {
apiCv_.notify_all();
}
});
LOG(INFO) << "batch sync read begin"
<< ", item_count=" << buffers.size() << ", target_hostname=" << targetHostname
<< ", peer=" << peerHost << ":" << peerPort
<< ", request_id_start=" << requestIdStart;
int32_t ownerDeviceId = -1;
Result connRc = BuildConnectionIfNeeded(peerHost, peerPort, &ownerDeviceId);
if (connRc.IsError()) {
LOG(ERROR) << "batch sync read build connection failed, reason=" << connRc.ToString();
return connRc;
}
ConnectionSpec spec;
spec.localHost = localHostSnapshot;
spec.localPort = localPortSnapshot;
spec.localDeviceId = deviceIdSnapshot;
spec.peerHost = peerHost;
spec.peerPort = peerPort;
spec.peerDeviceId = ownerDeviceId;
for (size_t i = 0; i < buffers.size(); ++i) {
Result postRecvRc = backend_->PostRecv(spec, static_cast<uint64_t>(buffers[i]), static_cast<uint64_t>(lengths[i]));
if (postRecvRc.IsError()) {
LOG(ERROR) << "batch sync read post recv failed, item_index=" << i
<< ", reason=" << postRecvRc.ToString();
backend_->AbortConnection(spec);
ConnectionKey key{ deviceIdSnapshot, peerHost, peerPort, ownerDeviceId };
connMgr_->MarkStale(key);
return postRecvRc;
}
}
BatchReadTriggerRequest req;
req.requesterHost = localHostSnapshot;
req.requesterPort = localPortSnapshot;
req.requesterDeviceId = deviceIdSnapshot;
req.ownerDeviceId = ownerDeviceId;
req.items.reserve(buffers.size());
for (size_t i = 0; i < buffers.size(); ++i) {
BatchReadItem one;
one.requestId = requestIdStart + static_cast<uint64_t>(i);
one.remoteAddr = static_cast<uint64_t>(peerBufferAddresses[i]);
one.length = static_cast<uint64_t>(lengths[i]);
req.items.push_back(one);
}
BatchReadTriggerResponse rsp;
Result triggerRc = controlClient_->BatchReadTrigger(peerHost, peerPort, req, &rsp);
if (triggerRc.IsError()) {
LOG(ERROR) << "batch sync read rpc failed, reason=" << triggerRc.ToString();
backend_->AbortConnection(spec);
ConnectionKey key{ deviceIdSnapshot, peerHost, peerPort, ownerDeviceId };
connMgr_->MarkStale(key);
return triggerRc;
}
Result rpcStatus = RpcCodeToStatus(rsp.code, rsp.msg);
if (rpcStatus.IsError()) {
LOG(ERROR) << "batch sync read rpc returned error, failed_item_index=" << rsp.failedItemIndex
<< ", reason=" << rsp.msg;
backend_->AbortConnection(spec);
ConnectionKey key{ deviceIdSnapshot, peerHost, peerPort, ownerDeviceId };
connMgr_->MarkStale(key);
return rpcStatus;
}
for (size_t i = 0; i < buffers.size(); ++i) {
Result waitRc = backend_->WaitRecv(spec, kDefaultRecvWaitTimeoutMs);
if (waitRc.IsError()) {
LOG(ERROR) << "batch sync read wait recv failed, item_index=" << i
<< ", reason=" << waitRc.ToString();
ConnectionKey key{ deviceIdSnapshot, peerHost, peerPort, ownerDeviceId };
connMgr_->MarkStale(key);
return waitRc;
}
}
LOG(INFO) << "batch sync read success, item_count=" << buffers.size()
<< ", target_hostname=" << targetHostname;
return Result::OK();
}
Result TransferEngine::Finalize()
{
{
std::unique_lock<std::mutex> lock(apiMutex_);
if (!initialized_) {
return Result::OK();
}
LOG(INFO) << "transfer engine finalize start"
<< ", local_host=" << localHost_ << ", local_port=" << localPort_
<< ", device_id=" << deviceId_ << ", inflight_sync_reads=" << inFlightSyncReads_;
finalizing_ = true;
apiCv_.wait(lock, [this]() { return inFlightSyncReads_ == 0; });
}
if (controlServer_ != nullptr) {
controlServer_->Stop();
}
{
std::lock_guard<std::mutex> lock(apiMutex_);
{
std::lock_guard<std::mutex> endpointLock(endpointCacheMutex_);
state_->endpointOwnerDeviceCache.clear();
}
initialized_ = false;
finalizing_ = false;
localHost_.clear();
localPort_ = 0;
deviceId_ = -1;
rpcThreads_ = 0;
nextRequestId_ = 1;
controlService_.reset();
}
LOG(INFO) << "transfer engine finalize success";
google::FlushLogFiles(google::GLOG_INFO);
google::FlushLogFiles(google::GLOG_WARNING);
google::FlushLogFiles(google::GLOG_ERROR);
return Result::OK();
}
Result TransferEngine::BuildConnectionIfNeeded(const std::string &peerHost, uint16_t peerPort, int32_t *ownerDeviceId)
{
TE_CHECK_PTR_OR_RETURN(ownerDeviceId);
const std::string endpoint = peerHost + ":" + std::to_string(peerPort);
int32_t cachedOwnerDeviceId = -1;
{
std::lock_guard<std::mutex> lock(endpointCacheMutex_);
const auto cacheIter = state_->endpointOwnerDeviceCache.find(endpoint);
if (cacheIter != state_->endpointOwnerDeviceCache.end()) {
cachedOwnerDeviceId = cacheIter->second;
}
}
if (cachedOwnerDeviceId >= 0) {
ConnectionKey key{ deviceId_, peerHost, peerPort, cachedOwnerDeviceId };
if (connMgr_->HasReadyConnection(key)) {
LOG(INFO) << "reuse cached connection"
<< ", peer=" << peerHost << ":" << peerPort
<< ", owner_device_id=" << cachedOwnerDeviceId;
QueryConnReadyRequest queryReq;
queryReq.requesterHost = localHost_;
queryReq.requesterPort = localPort_;
queryReq.requesterDeviceId = deviceId_;
queryReq.ownerDeviceId = cachedOwnerDeviceId;
QueryConnReadyResponse queryRsp;
Result queryRc = controlClient_->QueryConnReady(peerHost, peerPort, queryReq, &queryRsp);
if (queryRc.IsOk()) {
Result rpcStatus = RpcCodeToStatus(queryRsp.code, queryRsp.msg);
if (rpcStatus.IsOk() && queryRsp.ready) {
*ownerDeviceId = cachedOwnerDeviceId;
return Result::OK();
}
}
LOG(WARNING) << "cached connection stale, rebuilding"
<< ", peer=" << peerHost << ":" << peerPort
<< ", owner_device_id=" << cachedOwnerDeviceId;
connMgr_->MarkStale(key);
std::lock_guard<std::mutex> lock(endpointCacheMutex_);
state_->endpointOwnerDeviceCache.erase(endpoint);
}
}
return BuildConnectionOnce(peerHost, peerPort, ownerDeviceId);
}
Result TransferEngine::BuildConnectionOnce(const std::string &peerHost, uint16_t peerPort, int32_t *ownerDeviceId)
{
TE_CHECK_PTR_OR_RETURN(ownerDeviceId);
std::lock_guard<std::mutex> lock(chainMutex_);
LOG(INFO) << "build connection start"
<< ", peer=" << peerHost << ":" << peerPort << ", device_id=" << deviceId_;
internal::DumpProcessEnvironment("build_connection_once_start");
std::string rootInfo;
Result createRootRc = backend_->CreateRootInfo(&rootInfo);
if (createRootRc.IsError()) {
LOG(ERROR) << "build connection create root info failed"
<< ", peer=" << peerHost << ":" << peerPort << ", reason=" << createRootRc.ToString();
return createRootRc;
}
ExchangeRootInfoRequest exchangeReq;
exchangeReq.requesterHost = localHost_;
exchangeReq.requesterPort = localPort_;
exchangeReq.requesterDeviceId = deviceId_;
exchangeReq.ownerDeviceId = -1;
exchangeReq.rootInfo = rootInfo;
ExchangeRootInfoResponse exchangeRsp;
Result exchangeRc = controlClient_->ExchangeRootInfo(peerHost, peerPort, exchangeReq, &exchangeRsp);
if (exchangeRc.IsError()) {
LOG(ERROR) << "build connection exchange root info rpc failed"
<< ", peer=" << peerHost << ":" << peerPort << ", reason=" << exchangeRc.ToString();
return exchangeRc;
}
Result exchangeRpcStatus = RpcCodeToStatus(exchangeRsp.code, exchangeRsp.msg);
if (exchangeRpcStatus.IsError()) {
LOG(ERROR) << "build connection exchange root info rejected"
<< ", peer=" << peerHost << ":" << peerPort << ", rsp_code=" << exchangeRsp.code
<< ", rsp_msg=" << exchangeRsp.msg;
return exchangeRpcStatus;
}
TE_CHECK_OR_RETURN(exchangeRsp.ownerDeviceId >= 0, ErrorCode::kRuntimeError, "owner_device_id is invalid");
ConnectionSpec spec;
spec.localHost = localHost_;
spec.localPort = localPort_;
spec.localDeviceId = deviceId_;
spec.peerHost = peerHost;
spec.peerPort = peerPort;
spec.peerDeviceId = exchangeRsp.ownerDeviceId;
Result initRecvRc = backend_->InitRecv(spec, rootInfo);
if (initRecvRc.IsError()) {
LOG(ERROR) << "build connection init recv failed"
<< ", peer=" << peerHost << ":" << peerPort
<< ", owner_device_id=" << exchangeRsp.ownerDeviceId << ", reason=" << initRecvRc.ToString();
return initRecvRc;
}
ConnectionKey key{ deviceId_, peerHost, peerPort, exchangeRsp.ownerDeviceId };
connMgr_->MarkRequesterRecvReady(key);
QueryConnReadyRequest queryReq;
queryReq.requesterHost = localHost_;
queryReq.requesterPort = localPort_;
queryReq.requesterDeviceId = deviceId_;
queryReq.ownerDeviceId = exchangeRsp.ownerDeviceId;
QueryConnReadyResponse queryRsp;
for (int32_t i = 0; i < kConnReadyRetryCount; ++i) {
TE_RETURN_IF_ERROR(controlClient_->QueryConnReady(peerHost, peerPort, queryReq, &queryRsp));
TE_RETURN_IF_ERROR(RpcCodeToStatus(queryRsp.code, queryRsp.msg));
if (queryRsp.ready) {
connMgr_->MarkOwnerSendReady(key);
*ownerDeviceId = exchangeRsp.ownerDeviceId;
{
std::lock_guard<std::mutex> lock(endpointCacheMutex_);
state_->endpointOwnerDeviceCache[peerHost + ":" + std::to_string(peerPort)] = exchangeRsp.ownerDeviceId;
}
return Result::OK();
}
std::this_thread::sleep_for(std::chrono::milliseconds(kConnReadyRetryIntervalMs));
}
connMgr_->MarkStale(key);
LOG(WARNING) << "build connection timeout waiting owner ready"
<< ", peer=" << peerHost << ":" << peerPort
<< ", owner_device_id=" << exchangeRsp.ownerDeviceId
<< ", retry_count=" << kConnReadyRetryCount;
return TE_MAKE_STATUS(ErrorCode::kNotReady,
"connection is not ready, peer=" + peerHost + ":" + std::to_string(peerPort) +
", device_id=" + std::to_string(deviceId_) +
", owner_device_id=" + std::to_string(exchangeRsp.ownerDeviceId));
}
std::string TransferEngine::CreateRootInfo() const
{
return "te_root_info_v0_1_0";
}
}