* Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <numeric>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <utility>
#include "datasystem/common/constants.h"
#include "datasystem/common/flags/flags.h"
#include "datasystem/common/iam/tenant_auth_manager.h"
#include "datasystem/common/perf/perf_manager.h"
#include "datasystem/common/rpc/rpc_auth_key_manager.h"
#include "datasystem/common/rpc/rpc_stub_base.h"
#include "datasystem/common/rpc/rpc_stub_cache_mgr.h"
#include "datasystem/common/util/format.h"
#include "datasystem/common/util/gflag/common_gflags.h"
#include "datasystem/common/util/memory.h"
#include "datasystem/common/util/raii.h"
#include "datasystem/common/util/rpc_util.h"
#include "datasystem/common/util/status_helper.h"
#include "datasystem/protos/stream_posix.stub.rpc.pb.h"
#include "datasystem/stream/stream_config.h"
#include "datasystem/worker/stream_cache/metrics/sc_metrics_monitor.h"
#include "datasystem/worker/stream_cache/page_queue/shared_page_queue.h"
#include "datasystem/worker/stream_cache/remote_worker_manager.h"
#include "datasystem/worker/stream_cache/stream_manager.h"
// Number of threads used to send elements to remote workers (default 8); the value is checked by
// Validator::ValidateThreadNum.
DS_DEFINE_int32(remote_send_thread_num, 8, "The num of threads used to send elements to remote worker.");
DS_DEFINE_validator(remote_send_thread_num, &Validator::ValidateThreadNum);
namespace datasystem {
namespace worker {
namespace stream_cache {
/**
 * @brief Name of the stream the element(s) of this view belong to.
 */
std::string SendElementView::StreamName() const
{
    const std::string &name = streamName_;
    return name;
}

/**
 * @brief Producer name used for this view; it is the destination remote worker.
 */
std::string SendElementView::ProducerName() const
{
    const std::string &producer = remoteWorker_;
    return producer;
}

/**
 * @brief Producer instance id; send views never carry one, so this is always empty.
 */
std::string SendElementView::ProducerInstanceId() const
{
    return std::string();
}
uint64_t SendElementView::StreamHash() const
{
StreamProducerKey key(ProducerName(), KeyName(), ProducerInstanceId());
return std::hash<StreamProducerKey>{}(key);
}
/**
 * @brief Create the view that carries a data element (and any elements later packed into it) to a remote worker.
 * @param[in] page Data page holding the element.
 * @param[in] remoteWorker Destination worker; stored on the returned view.
 * @param[in] dataElement First element of the view.
 * @param[in] obj Page queue that owns `page`; for shared pages its stream name is used as the shared page name.
 * @param[in] remoteWorkerManager Resolves the element's stream number to a stream name (only used for shared pages).
 * @param[out] out A StreamElementView for an exclusive page, or a SharedPageElementView wrapping one for a shared
 *                 page.
 * @return Status::OK(), or the error returned while extracting a big element's page.
 */
Status SendElementView::CreateSendElementView(const std::shared_ptr<StreamDataPage> &page,
                                              const std::string &remoteWorker, DataElement &dataElement,
                                              std::shared_ptr<PageQueueBase> obj,
                                              RemoteWorkerManager *remoteWorkerManager,
                                              std::shared_ptr<SendElementView> &out)
{
    // A non-zero stream number marks an element that lives on a shared page.
    bool isSharedPage = dataElement.GetStreamNo() != 0;
    std::shared_ptr<StreamElementView> elementView = std::make_shared<StreamElementView>();
    std::string streamName;
    if (isSharedPage) {
        streamName = "";
        // A failed lookup is only logged; the view then carries an empty stream name.
        Status rc = remoteWorkerManager->StreamNoToName(dataElement.GetStreamNo(), streamName);
        VLOG_IF(SC_NORMAL_LOG_LEVEL, rc.IsError()) << rc.ToString();
    } else {
        streamName = obj->GetStreamName();
    }
    elementView->page_ = page;
    elementView->streamName_ = streamName;
    elementView->begCursor_ = dataElement.id;
    elementView->remote_ = dataElement.IsRemote();
    elementView->bigElement_ = dataElement.IsBigElement();
    if (elementView->bigElement_) {
        // Remembered so this entry's size can be added to the usage released after a successful send.
        elementView->bigElementMetaSize_ = dataElement.size;
    }
    if (elementView->bigElement_ && !elementView->remote_) {
        // Presumably resolves the page holding the big element's payload; it is later ref-counted via
        // Inc/DecBigElementPageRefCount.
        RETURN_IF_NOT_OK(obj->ExtractBigElement(dataElement, elementView->bigElementPage_));
        VLOG(SC_NORMAL_LOG_LEVEL) << FormatString("[RW:%s, S:%s] Page<%s> Cursor %zu BigElement<%s>", remoteWorker,
                                                  elementView->streamName_, page->GetPageId(), dataElement.id,
                                                  elementView->bigElementPage_->GetPageId());
    }
    // skipChecks = true: the first element always starts the view.
    (void)elementView->PackDataElement(dataElement, true);
    elementView->dataObj_ = obj;
    if (isSharedPage) {
        // Wrap the element view; the wrapper's key is the shared page name rather than the stream name.
        auto sharedPageView = std::make_shared<SharedPageElementView>();
        sharedPageView->sharedPageName_ = obj->GetStreamName();
        sharedPageView->remoteWorker_ = remoteWorker;
        sharedPageView->traceId_ = Trace::Instance().GetTraceID();
        sharedPageView->dataObj_ = obj;
        sharedPageView->elementViews_.emplace_back(elementView);
        out = std::static_pointer_cast<SendElementView>(std::move(sharedPageView));
    } else {
        elementView->remoteWorker_ = remoteWorker;
        elementView->traceId_ = Trace::Instance().GetTraceID();
        out = std::move(elementView);
    }
    return Status::OK();
}
/**
 * @brief Drop the page (and big element page) references held by this view.
 * Only the caller that flips ref_ from true to false performs the release, so repeated calls are no-ops.
 * @return Status::OK(), or the first release error encountered.
 */
Status StreamElementView::ReleasePage()
{
    bool stillReferenced = true;
    if (!ref_.compare_exchange_strong(stillReferenced, false)) {
        return Status::OK();
    }
    RETURN_IF_NOT_OK(page_->ReleasePage(FormatString("RW:%s, S:%s", remoteWorker_, StreamName())));
    if (bigElement_) {
        const auto bigPageId = bigElementPage_->GetPageId();
        RETURN_IF_NOT_OK(dataObj_->DecBigElementPageRefCount(bigPageId));
    }
    return Status::OK();
}
/**
 * @brief Take references on the data page and, for big elements, on the big element page.
 * Returns immediately if the view already holds its references (ref_ is set).
 * @return Status::OK(), or the error from taking either reference.
 */
Status StreamElementView::IncRefCount()
{
    RETURN_OK_IF_TRUE(ref_);
    bool bigElementLocked = false;
    // On any early return after the big element page was referenced but before ref_ is set,
    // roll back the big element page reference.
    Raii raii([&bigElementLocked, this]() {
        if (!ref_ && bigElementLocked) {
            auto pageId = bigElementPage_->GetPageId();
            (void)dataObj_->DecBigElementPageRefCount(pageId);
        }
    });
    if (bigElement_) {
        auto pageId = bigElementPage_->GetPageId();
        RETURN_IF_NOT_OK(dataObj_->IncBigElementPageRefCount(pageId));
        bigElementLocked = true;
    }
    RETURN_IF_NOT_OK(page_->RefPage(FormatString("RW:%s, S:%s", remoteWorker_, StreamName())));
    // Setting ref_ last also disarms the rollback above.
    ref_ = true;
    return Status::OK();
}
/**
 * @brief Copy the packed payload out of the data page into a ShmUnit owned by this view (default tenant).
 *
 * After the copy, the page references are released and GetBufferPointer() returns the copy
 * (shmEnabled_ becomes false). Returns immediately if the payload was already moved.
 * @return Status::OK(), or the error from allocation, copy or page release.
 */
Status StreamElementView::MoveBufToAlternateMemory()
{
    std::unique_lock<std::shared_timed_mutex> writeLock(mux_);
    RETURN_OK_IF_TRUE(!shmEnabled_);
    const size_t payloadBytes = std::accumulate(sz_.cbegin(), sz_.cend(), 0ul);
    shmUnit_ = std::make_unique<ShmUnit>();
    RETURN_IF_NOT_OK(shmUnit_->AllocateMemory(DEFAULT_TENANT_ID, payloadBytes, true, ServiceType::STREAM));
    uint8_t *copyDst = reinterpret_cast<uint8_t *>(shmUnit_->pointer);
    secondaryAddr_ = copyDst;
    RETURN_IF_NOT_OK(HugeMemoryCopy(copyDst, payloadBytes, buf_, payloadBytes));
    RETURN_IF_NOT_OK(ReleasePage());
    shmEnabled_ = false;
    localBufSize_ = payloadBytes;
    return Status::OK();
}
/**
 * @brief Copy the packed payload out of the data page into a ShmUnit charged to the stream's tenant.
 *
 * The tenant is derived from the stream name. After the copy, the page references are released and
 * GetBufferPointer() returns the copy (shmEnabled_ becomes false). Returns immediately if already moved.
 * @return Status::OK(), or the error from allocation, copy or page release.
 */
Status StreamElementView::MoveBufToShmUnit()
{
    std::unique_lock<std::shared_timed_mutex> writeLock(mux_);
    RETURN_OK_IF_TRUE(!shmEnabled_);
    const size_t payloadBytes = std::accumulate(sz_.cbegin(), sz_.cend(), 0ul);
    shmUnit_ = std::make_unique<ShmUnit>();
    const auto tenant = TenantAuthManager::ExtractTenantId(streamName_);
    RETURN_IF_NOT_OK(shmUnit_->AllocateMemory(tenant, payloadBytes, true, ServiceType::STREAM));
    uint8_t *copyDst = reinterpret_cast<uint8_t *>(shmUnit_->pointer);
    secondaryAddr_ = copyDst;
    RETURN_IF_NOT_OK(HugeMemoryCopy(copyDst, payloadBytes, buf_, payloadBytes));
    RETURN_IF_NOT_OK(ReleasePage());
    shmEnabled_ = false;
    localBufSize_ = payloadBytes;
    return Status::OK();
}
uint8_t *StreamElementView::GetBufferPointer()
{
std::shared_lock<std::shared_timed_mutex> rlock(mux_);
bool shmEnabled = shmEnabled_.load();
uint8_t *ptr = shmEnabled ? buf_.load() : secondaryAddr_;
return ptr;
}
/**
 * @brief Cursor range covered by this view: (first cursor, number of packed elements).
 */
RemoteAckInfo::AckRange StreamElementView::GetAckRange()
{
    const auto elementCount = sz_.size();
    RemoteAckInfo::AckRange range = std::make_pair(begCursor_, elementCount);
    return range;
}

/**
 * @brief A StreamElementView always describes an exclusive (non-shared) page.
 */
bool StreamElementView::IsSharedPage()
{
    constexpr bool kSharedPage = false;
    return kSharedPage;
}
/**
 * @brief Try to append another data element to this view so they are sent as one contiguous buffer.
 * @param[in] dataElement Element to append.
 * @param[in] skipChecks When true, append unconditionally (used for the first element of a view).
 * @param[in] remoteworkerManager Unused by this implementation.
 * @return true if the element was appended, false if it cannot be merged into this view.
 */
bool StreamElementView::PackDataElement(const DataElement &dataElement, bool skipChecks,
                                        RemoteWorkerManager *remoteworkerManager)
{
    (void)remoteworkerManager;
    if (!skipChecks) {
        // Refuse to merge big elements, mixed remote/non-remote elements, or an element that does not end exactly
        // where the current buffer starts.
        if (bigElement_ || dataElement.IsBigElement() || dataElement.ptr + dataElement.size != buf_
            || dataElement.IsRemote() != remote_) {
            return false;
        }
    }
    // buf_ moves down to the new element, so the packed payload is [buf_, buf_ + sum(sz_)).
    // NOTE(review): each appended element lies at a lower address than the previous ones, while sz_/headerBits_
    // record them in append order — confirm the receiver decodes sizes against this layout.
    buf_ = dataElement.ptr;
    sz_.emplace_back(dataElement.size);
    headerBits_.emplace_back(dataElement.HasHeader());
    return true;
}
/**
 * @brief Number of data elements packed into this view.
 */
uint64_t StreamElementView::GetElementNum()
{
    const auto packedCount = sz_.size();
    return packedCount;
}
/**
 * @brief Drop this view's page references (if still held) and erase it from the pending list.
 * @param[in,out] dataLst Pending list that contains this view.
 * @param[in,out] iter Position of this view in dataLst; advanced to the next entry.
 */
void StreamElementView::DiscardBufferFromList(std::list<BaseData> &dataLst, std::list<BaseData>::iterator &iter)
{
    auto p = GetAckRange();
    LOG(INFO) << FormatString("[S:%s] Discard range [%zu, %zu)", StreamName(), p.first, p.first + p.second);
    // Clear ref_ atomically before dropping the references (as ReleasePage does), so a later ReleasePage() or a
    // second discard on this view cannot release the same page references again.
    bool expected = true;
    if (ref_.compare_exchange_strong(expected, false)) {
        (void)dataObj_->UpdatePageRefIfExist(page_->GetShmView(), FormatString("S:%s", StreamName()), false);
        if (bigElement_) {
            (void)dataObj_->DecBigElementPageRefCount(bigElementPage_->GetPageId());
        }
    }
    iter = dataLst.erase(iter);
}
/**
 * @brief Shared page views are keyed by the shared page name rather than by a stream name.
 */
std::string SharedPageElementView::KeyName() const
{
    const std::string &pageName = sharedPageName_;
    return pageName;
}

/**
 * @brief A SharedPageElementView always describes a shared page.
 */
bool SharedPageElementView::IsSharedPage()
{
    constexpr bool kSharedPage = true;
    return kSharedPage;
}
bool SharedPageElementView::PackDataElement(const DataElement &dataElement, bool skipChecks,
RemoteWorkerManager *remoteWorkerManager)
{
auto isDifferentStream = [this, &dataElement, remoteWorkerManager]() {
if (dataElement.GetStreamNo() == 0) {
return false;
}
std::string streamNameFromNumber;
(void)remoteWorkerManager->StreamNoToName(dataElement.GetStreamNo(), streamNameFromNumber);
auto streamName = elementViews_.back()->StreamName();
return streamNameFromNumber != streamName;
};
if (isDifferentStream()) {
return false;
}
return elementViews_.back()->PackDataElement(dataElement, skipChecks);
}
uint64_t SharedPageElementView::RecordSeqNo(std::function<uint64_t(const std::string &)> fetchAddSeqNo)
{
for (auto &view : elementViews_) {
seqNums_.emplace_back(view->RecordSeqNo(fetchAddSeqNo));
}
return fetchAddSeqNo(sharedPageName_);
}
/**
 * @brief Release page references of every contained element view; stops at the first error.
 */
Status SharedPageElementView::ReleasePage()
{
    for (const auto &elementView : elementViews_) {
        RETURN_IF_NOT_OK(elementView->ReleasePage());
    }
    return Status::OK();
}

/**
 * @brief Take page references for every contained element view; stops at the first error.
 */
Status SharedPageElementView::IncRefCount()
{
    for (const auto &elementView : elementViews_) {
        RETURN_IF_NOT_OK(elementView->IncRefCount());
    }
    return Status::OK();
}
/**
 * @brief Ack range of this view: (first cursor of the first element view, total packed element count).
 */
RemoteAckInfo::AckRange SharedPageElementView::GetAckRange()
{
    const uint64_t firstCursor = elementViews_.front()->begCursor_;
    const uint64_t elementCount = GetElementNum();
    return std::make_pair(firstCursor, elementCount);
}

/**
 * @brief Total number of data elements across all contained element views.
 */
uint64_t SharedPageElementView::GetElementNum()
{
    return std::accumulate(elementViews_.begin(), elementViews_.end(), uint64_t{ 0 },
                           [](uint64_t acc, const auto &view) { return acc + view->sz_.size(); });
}
/**
 * @brief Discard this view from the pending list, delegating to the first contained element view.
 */
void SharedPageElementView::DiscardBufferFromList(std::list<BaseData> &dataLst, std::list<BaseData>::iterator &iter)
{
    auto &headView = elementViews_.front();
    headView->DiscardBufferFromList(dataLst, iter);
}

/**
 * @brief Move the first contained element view's payload into a ShmUnit.
 */
Status SharedPageElementView::MoveBufToShmUnit()
{
    auto &headView = elementViews_.front();
    RETURN_IF_NOT_OK(headView->MoveBufToShmUnit());
    return Status::OK();
}
/**
 * @brief Construct the sender-side state for one remote worker.
 * @param[in] localAddress Address of this worker (sent in push requests).
 * @param[in] remoteAddress Address of the destination worker.
 * @param[in] akSkManager Used to sign push requests.
 * @param[in] scSvc Stream cache service used to look up stream managers.
 * @param[in] workerInstanceId Instance id of this worker (copied).
 * @param[in] scAllocateManager Passed to the shared page group.
 * @param[in] manager Owning RemoteWorkerManager.
 */
RemoteWorker::RemoteWorker(HostPort localAddress, HostPort remoteAddress, std::shared_ptr<AkSkManager> akSkManager,
                           ClientWorkerSCServiceImpl *scSvc, std::string &workerInstanceId,
                           std::shared_ptr<WorkerSCAllocateMemory> scAllocateManager, RemoteWorkerManager *manager)
    : localWorkerAddr_(std::move(localAddress)),
      remoteWorkerAddr_(remoteAddress),
      akSkManager_(std::move(akSkManager)),
      scSvc_(scSvc),
      // remoteAddress feeds two members. Members are initialized in declaration order, not in the order written
      // here, so moving from it could leave remoteWorkerAddr_ empty; pass an independent copy instead.
      sharedPageGroup_(HostPort(remoteAddress), std::move(scAllocateManager), scSvc),
      workerInstanceId_(workerInstanceId),
      remoteWorkerManager_(manager)
{
}
/**
 * @brief No additional initialization is required; always succeeds.
 */
Status RemoteWorker::Init()
{
    Status result = Status::OK();
    return result;
}

/**
 * @brief Remove every shared page stream of this remote worker from the manager.
 */
RemoteWorker::~RemoteWorker()
{
    LOG(INFO) << "Start Destroy RemoteWorker for remote worker:" << remoteWorkerAddr_.ToString();
    auto sharedPageNames = sharedPageGroup_.GetAllSharedPageName();
    for (auto &pageName : sharedPageNames) {
        remoteWorkerManager_->RemoveStream(pageName, "");
    }
}

/**
 * @brief Acquire a write accessor on the remote-consumer entry of a stream.
 */
Status RemoteWorker::GetAccessor(const std::string &streamName, RemoteStreamInfoTbbMap::accessor &accessor)
{
    const auto prefix = LogPrefix();
    return remoteConsumers_.GetAccessor(streamName, accessor, prefix);
}
/**
 * @brief Register a consumer on the remote worker for a stream.
 * @param[in] streamName Stream name.
 * @param[in] subConfig Subscription config; only SubscriptionType::STREAM is supported.
 * @param[in] consumerId Consumer id.
 * @param[in] windowCount Send window for the consumer.
 * @param[in] lastAckCursor Last cursor already acknowledged.
 * @return K_INVALID for an unsupported subscription type; an already-registered consumer is not an error.
 */
Status RemoteWorker::AddRemoteConsumer(const std::string &streamName, const SubscriptionConfig &subConfig,
                                       const std::string &consumerId, uint64_t windowCount, uint64_t lastAckCursor)
{
    if (subConfig.subscriptionType != SubscriptionType::STREAM) {
        // Report the offending subscription *and* its type; the previous message printed the subscription name
        // in the place of the mode.
        RETURN_STATUS(StatusCode::K_INVALID,
                      FormatString("Only support STREAM mode. Subscription <%s> uses unsupported mode %d.",
                                   subConfig.subscriptionName, static_cast<int>(subConfig.subscriptionType)));
    }
    RETURN_IF_NOT_OK_EXCEPT(remoteConsumers_.AddConsumer(streamName, consumerId, windowCount, lastAckCursor),
                            K_DUPLICATED);
    VLOG(SC_NORMAL_LOG_LEVEL) << FormatString("[%s, S:%s, C:%s], Add remote consumer success", LogPrefix(), streamName,
                                              consumerId);
    return Status::OK();
}
/**
 * @brief Remove a consumer of a stream; mapEmpty is filled by the underlying consumer map.
 */
Status RemoteWorker::DelRemoteConsumer(const std::string &streamName, const std::string &consumerId,
                                       Optional<bool> &mapEmpty)
{
    RETURN_IF_NOT_OK(remoteConsumers_.DeleteConsumer(streamName, consumerId, mapEmpty));
    VLOG(SC_NORMAL_LOG_LEVEL) << FormatString("[%s, S:%s, C:%s] Delete remote consumer success", LogPrefix(),
                                              streamName, consumerId);
    return Status::OK();
}

/**
 * @brief Whether the stream has any consumer on the remote worker.
 */
bool RemoteWorker::HasRemoteConsumers(const std::string &streamName)
{
    const bool hasConsumers = remoteConsumers_.HasRemoteConsumers(streamName);
    return hasConsumers;
}

/**
 * @brief Whether sending for the stream is currently blocked.
 */
bool RemoteWorker::IsStreamSendBlocked(const std::string &streamName)
{
    const bool blocked = remoteConsumers_.IsStreamSendBlocked(streamName);
    return blocked;
}

/**
 * @brief Largest send window among the stream's remote consumers.
 */
uint64_t RemoteWorker::GetMaxWindowCount(const std::string &streamName) const
{
    const uint64_t window = remoteConsumers_.GetMaxWindowCount(streamName);
    return window;
}

/**
 * @brief Drop all remote consumers of a stream; mapEmpty is filled by the underlying consumer map.
 */
Status RemoteWorker::DeleteStream(const std::string &streamName, Optional<bool> &mapEmpty)
{
    LOG(INFO) << FormatString("[%s] ClearAllRemoteConsumer for stream %s", LogPrefix(), streamName);
    Status deleteRc = remoteConsumers_.DeleteStream(streamName, mapEmpty);
    return deleteRc;
}
void RemoteWorker::PostRecvCleanup(const std::string &streamName, const Status &status,
PendingFlushList &pendingFlushList, const PushReqPb &pushReq,
const PushRspPb &pushRspPb, std::unordered_map<std::string, StreamRaii> &raii)
{
const std::string &producerId = pushReq.producer_id();
const std::string workerInstanceId = "";
TraceGuard traceGuard = Trace::Instance().SetTraceNewID(pushReq.trace_id());
StreamProducerKey key(streamName, producerId, workerInstanceId);
auto iter = std::find_if(pendingFlushList.begin(), pendingFlushList.end(),
[key](const auto &kv) { return kv.first == key; });
if (status.GetCode() == K_SC_CONSUMER_NOT_FOUND) {
if (iter != pendingFlushList.end()) {
VLOG(SC_INTERNAL_LOG_LEVEL) << "No consumer found: Discarding buffers for stream: " << streamName;
RemoteStreamInfoTbbMap::accessor accessor;
if (GetAccessor(streamName, accessor).IsOk()) {
std::for_each(iter->second.begin(), iter->second.end(), [this, &accessor](const auto &kv) {
auto remoteElementView = std::static_pointer_cast<StreamElementView>(kv.first);
auto p = remoteElementView->GetAckRange();
SyncStreamLastAckCursor(accessor, Optional<RemoteAckInfo::AckRange>(p));
});
}
DiscardBuffers(iter->second);
}
return;
}
if (status.IsError() && status.GetCode() != K_OUT_OF_MEMORY) {
RecordRemoteSendSuccess(false);
return;
}
if (iter != pendingFlushList.end()) {
RemoteStreamInfoTbbMap::accessor accessor;
auto rc = GetAccessor(streamName, accessor);
if (rc.IsError()) {
LOG(ERROR) << FormatString("[%s, S:%s] Stream not found", LogPrefix(), streamName);
return;
}
auto streamMgr = (*(raii.find(streamName)->second))->second;
TraceGuard traceGuard = Trace::Instance().SetTraceNewID(pushReq.trace_id());
PostRecvCleanup(streamName, pushReq, pushRspPb, accessor, iter->second, raii);
accessor.release();
LOG_IF_ERROR(streamMgr->RemoteAck(), FormatString("[%s, S:%s] Remote ack failed", LogPrefix(), streamName));
}
}
void RemoteWorker::PostRecvCleanup(const std::string &streamName, const PushReqPb &rq, const PushRspPb &pushRspPb,
RemoteStreamInfoTbbMap::accessor &accessor, std::list<BaseData> &dataLst,
std::unordered_map<std::string, StreamRaii> &streamRaii)
{
uint64_t releaseSize = 0;
Raii raii([&releaseSize, &streamName, &streamRaii]() {
if (releaseSize > 0) {
auto itr = streamRaii.find(streamName);
if (itr == streamRaii.end()) {
LOG(ERROR) << FormatString(
"Decrease shared memory usage for stream[%s] failed because no worker area was found", streamName);
return;
}
LOG_IF_ERROR(
(*(itr->second))->second->TryDecUsage(releaseSize),
FormatString("Decrease shared memory usage for stream[%s] failed during push element to remote worker",
streamName));
}
});
for (auto i = 0; i < pushRspPb.error_size(); ++i) {
auto &err = pushRspPb.error(i);
auto seqNo = rq.seq(i);
auto it = std::find_if(dataLst.begin(), dataLst.end(), [seqNo](const auto &kv) { return kv.second == seqNo; });
if (it == dataLst.end()) {
LOG(ERROR) << FormatString("[%s, S:%s] Unable to find seqNo %zu", LogPrefix(), streamName, seqNo);
continue;
}
auto status = Status(static_cast<StatusCode>(err.error_code()), err.error_msg());
auto remoteElementView = std::static_pointer_cast<StreamElementView>(it->first);
auto p = remoteElementView->GetAckRange();
auto begCursor = p.first;
auto endCursor = begCursor + p.second;
if (status.IsOk()) {
const int logPerCount = VLOG_IS_ON(SC_INTERNAL_LOG_LEVEL) ? 1 : 1000;
LOG_EVERY_N(INFO, logPerCount) << FormatString(
"[%s, S:%s] Remote send elements [seq:%zu] [%zu, %zu) to remote worker %s is successful. Ref count %zu",
LogPrefix(), streamName, seqNo, begCursor, endCursor, remoteWorkerAddr_.ToString(),
remoteElementView.use_count());
SyncStreamLastAckCursor(accessor, Optional<RemoteAckInfo::AckRange>(p));
remoteElementView->ReleasePage();
releaseSize = std::accumulate(rq.element_meta(i).element_sizes().begin(),
rq.element_meta(i).element_sizes().end(), releaseSize);
if (std::static_pointer_cast<StreamElementView>(it->first)->bigElement_) {
releaseSize += std::static_pointer_cast<StreamElementView>(it->first)->bigElementMetaSize_;
}
it = dataLst.erase(it);
RecordRemoteSendSuccess(true);
continue;
}
LOG(INFO) << FormatString(
"[%s, S:%s, I:%s] Remote send elements [seq:%zu] [%zu, %zu) to remote worker %s gave status: %s",
LogPrefix(), streamName, workerInstanceId_, seqNo, begCursor, endCursor, remoteWorkerAddr_.ToString(),
status.ToString());
if (status.GetCode() != K_OUT_OF_MEMORY) {
RecordRemoteSendSuccess(false);
continue;
}
Status allocRc = remoteElementView->MoveBufToAlternateMemory();
if (allocRc.IsError()) {
LOG(ERROR) << FormatString("[%s, S:%s] Cursor [%zu, %zu) MoveBufToAlternateMemory failed. %s", LogPrefix(),
streamName, begCursor, endCursor, allocRc.ToString());
continue;
}
auto lastAckCursor = std::get<K_ACK>(accessor->second).GetStreamLastAckCursor();
SyncStreamLastAckCursor(accessor, Optional<RemoteAckInfo::AckRange>(p));
if (lastAckCursor < std::get<K_ACK>(accessor->second).GetStreamLastAckCursor()) {
VLOG(SC_INTERNAL_LOG_LEVEL) << FormatString(
"[%s, S:%s] Cursor [%zu, %zu) MoveBufToAlternateMemory. Ref count %zu", LogPrefix(), streamName,
begCursor, endCursor, remoteElementView.use_count());
}
}
}
void RemoteWorker::PostRecvCleanup(const std::string &keyName, const Status &status, PendingFlushList &pendingFlushList,
const SharedPagePushReqPb &pushReq, const PushRspPb &pushRspPb,
std::unordered_map<std::string, StreamRaii> &raii)
{
(void)status;
const std::string &producerId = pushReq.producer_id();
const std::string workerInstanceId = "";
TraceGuard traceGuard = Trace::Instance().SetTraceNewID(pushReq.trace_id());
StreamProducerKey key(keyName, producerId, workerInstanceId);
auto iter = std::find_if(pendingFlushList.begin(), pendingFlushList.end(),
[key](const auto &kv) { return kv.first == key; });
if (iter != pendingFlushList.end()) {
RemoteStreamInfoTbbMap::accessor accessor;
auto rc = GetAccessor(keyName, accessor);
if (rc.IsError()) {
LOG(ERROR) << FormatString("[%s, S:%s] Stream not found", LogPrefix(), keyName);
return;
}
TraceGuard traceGuard = Trace::Instance().SetTraceNewID(pushReq.trace_id());
auto sharedPageQueue = std::static_pointer_cast<SharedPageQueue>(
std::static_pointer_cast<SendElementView>(iter->second.front().first)->dataObj_);
PostRecvCleanup(keyName, pushReq, pushRspPb, accessor, iter->second, raii);
accessor.release();
LOG_IF_ERROR(sharedPageQueue->RemoteAck(), FormatString("[%s, S:%s] Remote ack failed", LogPrefix(), keyName));
}
}
void RemoteWorker::PostRecvCleanup(const std::string &keyName, const SharedPagePushReqPb &rq,
const PushRspPb &pushRspPb, RemoteStreamInfoTbbMap::accessor &accessor,
std::list<BaseData> &dataLst, std::unordered_map<std::string, StreamRaii> &raii)
{
std::unordered_map<std::string, uint64_t> streamName2BytesNumSendSuccess;
for (auto i = 0; i < pushRspPb.error_size(); ++i) {
auto &err = pushRspPb.error(i);
auto seqNo = rq.metas(i).seq();
const std::string &streamName = rq.stream_names(rq.metas(i).stream_index());
auto it = std::find_if(dataLst.begin(), dataLst.end(), [seqNo, &streamName](const auto &kv) {
auto sharedPageView = std::static_pointer_cast<SharedPageElementView>(kv.first);
for (auto seqIter = sharedPageView->seqNums_.begin(), viewIter = sharedPageView->elementViews_.begin();
seqIter != sharedPageView->seqNums_.end(); seqIter++, viewIter++) {
if (*seqIter == seqNo && (*viewIter)->StreamName() == streamName) {
return true;
}
}
return false;
});
if (it == dataLst.end()) {
LOG(ERROR) << FormatString("[%s, S:%s] Unable to find seqNo %zu", LogPrefix(), keyName, seqNo);
continue;
}
auto status = Status(static_cast<StatusCode>(err.error_code()), err.error_msg());
auto remoteElementView = std::static_pointer_cast<SharedPageElementView>(it->first);
auto p = remoteElementView->GetAckRange();
auto begCursor = p.first;
auto endCursor = begCursor + p.second;
if (status.IsOk()) {
const int logPerCount = VLOG_IS_ON(SC_INTERNAL_LOG_LEVEL) ? 1 : 1000;
LOG_EVERY_N(INFO, logPerCount) << FormatString(
"[%s, S:%s] Remote send elements [seq:%zu] [%zu, %zu) to remote worker %s is successful. Ref count %zu",
LogPrefix(), keyName, seqNo, begCursor, endCursor, remoteWorkerAddr_.ToString(),
remoteElementView.use_count());
streamName2BytesNumSendSuccess[streamName] = std::accumulate(
rq.metas(i).element_meta().element_sizes().begin(), rq.metas(i).element_meta().element_sizes().end(),
streamName2BytesNumSendSuccess[streamName]);
if (remoteElementView->elementViews_.back()->bigElement_) {
streamName2BytesNumSendSuccess[streamName] +=
remoteElementView->elementViews_.back()->bigElementMetaSize_;
}
SyncStreamLastAckCursor(accessor, Optional<RemoteAckInfo::AckRange>(p));
remoteElementView->ReleasePage();
it = dataLst.erase(it);
RecordRemoteSendSuccess(true);
continue;
}
LOG(INFO) << FormatString(
"[%s, S:%s, I:%s] Remote send elements [seq:%zu] [%zu, %zu) to remote worker %s gave status: %s",
LogPrefix(), keyName, workerInstanceId_, seqNo, begCursor, endCursor, remoteWorkerAddr_.ToString(),
status.ToString());
if (status.GetCode() == K_SC_CONSUMER_NOT_FOUND) {
VLOG(SC_INTERNAL_LOG_LEVEL) << "No consumer found: Discarding one buffer from shared page: " << keyName;
SyncStreamLastAckCursor(accessor, Optional<RemoteAckInfo::AckRange>(p));
DiscardBufferHelper(dataLst, it);
}
if (status.GetCode() != K_OUT_OF_MEMORY) {
RecordRemoteSendSuccess(false);
continue;
}
Status allocRc = remoteElementView->MoveBufToShmUnit();
if (allocRc.IsError()) {
LOG(ERROR) << FormatString("[%s, S:%s] Cursor [%zu, %zu) MoveBufToShmUnit failed. %s", LogPrefix(),
streamName, begCursor, endCursor, allocRc.ToString());
continue;
}
SyncStreamLastAckCursor(accessor, Optional<RemoteAckInfo::AckRange>(p));
}
for (const auto &kv : streamName2BytesNumSendSuccess) {
auto itr = raii.find(kv.first);
if (itr == raii.end()) {
LOG(ERROR) << FormatString(
"Decrease shared memory usage for stream[%s] failed because no worker area was found", kv.first);
continue;
}
LOG_IF_ERROR(
(*(itr->second))->second->TryDecUsage(kv.second),
FormatString("Decrease shared memory usage for stream[%s] failed during push element to remote worker",
kv.first));
}
}
void RemoteWorker::DiscardBuffers(std::list<BaseData> &dataLst)
{
auto iter = dataLst.begin();
while (iter != dataLst.end()) {
DiscardBufferHelper(dataLst, iter);
}
dataLst.clear();
}
void RemoteWorker::DiscardBufferHelper(std::list<BaseData> &dataLst, std::list<BaseData>::iterator &iter)
{
auto remoteElementView = std::static_pointer_cast<SendElementView>(iter->first);
remoteElementView->DiscardBufferFromList(dataLst, iter);
}
/**
 * @brief Build up to GetMaxWindowCount(streamName) push requests from one producer's pending list.
 * @param[in] streamName Stream (or shared page) name of the pending list.
 * @param[in] producerId Producer id.
 * @param[in,out] dataLst Pending views; views of vanished streams are discarded.
 * @param[out] requests Appended push requests.
 * @param[out] payloads Appended payload views, one vector per request.
 * @param[in,out] raii Stream manager locks taken while building requests.
 * @param[out] moveList Shared page views skipped because their stream is send-blocked.
 * @param[out] needAckList Shared page queues for which no request was produced.
 * @return Error from building a request, except the shared-page cases handled inline.
 */
Status RemoteWorker::ParseProducerPendingFlushList(const std::string &streamName, const std::string &producerId,
                                                   std::list<BaseData> &dataLst, std::vector<PushReq> &requests,
                                                   std::vector<std::vector<MemView>> &payloads,
                                                   std::unordered_map<std::string, StreamRaii> &raii,
                                                   std::list<std::shared_ptr<SharedPageElementView>> &moveList,
                                                   std::unordered_set<std::shared_ptr<SharedPageQueue>> &needAckList)
{
    RETURN_OK_IF_TRUE(dataLst.empty());
    uint64_t firstSeqNo = dataLst.begin()->second;
    VLOG(SC_INTERNAL_LOG_LEVEL) << FormatString("Processing pending send Flush List [%s, %s, %zu]", streamName,
                                                producerId, firstSeqNo);
    uint64_t windowCount = GetMaxWindowCount(streamName);
    auto firstCursor = std::numeric_limits<uint64_t>::max();
    auto lastCursor = std::numeric_limits<uint64_t>::min();
    auto it = dataLst.begin();
    // Query the view kind through the common base class: shared page views are not StreamElementViews, so
    // downcasting them to StreamElementView is undefined behavior.
    bool sharedPage = std::static_pointer_cast<SendElementView>(it->first)->IsSharedPage();
    std::shared_ptr<SharedPageQueue> backupSharedPage =
        sharedPage
            ? std::static_pointer_cast<SharedPageQueue>(std::static_pointer_cast<SendElementView>(it->first)->dataObj_)
            : nullptr;
    bool needAck = true;
    while (it != dataLst.end() && windowCount-- > 0) {
        std::variant<PushReqPb, SharedPagePushReqPb> pushReq;
        std::vector<MemView> elements;
        if (!sharedPage) {
            PushReqPb pushReqPb;
            RETURN_IF_NOT_OK(FillExclusivePushReqHelper(streamName, producerId, firstSeqNo, dataLst, it, firstCursor,
                                                        lastCursor, pushReqPb, elements, raii));
            pushReq = std::move(pushReqPb);
        } else {
            SharedPagePushReqPb pushReqPb;
            Status rc = FillSharedPushReqHelper(producerId, dataLst, it, firstCursor, lastCursor, pushReqPb, elements,
                                                raii, moveList);
            if (rc.GetCode() == K_SC_STREAM_NOT_FOUND) {
                // `it` points at the view whose stream vanished: ack past it and drop it.
                RemoteStreamInfoTbbMap::accessor accessor;
                if (GetAccessor(streamName, accessor).IsOk()) {
                    auto remoteElementView = std::static_pointer_cast<SharedPageElementView>(it->first);
                    auto p = remoteElementView->GetAckRange();
                    SyncStreamLastAckCursor(accessor, Optional<RemoteAckInfo::AckRange>(p));
                }
                DiscardBufferHelper(dataLst, it);
                continue;
            } else if (rc.GetCode() == K_NOT_READY) {
                continue;
            }
            RETURN_IF_NOT_OK(rc);
            pushReq = std::move(pushReqPb);
        }
        requests.emplace_back(std::move(pushReq), streamName);
        payloads.push_back(std::move(elements));
        needAck = false;
    }
    // Only shared page queues need an explicit ack; for exclusive streams backupSharedPage is null and must not be
    // inserted (consumers of needAckList would dereference it).
    if (needAck && backupSharedPage != nullptr) {
        needAckList.emplace(backupSharedPage);
    }
    VLOG(SC_INTERNAL_LOG_LEVEL) << FormatString("[%s, S:%s] Ack range [%zu, %zu]. Window size %zu", LogPrefix(),
                                                streamName, firstCursor, lastCursor, requests.size());
    return Status::OK();
}
/**
 * @brief Build one PushReqPb for an exclusive-page stream, batching consecutive views starting at `it`.
 *
 * Views are added until the accumulated payload would exceed FLAGS_zmq_chunk_sz (the first view is always added).
 * @param[in] streamName Stream name written into the request.
 * @param[in] producerId Producer id written into the request.
 * @param[in] firstSeqNo Sequence number written as first_seq.
 * @param[in] dataLst Pending list being consumed.
 * @param[in,out] it Advanced past every view added to the request.
 * @param[in,out] firstCursor Lowered to the smallest cursor added.
 * @param[in,out] lastCursor Raised to the largest cursor added.
 * @param[out] pushReqPb The request, signed with the AK/SK manager.
 * @param[out] elements One MemView per added view.
 * @param[in,out] raii Stream manager locks; the stream's lock is taken if not already held.
 * @return Error if the stream manager cannot be locked or is inactive, or if signing fails.
 */
Status RemoteWorker::FillExclusivePushReqHelper(const std::string &streamName, const std::string &producerId,
                                                uint64_t firstSeqNo, std::list<BaseData> &dataLst,
                                                std::list<BaseData>::iterator &it, uint64_t &firstCursor,
                                                uint64_t &lastCursor, PushReqPb &pushReqPb,
                                                std::vector<MemView> &elements,
                                                std::unordered_map<std::string, StreamRaii> &raii)
{
    RETURN_IF_NOT_OK(LockStreamManagerHelper(streamName, raii));
    pushReqPb.set_stream_name(streamName);
    pushReqPb.set_producer_id(producerId);
    pushReqPb.set_worker_addr(localWorkerAddr_.ToString());
    pushReqPb.set_first_seq(firstSeqNo);
    pushReqPb.set_worker_instance_id(workerInstanceId_);
    // The request carries the trace id of its first view.
    TraceGuard traceGuard = Trace::Instance().SetTraceNewID(it->first->traceId_);
    pushReqPb.set_trace_id(Trace::Instance().GetTraceID());
    size_t chunkSz = 0;
    const size_t zmqChunkSz = static_cast<size_t>(FLAGS_zmq_chunk_sz);
    do {
        auto seqNo = it->second;
        auto streamElementView = std::static_pointer_cast<StreamElementView>(it->first);
        auto &eleSzs = streamElementView->sz_;
        size_t payloadSz = std::accumulate(eleSzs.begin(), eleSzs.end(), 0ul);
        // Stop once adding this view would exceed the chunk budget; written as a subtraction (guarded by the
        // payloadSz check) so the comparison cannot overflow.
        if ((chunkSz > 0 && ((payloadSz > zmqChunkSz) || chunkSz > zmqChunkSz - payloadSz))) {
            break;
        }
        chunkSz += payloadSz;
        auto *ele = pushReqPb.mutable_element_meta()->Add();
        ele->mutable_element_sizes()->Add(eleSzs.begin(), eleSzs.end());
        auto &headerBits = streamElementView->headerBits_;
        ele->mutable_header_bits()->Add(headerBits.begin(), headerBits.end());
        pushReqPb.mutable_seq()->Add(seqNo);
        elements.emplace_back(streamElementView->GetBufferPointer(), payloadSz);
        const int logPerCount = VLOG_IS_ON(SC_INTERNAL_LOG_LEVEL) ? 1 : 1000;
        LOG_EVERY_N(INFO, logPerCount) << FormatString(
            "[%s, S:%s, I:%s] Remote send elements [seq:%zu] [%zu, %zu) to remote worker %s, page: %s", LogPrefix(),
            streamName, workerInstanceId_, seqNo, streamElementView->begCursor_,
            streamElementView->begCursor_ + streamElementView->sz_.size(), remoteWorkerAddr_.ToString(),
            streamElementView->page_->GetPageId());
        firstCursor = std::min(firstCursor, streamElementView->begCursor_);
        lastCursor = std::max(lastCursor, streamElementView->begCursor_ + streamElementView->sz_.size() - 1);
        ++it;
    } while (it != dataLst.end());
    pushReqPb.set_chunk_size(chunkSz);
    RETURN_IF_NOT_OK(akSkManager_->GenerateSignature(pushReqPb));
    return Status::OK();
}
/**
 * @brief Build one SharedPagePushReqPb, batching consecutive shared page views starting at `it`.
 *
 * Views whose stream is send-blocked are skipped and appended to moveList. Other views are added until the
 * accumulated payload would exceed FLAGS_zmq_chunk_sz (the first added view is always accepted).
 * @param[in] producerId Producer id written into the request.
 * @param[in] dataLst Pending list being consumed.
 * @param[in,out] it Advanced past every view added or skipped; on a lock failure it points at the failing view.
 * @param[in,out] firstCursor Lowered to the smallest cursor added.
 * @param[in,out] lastCursor Raised to the largest cursor added.
 * @param[out] pushReqPb The request, signed with the AK/SK manager.
 * @param[out] elements One MemView per added view.
 * @param[in,out] raii Stream manager locks; each stream's lock is taken if not already held.
 * @param[out] moveList Skipped (send-blocked) views.
 * @return K_NOT_READY if every view was skipped; errors from locking a stream manager or signing.
 */
Status RemoteWorker::FillSharedPushReqHelper(const std::string &producerId, std::list<BaseData> &dataLst,
                                             std::list<BaseData>::iterator &it, uint64_t &firstCursor,
                                             uint64_t &lastCursor, SharedPagePushReqPb &pushReqPb,
                                             std::vector<MemView> &elements,
                                             std::unordered_map<std::string, StreamRaii> &raii,
                                             std::list<std::shared_ptr<SharedPageElementView>> &moveList)
{
    std::unordered_map<std::string, uint64_t> streamIndexMapping;
    std::unordered_map<std::string, bool> streamBlockInfoMap;
    pushReqPb.set_producer_id(producerId);
    pushReqPb.set_worker_addr(localWorkerAddr_.ToString());
    pushReqPb.set_worker_instance_id(workerInstanceId_);
    TraceGuard traceGuard = Trace::Instance().SetTraceNewID(it->first->traceId_);
    pushReqPb.set_trace_id(Trace::Instance().GetTraceID());
    bool requestReady = false;
    size_t chunkSz = 0;
    const size_t zmqChunkSz = static_cast<size_t>(FLAGS_zmq_chunk_sz);
    do {
        auto sharedPageElementView = std::static_pointer_cast<SharedPageElementView>(it->first);
        auto streamElementView = sharedPageElementView->elementViews_.front();
        auto seqNo = sharedPageElementView->seqNums_.front();
        const std::string &streamName = streamElementView->streamName_;
        RETURN_IF_NOT_OK(LockStreamManagerHelper(streamName, raii));
        auto &eleSzs = streamElementView->sz_;
        size_t payloadSz = std::accumulate(eleSzs.begin(), eleSzs.end(), 0ul);
        auto streamBlockInfo = streamBlockInfoMap.find(streamName);
        if (streamBlockInfo == streamBlockInfoMap.end()) {
            streamBlockInfo = streamBlockInfoMap.emplace(streamName, IsStreamSendBlocked(streamName)).first;
        }
        if (streamBlockInfo->second) {
            ++it;
            moveList.emplace_back(sharedPageElementView);
            continue;
        }
        // Check the chunk budget before registering the stream name, so a view deferred to the next request
        // does not leave an unreferenced entry in stream_names. The subtraction form cannot overflow.
        if (chunkSz > 0 && (payloadSz > zmqChunkSz || chunkSz > zmqChunkSz - payloadSz)) {
            break;
        }
        auto iter = streamIndexMapping.find(streamName);
        if (iter == streamIndexMapping.end()) {
            iter = streamIndexMapping.emplace(streamName, streamIndexMapping.size()).first;
            pushReqPb.mutable_stream_names()->Add(streamName.c_str());
        }
        chunkSz += payloadSz;
        auto *meta = pushReqPb.mutable_metas()->Add();
        meta->set_stream_index(iter->second);
        meta->set_seq(seqNo);
        auto *ele = meta->mutable_element_meta();
        ele->mutable_element_sizes()->Add(eleSzs.begin(), eleSzs.end());
        auto &headerBits = streamElementView->headerBits_;
        ele->mutable_header_bits()->Add(headerBits.begin(), headerBits.end());
        elements.emplace_back(streamElementView->GetBufferPointer(), payloadSz);
        const int logPerCount = VLOG_IS_ON(SC_INTERNAL_LOG_LEVEL) ? 1 : 1000;
        LOG_EVERY_N(INFO, logPerCount) << FormatString(
            "[%s, S:%s, I:%s] Remote send elements [seq:%zu] [%zu, %zu) to remote worker %s, page: %s", LogPrefix(),
            streamName, workerInstanceId_, seqNo, streamElementView->begCursor_,
            streamElementView->begCursor_ + streamElementView->sz_.size(), remoteWorkerAddr_.ToString(),
            streamElementView->page_->GetPageId());
        firstCursor = std::min(firstCursor, streamElementView->begCursor_);
        lastCursor = std::max(lastCursor, streamElementView->begCursor_ + streamElementView->sz_.size() - 1);
        ++it;
        requestReady = true;
    } while (it != dataLst.end());
    CHECK_FAIL_RETURN_STATUS(requestReady, K_NOT_READY, "All element views are skipped, request is not ready");
    RETURN_IF_NOT_OK(akSkManager_->GenerateSignature(pushReqPb));
    return Status::OK();
}
/**
 * @brief Ensure raii holds a read accessor on the stream's manager, taking it if absent.
 * @return Error if the stream manager cannot be found or the stream is not active; nothing is stored then.
 */
Status RemoteWorker::LockStreamManagerHelper(const std::string &streamName,
                                             std::unordered_map<std::string, StreamRaii> &raii)
{
    if (raii.count(streamName) > 0) {
        return Status::OK();
    }
    StreamRaii lockHolder = std::make_unique<StreamManagerMap::const_accessor>();
    RETURN_IF_NOT_OK(scSvc_->GetStreamManager(streamName, *lockHolder));
    std::shared_ptr<StreamManager> manager = (*lockHolder)->second;
    RETURN_IF_NOT_OK(manager->CheckIfStreamActive());
    raii.emplace(streamName, std::move(lockHolder));
    return Status::OK();
}
/**
 * @brief End-of-stream handling: reset the stream's ack info, discard the given pending views and unblock
 * sending for the stream (a missing stream is tolerated).
 * @note dataLst is taken by value, so the discards operate on this function's copy of the list.
 */
Status RemoteWorker::ProcessEndOfStream(const std::shared_ptr<StreamManager> &streamMgr, std::list<BaseData> dataLst,
                                        const std::string &streamName, const std::string &producerId)
{
    (void)streamMgr;
    (void)producerId;
    RemoteStreamInfoTbbMap::accessor accessor;
    Status lookupRc = GetAccessor(streamName, accessor);
    if (lookupRc.IsError()) {
        // Nothing to reset.
    } else {
        auto &ackInfo = std::get<K_ACK>(accessor->second);
        ackInfo.Reset();
        accessor.release();
    }
    DiscardBuffers(dataLst);
    RETURN_IF_NOT_OK_EXCEPT(remoteConsumers_.ToggleStreamBlocking(streamName, false), K_SC_STREAM_NOT_FOUND);
    return Status::OK();
}
/**
 * @brief Build push requests for every non-blocked, non-empty producer entry of the pending flush list.
 *
 * Entries whose stream is missing, being deleted or in reset state are skipped.
 * @return The first other error from ParseProducerPendingFlushList.
 */
Status RemoteWorker::ParsePendingFlushList(const PendingFlushList &pendingFlushList, std::vector<PushReq> &requests,
                                           std::vector<std::vector<MemView>> &payloads,
                                           std::unordered_map<std::string, StreamRaii> &raii,
                                           std::list<std::shared_ptr<SharedPageElementView>> &moveList,
                                           std::unordered_set<std::shared_ptr<SharedPageQueue>> &needAckList)
{
    for (const auto &ele : pendingFlushList) {
        // Bind by reference: copying the key (several strings) on every iteration is unnecessary.
        const StreamProducerKey &key = ele.first;
        std::list<BaseData> &dataLst = ele.second;
        const std::string &streamName = key.firstKey_;
        const std::string &producerId = key.producerId_;
        if (IsStreamSendBlocked(streamName)) {
            VLOG(SC_NORMAL_LOG_LEVEL) << FormatString("[%s] Ignore stream %s producer %s", LogPrefix(), streamName,
                                                      producerId);
            continue;
        }
        if (dataLst.empty()) {
            continue;
        }
        Status rc = ParseProducerPendingFlushList(streamName, producerId, dataLst, requests, payloads, raii, moveList,
                                                  needAckList);
        const auto code = rc.GetCode();
        if (code == K_SC_STREAM_NOT_FOUND || code == K_SC_STREAM_DELETE_IN_PROGRESS
            || code == K_SC_STREAM_IN_RESET_STATE) {
            continue;
        }
        RETURN_IF_NOT_OK(rc);
    }
    return Status::OK();
}
int RemoteWorker::BatchFlushAsyncWrite(const std::shared_ptr<WorkerWorkerSCService_Stub> &stub,
std::vector<PushReq> &requests, std::vector<std::vector<MemView>> &payloads)
{
int numReqSent = 0;
for (size_t i = 0; i < requests.size(); ++i) {
Status &status = requests.at(i).rc_;
auto &pushReq = requests.at(i).req_;
const auto visitor = [&](auto &&pushReqPb) {
TraceGuard traceGuard = Trace::Instance().SetTraceNewID(pushReqPb.trace_id());
if constexpr (std::is_same_v<std::decay_t<decltype(pushReqPb)>, PushReqPb>) {
VLOG(SC_DEBUG_LOG_LEVEL) << FormatString("Calling PushElementsCursorsAsyncWrite for %s with %zu PV",
pushReqPb.stream_name(), pushReqPb.element_meta_size());
status = stub->PushElementsCursorsAsyncWrite(pushReqPb, requests.at(i).tag_, payloads.at(i));
} else {
VLOG(SC_DEBUG_LOG_LEVEL) << FormatString(
"Calling PushSharedPageCursorsAsyncWrite for shared page with %zu PV", pushReqPb.metas_size());
status = stub->PushSharedPageCursorsAsyncWrite(pushReqPb, requests.at(i).tag_, payloads.at(i));
}
};
PerfPoint point(PerfKey::REMOTE_WORKER_SEND_ONE_STREAM);
std::visit(visitor, pushReq);
numReqSent++;
}
VLOG(SC_DEBUG_LOG_LEVEL) << FormatString("Number of outstanding PushElementsCursorsAsyncWrite request %d",
numReqSent);
return numReqSent;
}
/**
 * Collect the replies for the requests written by BatchFlushAsyncWrite.
 *
 * Requests whose rc_ already holds an error (the write failed) are skipped. For every other request the read
 * RPC is chosen from the request variant's type: PushReqPb uses PushElementsCursorsAsyncRead, anything else
 * uses PushSharedPageCursorsAsyncRead. The read status overwrites the request's rc_, and PostRecvCleanup is then
 * called with that status and the received PushRspPb.
 *
 * @param stub Worker-to-worker stub that issued the async writes.
 * @param pendingFlushList Passed through to PostRecvCleanup.
 * @param requests Requests built by ParsePendingFlushList; rc_ is updated in place.
 * @param raii Stream accessors gathered while parsing; passed through to PostRecvCleanup.
 */
void RemoteWorker::BatchFlushAsyncRead(const std::shared_ptr<WorkerWorkerSCService_Stub> &stub,
                                       PendingFlushList &pendingFlushList, std::vector<PushReq> &requests,
                                       std::unordered_map<std::string, StreamRaii> &raii)
{
    size_t numAsync = requests.size();
    for (size_t i = 0; i < numAsync; ++i) {
        Status &status = requests.at(i).rc_;
        // The write for this request failed; there is nothing to read back for it.
        if (status.IsError()) {
            continue;
        }
        const auto visitor = [&](auto &&pushReq) {
            TraceGuard traceGuard = Trace::Instance().SetTraceNewID(pushReq.trace_id());
            PushRspPb pushRspPb;
            PerfPoint point(PerfKey::REMOTE_WORKER_MAIN_RECV);
            // Injection point: optionally sleep for the given number of milliseconds before reading.
            INJECT_POINT("RemoteWorker.SleepBeforeAsyncRead", [](uint64_t timeoutMs) -> void {
                std::this_thread::sleep_for(std::chrono::milliseconds(timeoutMs));
                return;
            });
            if constexpr (std::is_same_v<std::decay_t<decltype(pushReq)>, PushReqPb>) {
                status = stub->PushElementsCursorsAsyncRead(requests.at(i).tag_, pushRspPb, RpcRecvFlags::NONE);
            } else {
                status = stub->PushSharedPageCursorsAsyncRead(requests.at(i).tag_, pushRspPb, RpcRecvFlags::NONE);
            }
            // Record the perf point right after the read, before the injection hook and logging below.
            point.Record();
            // Injection point: overwrite the read status with K_RPC_UNAVAILABLE to simulate a dead peer.
            INJECT_POINT_NO_RETURN("RemoteWorker.BatchFlushAsyncRead.rpc.timeout", [&status]() {
                status = { K_RPC_UNAVAILABLE, "Fake worker not responding" };
            });
            // NOTE: this log text names PushElementsCursorsAsyncRead for both request kinds.
            VLOG(SC_DEBUG_LOG_LEVEL) << FormatString("PushElementsCursorsAsyncRead rc for stream %s: %s",
                                                     requests.at(i).keyName_, status.ToString());
            PostRecvCleanup(requests.at(i).keyName_, status, pendingFlushList, pushReq, pushRspPb, raii);
        };
        std::visit(visitor, requests.at(i).req_);
    }
}
void RemoteWorker::HandleBlockedElements(std::list<std::shared_ptr<SharedPageElementView>> &moveList,
std::unordered_set<std::shared_ptr<SharedPageQueue>> &needAckList)
{
std::unordered_set<std::string> oomList;
for (auto &sharedPageElementView : moveList) {
const auto &keyName = sharedPageElementView->KeyName();
if (oomList.find(keyName) != oomList.end()) {
continue;
}
Status allocRc = sharedPageElementView->MoveBufToShmUnit();
if (allocRc.IsError()) {
auto p = sharedPageElementView->GetAckRange();
LOG(WARNING) << FormatString("[%s, S:%s] Cursor [%zu, %zu) MoveBufToShmUnit failed. %s", LogPrefix(),
sharedPageElementView->streamName_, p.first, p.first + p.second,
allocRc.ToString());
if (allocRc.GetCode() == K_OUT_OF_MEMORY) {
oomList.emplace(keyName);
}
continue;
}
RemoteStreamInfoTbbMap::accessor accessor;
if (GetAccessor(keyName, accessor).IsOk()) {
auto p = sharedPageElementView->GetAckRange();
SyncStreamLastAckCursor(accessor, Optional<RemoteAckInfo::AckRange>(p));
}
}
for (auto &sharedPageQueue : needAckList) {
LOG_IF_ERROR(sharedPageQueue->RemoteAck(),
FormatString("[%s, S:%s] Remote ack failed", LogPrefix(), sharedPageQueue->GetPageQueueId()));
}
}
/**
 * Flush one batch of pending data to this remote worker.
 *
 * Steps: parse the pending list into requests; resolve the WORKER_WORKER_SC_SVC stub for remoteWorkerAddr_;
 * issue all async writes; handle blocked shared-page elements; then, if anything was sent, read all replies.
 *
 * @return The first request error (in request order) after the reads, or an error from parsing or stub lookup;
 *         OK otherwise.
 */
Status RemoteWorker::BatchAsyncFlushEntry(PendingFlushList &pendingFlushList)
{
    INJECT_POINT("RemoteWorker.BatchAsyncFlushEntry.Sleep", [](int sleepSecond) {
        std::this_thread::sleep_for(std::chrono::seconds(sleepSecond));
        return Status::OK();
    });
    std::vector<PushReq> requests;
    std::vector<std::vector<MemView>> payloads;
    std::unordered_map<std::string, StreamRaii> raii;
    std::list<std::shared_ptr<SharedPageElementView>> moveList;
    std::unordered_set<std::shared_ptr<SharedPageQueue>> needAckList;
    RETURN_IF_NOT_OK(ParsePendingFlushList(pendingFlushList, requests, payloads, raii, moveList, needAckList));

    std::shared_ptr<RpcStubBase> stub;
    RETURN_IF_NOT_OK(RpcStubCacheMgr::Instance().GetStub(remoteWorkerAddr_, StubType::WORKER_WORKER_SC_SVC, stub));
    auto derivedStub = std::dynamic_pointer_cast<WorkerWorkerSCService_Stub>(stub);
    RETURN_RUNTIME_ERROR_IF_NULL(derivedStub);

    const int numRequestSent = BatchFlushAsyncWrite(derivedStub, requests, payloads);
    // Blocked elements are handled while the writes are in flight, before any reply is read.
    HandleBlockedElements(moveList, needAckList);
    RETURN_OK_IF_TRUE(numRequestSent == 0);
    BatchFlushAsyncRead(derivedStub, pendingFlushList, requests, raii);

    auto firstFailed =
        std::find_if(requests.begin(), requests.end(), [](auto &req) { return req.rc_.IsError(); });
    if (firstFailed != requests.end()) {
        return firstFailed->rc_;
    }
    return Status::OK();
}
/**
 * Fetch the last acknowledged cursor of streamName from this worker's consumer map.
 * @param[out] cursor Set only on success.
 * @return The consumer map's error if the stream is not tracked; OK otherwise.
 */
Status RemoteWorker::GetStreamLastAckCursor(const std::string &streamName, uint64_t &cursor)
{
    RETURN_IF_NOT_OK(remoteConsumers_.GetStreamLastAckCursor(streamName, cursor));
    // Logged only after a successful lookup.
    VLOG(SC_NORMAL_LOG_LEVEL) << FormatString("[%s, S:%s] remoteAck = %zu", LogPrefix(), streamName, cursor);
    return Status::OK();
}
/**
 * Fold an optional ack range into the ack cursor of the stream held by accessor.
 * Log lines are prefixed with this worker's prefix and the stream name taken from the accessor key.
 */
void RemoteWorker::SyncStreamLastAckCursor(RemoteStreamInfoTbbMap::accessor &accessor,
                                           Optional<RemoteAckInfo::AckRange> ackRange)
{
    const std::string prefix = FormatString("%s, S:%s", LogPrefix(), accessor->first);
    remoteConsumers_.SyncStreamLastAckCursor(accessor, ackRange, prefix);
}
/**
 * Log prefix identifying this remote worker, in the form "RW:<address>".
 */
std::string RemoteWorker::LogPrefix() const
{
    const std::string address = remoteWorkerAddr_.ToString();
    return FormatString("RW:%s", address);
}
/**
 * @return true while the consumer map of this worker tracks at least one stream.
 */
bool RemoteWorker::ExistsRemoteConsumer()
{
    const bool noStreams = remoteConsumers_.Empty();
    return !noStreams;
}
/**
 * Look up (or create) the shared page queue that sharedPageGroup_ associates with namespaceUri.
 * @param[out] pageQueue Receives the queue.
 */
void RemoteWorker::GetOrCreateSharedPageQueue(const std::string &namespaceUri,
                                              std::shared_ptr<SharedPageQueue> &pageQueue)
{
    auto &group = sharedPageGroup_;
    group.GetOrCreateSharedPageQueue(namespaceUri, pageQueue);
}
/**
 * Construct the manager. Both shared_ptr arguments are sink parameters and are moved into the members, so no
 * extra atomic reference-count round trip is paid. scSvc is stored as a non-owning pointer.
 */
RemoteWorkerManager::RemoteWorkerManager(ClientWorkerSCServiceImpl *scSvc, std::shared_ptr<AkSkManager> akSkManager,
                                         std::shared_ptr<WorkerSCAllocateMemory> scAllocateManager)
    : akSkManager_(std::move(akSkManager)), scSvc_(scSvc), scAllocateManager_(std::move(scAllocateManager))
{
}
/**
 * Drop all remote workers, then stop the send thread pool.
 *
 * The BufferPool threads created in Init() call BatchAsyncFlushEntry, which reads remoteWorkerDict_ under a
 * shared lock on mutex_. The map is therefore detached under the exclusive lock rather than cleared unguarded.
 * The RemoteWorker objects are destroyed after the lock is released, so no worker teardown runs while mutex_ is
 * held. The original order is kept: workers are dropped before Stop().
 */
RemoteWorkerManager::~RemoteWorkerManager()
{
    decltype(remoteWorkerDict_) detachedWorkers;
    {
        std::lock_guard<std::shared_timed_mutex> lock(mutex_);
        detachedWorkers.swap(remoteWorkerDict_);
    }
    detachedWorkers.clear();
    if (dataMap_) {
        dataMap_->Stop();
    }
}
/**
 * Create and initialize the send buffer pool and the stream data pool, and generate this worker's instance id.
 * The buffer pool runs FLAGS_remote_send_thread_num threads named "ScPushToRemote", and each thread delivers its
 * batches to BatchAsyncFlushEntry.
 */
Status RemoteWorkerManager::Init()
{
    auto flushCallback = [this](int myId, const auto &pendingFlushList) {
        return BatchAsyncFlushEntry(myId, pendingFlushList);
    };
    dataMap_ = std::make_unique<BufferPool>(FLAGS_remote_send_thread_num, "ScPushToRemote", flushCallback);
    RETURN_IF_NOT_OK(dataMap_->Init());
    dataPool_ = std::make_unique<StreamDataPool>();
    RETURN_IF_NOT_OK(dataPool_->Init());
    workerInstanceId_ = GetStringUuid();
    return Status::OK();
}
/**
 * Look up the RemoteWorker registered for address (shared lock on mutex_).
 * @param[out] remoteWorker Set only on success.
 * @return K_NOT_FOUND if the address is unknown; a runtime error if the stored pointer is null.
 */
Status RemoteWorkerManager::GetRemoteWorker(const std::string &address, std::shared_ptr<RemoteWorker> &remoteWorker)
{
    std::shared_lock<std::shared_timed_mutex> lock(mutex_);
    const auto iter = remoteWorkerDict_.find(address);
    CHECK_FAIL_RETURN_STATUS(iter != remoteWorkerDict_.end(), StatusCode::K_NOT_FOUND,
                             FormatString("Remote worker:<%s> does not exist", address));
    RETURN_RUNTIME_ERROR_IF_NULL(iter->second);
    remoteWorker = iter->second;
    return Status::OK();
}
/**
 * Smallest last-ack cursor of streamName across all remote workers.
 * @return 0 when no remote worker is registered. Workers that do not track the stream are ignored, so the
 *         result is UINT64_MAX when none of them tracks it.
 */
uint64_t RemoteWorkerManager::GetLastAckCursor(const std::string &streamName)
{
    std::shared_lock<std::shared_timed_mutex> lock(mutex_);
    if (remoteWorkerDict_.empty()) {
        return 0;
    }
    uint64_t minAck = std::numeric_limits<uint64_t>::max();
    for (const auto &entry : remoteWorkerDict_) {
        uint64_t workerAck = 0;
        if (entry.second->GetStreamLastAckCursor(streamName, workerAck).IsOk() && workerAck < minAck) {
            minAck = workerAck;
        }
    }
    // Log every call when verbose logging is on, otherwise once per 1000 calls.
    const int logPerCount = VLOG_IS_ON(SC_NORMAL_LOG_LEVEL) ? 1 : 1000;
    LOG_EVERY_N(INFO, logPerCount) << FormatString("[S:%s] Remote consumer(s) lastAckCursor = %zu", streamName,
                                                   minAck);
    return minAck;
}
/**
 * Forward stream removal (by key name and shared page name) to the send buffer pool.
 */
void RemoteWorkerManager::RemoveStream(const std::string &keyName, const std::string &sharedPageName)
{
    auto &pool = *dataMap_;
    pool.RemoveStream(keyName, sharedPageName);
}
/**
 * Purge the buffered data of streamMgr's stream from the send pool.
 * Each purged (data list, stream name, producer id) group is passed to ProcessEndOfStream together with streamMgr.
 * The lambda holds its own copy of streamMgr.
 */
void RemoteWorkerManager::PurgeBuffer(const std::shared_ptr<StreamManager> &streamMgr)
{
    auto onPurged = [this, streamMgr](auto dataLst, const std::string &purgedStream, const std::string &producerId) {
        return ProcessEndOfStream(streamMgr, std::move(dataLst), purgedStream, producerId);
    };
    dataMap_->PurgeBuffer(streamMgr->GetStreamName(), onPurged);
}
/**
 * Propagate end-of-stream to every registered remote worker (shared lock on mutex_).
 * All workers are visited even after a failure. The first error in map iteration order is returned.
 *
 * NOTE(review): each RemoteWorker::ProcessEndOfStream receives its own copy of dataLst and calls DiscardBuffers
 * on it, so the same buffers are discarded once per remote worker, and not at all when no worker is registered.
 * Confirm this is intended.
 */
Status RemoteWorkerManager::ProcessEndOfStream(const std::shared_ptr<StreamManager> &streamMgr,
                                               std::list<BaseData> dataLst, const std::string &streamName,
                                               const std::string &producerId)
{
    std::shared_lock<std::shared_timed_mutex> lock(mutex_);
    RETURN_OK_IF_TRUE(remoteWorkerDict_.empty());
    bool hasError = false;
    Status firstError = Status::OK();
    for (const auto &entry : remoteWorkerDict_) {
        auto rw = entry.second;
        Status rc = rw->ProcessEndOfStream(streamMgr, dataLst, streamName, producerId);
        if (!hasError && rc.IsError()) {
            hasError = true;
            firstError = rc;
        }
    }
    return hasError ? firstError : Status::OK();
}
/**
 * Translate a stream number into its name through the client-worker service.
 * @param[out] streamName Filled by scSvc_->StreamNoToName.
 */
Status RemoteWorkerManager::StreamNoToName(uint64_t streamNo, std::string &streamName)
{
    Status rc = scSvc_->StreamNoToName(streamNo, streamName);
    return rc;
}
/**
 * Route an element view.
 *
 * When eleView->remote_ is set, the view is not queued: its ack range is folded into the remote worker's ack
 * cursor for the stream. Otherwise the view's reference count is increased and the view is inserted into the
 * send buffer pool.
 */
Status RemoteWorkerManager::SendElementsView(const std::shared_ptr<SendElementView> &eleView)
{
    const std::string &streamName = eleView->StreamName();
    auto &remoteWorker = eleView->remoteWorker_;
    std::shared_ptr<RemoteWorker> rw;
    if (eleView->remote_) {
        RETURN_IF_NOT_OK(GetRemoteWorker(remoteWorker, rw));
        RemoteStreamInfoTbbMap::accessor accessor;
        RETURN_IF_NOT_OK(rw->GetAccessor(streamName, accessor));
        RemoteAckInfo::AckRange p = eleView->GetAckRange();
        VLOG(SC_INTERNAL_LOG_LEVEL) << FormatString("[%s, S:%s] Ack cursor [%zu, %zu)", rw->LogPrefix(), streamName,
                                                    p.first, p.first + p.second);
        rw->SyncStreamLastAckCursor(accessor, Optional<RemoteAckInfo::AckRange>(p));
        return Status::OK();
    }
    VLOG(SC_INTERNAL_LOG_LEVEL) << FormatString("[RW:%s, S:%s] Flush %zu elements remotely", remoteWorker,
                                                streamName, eleView->GetElementNum());
    RETURN_IF_NOT_OK(eleView->IncRefCount());
    INJECT_POINT("RemoteWorkerManager.SendElementsView.PostIncRefCount");
    dataMap_->Insert(eleView);
    return Status::OK();
}
/**
 * BufferPool callback: flush one batch of pending data.
 *
 * Entries are grouped by destination remote worker, which is the key's producerId_ field. Data for an unknown
 * remote worker, or for a stream with no remote consumer on that worker, is discarded. Each worker's group is
 * then flushed with RemoteWorker::BatchAsyncFlushEntry. All groups are flushed, and the first error in iteration
 * order is returned.
 *
 * @param myId Pool thread id (unused).
 * @param pendingFlushList The batch to flush.
 */
Status RemoteWorkerManager::BatchAsyncFlushEntry(int myId, const PendingFlushList &pendingFlushList)
{
    (void)myId;
    auto traceGuard = Trace::Instance().SetTraceUUID();
    std::unordered_map<std::string, std::pair<std::shared_ptr<RemoteWorker>, PendingFlushList>> flushMap;
    for (const auto &ele : pendingFlushList) {
        // Bind by reference: the key lives in pendingFlushList for the whole loop, so there is no need to copy it
        // (and its strings) per entry.
        const StreamProducerKey &key = ele.first;
        std::list<BaseData> &dataLst = ele.second;
        const std::string &streamName = key.firstKey_;
        const std::string &remoteWorker = key.producerId_;
        auto it = flushMap.find(remoteWorker);
        if (it == flushMap.end()) {
            std::shared_lock<std::shared_timed_mutex> lock(mutex_);
            auto iter = remoteWorkerDict_.find(remoteWorker);
            if (iter == remoteWorkerDict_.end()) {
                RemoteWorker::DiscardBuffers(dataLst);
                continue;
            }
            it = flushMap.emplace(remoteWorker, std::make_pair(iter->second, PendingFlushList())).first;
        }
        auto &rw = it->second.first;
        if (!rw->HasRemoteConsumers(streamName)) {
            RemoteWorker::DiscardBuffers(dataLst);
            continue;
        }
        it->second.second.push_back(ele);
    }
    RETURN_OK_IF_TRUE(flushMap.empty());
    bool hasError = false;
    Status firstError = Status::OK();
    for (auto &group : flushMap) {
        Status rc = group.second.first->BatchAsyncFlushEntry(group.second.second);
        if (!hasError && rc.IsError()) {
            hasError = true;
            firstError = rc;
        }
    }
    return hasError ? firstError : Status::OK();
}
/**
 * @return true if any registered remote worker has a consumer of streamName (shared lock on mutex_).
 */
bool RemoteWorkerManager::HasRemoteConsumers(const std::string &streamName)
{
    std::shared_lock<std::shared_timed_mutex> rlock(mutex_);
    for (const auto &kv : remoteWorkerDict_) {
        if (kv.second->HasRemoteConsumers(streamName)) {
            return true;
        }
    }
    return false;
}
/**
 * Remove streamName from the data pool and from every remote worker.
 * A remote worker whose consumer map becomes empty is erased from remoteWorkerDict_. "Stream not found" is
 * tolerated at both levels; any other error stops the walk and is returned.
 */
Status RemoteWorkerManager::DeleteStream(const std::string &streamName)
{
    RETURN_IF_NOT_OK_EXCEPT(dataPool_->RemoveStreamObject(streamName, {}), K_SC_STREAM_NOT_FOUND);
    std::lock_guard<std::shared_timed_mutex> lock(mutex_);
    for (auto iter = remoteWorkerDict_.begin(); iter != remoteWorkerDict_.end();) {
        RETURN_RUNTIME_ERROR_IF_NULL(iter->second);
        auto &rw = iter->second;
        Optional<bool> mapEmpty(false);
        RETURN_IF_NOT_OK_EXCEPT(rw->DeleteStream(streamName, mapEmpty), K_SC_STREAM_NOT_FOUND);
        if (!mapEmpty.value()) {
            ++iter;
            continue;
        }
        LOG(INFO) << "Erase remote worker " << rw->remoteWorkerAddr_.ToString() << " from remoteWorkerDict_";
        iter = remoteWorkerDict_.erase(iter);
    }
    return Status::OK();
}
/**
 * Called when scanning of streamName is finished.
 * Calls RemoveStreamObject on the data pool with an empty destination list. A stream that is already gone is
 * not an error.
 */
Status RemoteWorkerManager::DoneScanning(const std::string &streamName)
{
    RETURN_IF_NOT_OK_EXCEPT(dataPool_->RemoveStreamObject(streamName, {}), K_SC_STREAM_NOT_FOUND);
    return Status::OK();
}
/**
 * Render the collected remote-send success rates as a string and reset the collector.
 */
std::string RemoteWorkerManager::GetSCRemoteSendSuccessRate()
{
    std::string rate = remoteSendRateVec_.BlockingGetRateToStringAndClean();
    return rate;
}
/**
 * Set the blocked flag of streamName on the remote worker at workerAddr.
 *
 * When SC metrics are enabled, the stream's NumRemoteConsumersBlocking metric is incremented (enable) or
 * decremented (disable) by the number of remote consumers of the stream on that worker. The consumer-map element
 * lock is released before the stream manager accessor is taken.
 */
Status RemoteWorkerManager::ToggleStreamBlocking(const std::string &workerAddr, const std::string &streamName,
                                                 bool enable)
{
    if (enable) {
        INJECT_POINT("RemoteWorker.EnableStreamBlocking.sleep");
    }
    std::shared_ptr<RemoteWorker> rw;
    RETURN_IF_NOT_OK(GetRemoteWorker(workerAddr, rw));
    VLOG(SC_NORMAL_LOG_LEVEL) << (enable ? "Blocking" : "Unblocking") << " Producer for stream: " << streamName
                              << " From remote worker: " << workerAddr;
    RemoteStreamInfoTbbMap::accessor accessor;
    RETURN_IF_NOT_OK(rw->GetAccessor(streamName, accessor));
    std::get<K_BLOCKED>(accessor->second) = enable;
    if (!ScMetricsMonitor::Instance()->IsEnabled()) {
        return Status::OK();
    }
    const uint64_t numRemoteConsumers = std::get<K_CONSUMER_ID>(accessor->second).size();
    accessor.release();
    StreamManagerMap::const_accessor streamMgrAccessor;
    RETURN_IF_NOT_OK(scSvc_->GetStreamManager(streamName, streamMgrAccessor));
    if (!enable) {
        streamMgrAccessor->second->GetSCStreamMetrics()->DecrementMetric(StreamMetric::NumRemoteConsumersBlocking,
                                                                         numRemoteConsumers);
        return Status::OK();
    }
    streamMgrAccessor->second->GetSCStreamMetrics()->IncrementMetric(StreamMetric::NumRemoteConsumersBlocking,
                                                                     numRemoteConsumers);
    return Status::OK();
}
/**
 * Remove consumerId of streamName from the remote worker at workerAddr.
 *
 * Under the exclusive mutex_ lock: the consumer is removed, the worker is erased from remoteWorkerDict_ if it has
 * no streams left, and the workers still consuming streamName are collected. Outside the lock, the data pool's
 * stream object is updated with that destination list (a missing stream is tolerated).
 */
Status RemoteWorkerManager::DelRemoteConsumer(const std::string &workerAddr, const std::string &streamName,
                                              const std::string &consumerId)
{
    std::vector<std::string> dest;
    {
        std::unique_lock<std::shared_timed_mutex> lock(mutex_);
        auto iter = remoteWorkerDict_.find(workerAddr);
        CHECK_FAIL_RETURN_STATUS(iter != remoteWorkerDict_.end(), K_NOT_FOUND,
                                 FormatString("Remote worker:<%s> does not exist", workerAddr));
        auto &rw = iter->second;
        Optional<bool> mapEmpty(false);
        RETURN_IF_NOT_OK(rw->DelRemoteConsumer(streamName, consumerId, mapEmpty));
        const bool workerIdle = mapEmpty.value();
        if (workerIdle) {
            LOG(INFO) << "Erase remote worker " << workerAddr << " from remoteWorkerDict_";
            (void)remoteWorkerDict_.erase(iter);
        }
        dest = GetRemoteWorkers(streamName);
    }
    RETURN_IF_NOT_OK_EXCEPT(dataPool_->RemoveStreamObject(streamName, dest), K_SC_STREAM_NOT_FOUND);
    return Status::OK();
}
/**
 * Register a remote consumer of streamName that lives on remoteWorkerAddress.
 *
 * Under the exclusive mutex_ lock:
 *  - create, Init() and register a RemoteWorker for remoteWorkerAddress if none exists yet;
 *  - add the consumer to that worker;
 *  - if the stream mode enables shared pages, attach the worker's shared page queue to streamMgr and also
 *    register the shared page key itself as a consumer on the worker;
 *  - collect the remote workers that currently have consumers of streamName.
 * Outside the lock, the stream object (or the shared page object) is added to dataPool_. K_DUPLICATED from that
 * step is ignored.
 *
 * @param lastAckCursor Starting ack cursor passed to dataPool_. In shared-page mode it is replaced by the shared
 *        page's last append cursor, lowered to the worker's ack cursor for the shared page key when one exists.
 */
Status RemoteWorkerManager::AddRemoteConsumer(const std::shared_ptr<StreamManager> &streamMgr,
                                              const HostPort &localWorkerAddress, const HostPort &remoteWorkerAddress,
                                              const std::string &streamName, const SubscriptionConfig &subConfig,
                                              const std::string &consumerId, uint64_t lastAckCursor)
{
    StreamFields streamFields;
    streamMgr->GetStreamFields(streamFields);
    std::shared_ptr<SharedPageQueue> sharedPage;
    std::vector<std::string> dest;
    {
        std::shared_ptr<RemoteWorker> rw;
        std::lock_guard<std::shared_timed_mutex> lock(mutex_);
        auto iter = remoteWorkerDict_.find(remoteWorkerAddress.ToString());
        if (iter == remoteWorkerDict_.end()) {
            auto remoteWorker = std::make_shared<RemoteWorker>(localWorkerAddress, remoteWorkerAddress, akSkManager_,
                                                               scSvc_, workerInstanceId_, scAllocateManager_, this);
            RETURN_IF_NOT_OK(remoteWorker->Init());
            // Every send attempt of this worker reports (successNum, totalNum) into remoteSendRateVec_.
            remoteWorker->RegisterRecordRemoteSendRateCallBack(
                [this](int successNum, int totalNum) { remoteSendRateVec_.BlockingEmplaceBack(successNum, totalNum); });
            iter = remoteWorkerDict_.emplace(remoteWorkerAddress.ToString(), remoteWorker).first;
        }
        rw = iter->second;
        // NOTE(review): a newly created worker is already in remoteWorkerDict_ here; if the call below fails, the
        // worker stays registered without this consumer. Confirm that is acceptable.
        RETURN_IF_NOT_OK(
            rw->AddRemoteConsumer(streamName, subConfig, consumerId, streamMgr->GetMaxWindowCount(), lastAckCursor));
        if (StreamManager::EnableSharedPage(streamFields.streamMode_)) {
            rw->GetOrCreateSharedPageQueue(streamName, sharedPage);
            streamMgr->SetSharedPageQueue(sharedPage);
            const std::string &keyName = sharedPage->GetStreamName();
            // In shared-page mode the data pool starts from the shared page's append position, or from the
            // worker's existing ack position for the shared page key if that is lower.
            auto lastAppendCursor = sharedPage->GetLastAppendCursor();
            lastAckCursor = lastAppendCursor;
            uint64_t cursor;
            Status rc = rw->GetStreamLastAckCursor(keyName, cursor);
            if (rc.IsOk()) {
                lastAckCursor = std::min<uint64_t>(lastAckCursor, cursor);
            }
            // NOTE(review): RemoteConsumerMap::AddConsumer returns K_DUPLICATED when consumer id keyName is already
            // registered for keyName. If that can reach here (e.g. a second stream mapping to the same shared page
            // on this worker), confirm RemoteWorker::AddRemoteConsumer tolerates it; otherwise this call fails.
            RETURN_IF_NOT_OK(rw->AddRemoteConsumer(keyName, SubscriptionConfig(), keyName, 1, 0));
        }
        dest = GetRemoteWorkers(streamName);
    }
    if (!StreamManager::EnableSharedPage(streamFields.streamMode_)) {
        RETURN_IF_NOT_OK_EXCEPT(dataPool_->AddStreamObject(streamMgr, streamName, dest, lastAckCursor), K_DUPLICATED);
    } else {
        RETURN_IF_NOT_OK_EXCEPT(dataPool_->AddSharedPageObject(sharedPage, streamName, dest, lastAckCursor),
                                K_DUPLICATED);
    }
    return Status::OK();
}
/**
 * On a forced close (client crash), delete streamName from all remote workers; otherwise do nothing.
 */
Status RemoteWorkerManager::ClearAllRemoteConsumer(const std::string &streamName, bool forceClose)
{
    if (!forceClose) {
        return Status::OK();
    }
    LOG(INFO) << "Client has crashed cleaning up the stream: " << streamName;
    RETURN_IF_NOT_OK(DeleteStream(streamName));
    return Status::OK();
}
/**
 * Reset the data pool's scan position for streamName. A stream the pool does not know is not an error.
 */
Status RemoteWorkerManager::ResetStreamScanList(const std::string &streamName)
{
    RETURN_IF_NOT_OK_EXCEPT(dataPool_->ResetStreamScanPosition(streamName), K_SC_STREAM_NOT_FOUND);
    return Status::OK();
}
/**
 * Addresses of the remote workers that have a consumer of streamName.
 * This function takes no lock itself; its callers in this file (DelRemoteConsumer, AddRemoteConsumer) call it
 * while holding mutex_.
 */
std::vector<std::string> RemoteWorkerManager::GetRemoteWorkers(const std::string &streamName)
{
    std::vector<std::string> workers;
    for (const auto &kv : remoteWorkerDict_) {
        if (kv.second->HasRemoteConsumers(streamName)) {
            workers.emplace_back(kv.first);
        }
    }
    return workers;
}
/**
 * Merge an optional ack range into the contiguous ack cursor.
 *
 * An AckRange (first, count) acknowledges cursors [first, first + count). lastAckCursor_ is the last cursor
 * (inclusive) up to which everything is acknowledged. A new range starting after lastAckCursor_ is queued. Queued
 * ranges are then drained from the top of ackQue_ while they start at or before lastAckCursor_ + 1, and the cursor
 * moves to the end of each drained range.
 *
 * A queued range that starts at or below lastAckCursor_ (a duplicate, or one overlapped by a range merged earlier)
 * is drained as well. Previously such a range only matched when it started exactly at lastAckCursor_ + 1, so a
 * stale entry stayed at the top of the queue forever and blocked every later range.
 */
void RemoteAckInfo::SyncStreamLastAckCursor(Optional<AckRange> ackRange, const std::string &logPrefix)
{
    VLOG(SC_INTERNAL_LOG_LEVEL) << FormatString("[%s] Last remote ack cursor %zu", logPrefix, lastAckCursor_);
    if (!ackQue_.empty()) {
        VLOG(SC_INTERNAL_LOG_LEVEL) << FormatString("[%s] Most recent ack cursor %zu", logPrefix, ackQue_.top().first);
    }
    if (ackRange) {
        auto begCursor = ackRange.value().first;
        if (lastAckCursor_ < begCursor) {
            ackQue_.push(ackRange.value());
        }
    }
    while (!ackQue_.empty() && ackQue_.top().first <= lastAckCursor_ + 1) {
        auto ele = ackQue_.top();
        ackQue_.pop();
        // Exclusive end of the drained range. Compare against lastAckCursor_ + 1 so that empty or fully covered
        // ranges never move the cursor backwards or wrap it around.
        const uint64_t endCursor = ele.first + ele.second;
        if (endCursor > lastAckCursor_ + 1) {
            lastAckCursor_ = endCursor - 1;
            VLOG(SC_INTERNAL_LOG_LEVEL) << FormatString("[%s] Move up ack cursor to %zu", logPrefix, lastAckCursor_);
        }
    }
}
/**
 * @return The last cursor (inclusive) up to which acks are contiguous.
 */
uint64_t RemoteAckInfo::GetStreamLastAckCursor() const
{
    const uint64_t cursor = lastAckCursor_;
    return cursor;
}
/**
 * Return to the initial state: cursor 0 and no queued ack ranges.
 */
void RemoteAckInfo::Reset()
{
    lastAckCursor_ = 0;
    for (; !ackQue_.empty(); ackQue_.pop()) {
    }
}
RemoteAckInfo::RemoteAckInfo(uint64_t cursor) : lastAckCursor_(cursor)
{
}
/**
 * Add consumerId as a remote consumer of streamName.
 *
 * A new stream entry is created unblocked, with its ack cursor at lastAckCursor and its window count at
 * windowCount. For an existing stream the id is added to its consumer set.
 *
 * consumerMutex_ is only held shared here (DeleteConsumer/DeleteStream take it exclusively), so two adds for the
 * same new stream can both miss in find(). The loser's emplace() then returns false and leaves the accessor on the
 * winner's entry. The consumer id is added to that entry instead of being silently dropped.
 *
 * @return K_DUPLICATED if consumerId is already registered for streamName.
 */
Status RemoteConsumerMap::AddConsumer(const std::string &streamName, const std::string &consumerId,
                                      uint64_t windowCount, uint64_t lastAckCursor)
{
    std::shared_lock<std::shared_timed_mutex> lock(consumerMutex_);
    RemoteStreamInfoTbbMap::accessor accessor;
    if (!streamConsumers_.find(accessor, streamName)) {
        std::set<std::string> remoteConsumerId;
        remoteConsumerId.emplace(consumerId);
        auto remoteConsumer =
            std::make_tuple(false, RemoteAckInfo(lastAckCursor), windowCount, std::move(remoteConsumerId));
        if (streamConsumers_.emplace(accessor, streamName, std::move(remoteConsumer))) {
            return Status::OK();
        }
        // Lost the insertion race: accessor now refers to the entry another thread created.
    }
    auto ret = std::get<K_CONSUMER_ID>(accessor->second).emplace(consumerId);
    CHECK_FAIL_RETURN_STATUS(
        ret.second, K_DUPLICATED,
        FormatString("[S:%s] Add remote consumer error. Duplicate consumer id %s", streamName, consumerId));
    return Status::OK();
}
/**
 * Remove consumerId from streamName (exclusive lock on consumerMutex_).
 * The stream entry is erased when its last consumer goes away. If mapEmpty is set, it receives whether the map is
 * now empty.
 * @return K_SC_STREAM_NOT_FOUND or K_SC_CONSUMER_NOT_FOUND when the stream or consumer is unknown.
 */
Status RemoteConsumerMap::DeleteConsumer(const std::string &streamName, const std::string &consumerId,
                                         Optional<bool> &mapEmpty)
{
    std::unique_lock<std::shared_timed_mutex> lock(consumerMutex_);
    RemoteStreamInfoTbbMap::accessor accessor;
    bool find = streamConsumers_.find(accessor, streamName);
    CHECK_FAIL_RETURN_STATUS(find, StatusCode::K_SC_STREAM_NOT_FOUND,
                             FormatString("[S:%s] Stream not found", streamName));
    CHECK_FAIL_RETURN_STATUS(std::get<K_CONSUMER_ID>(accessor->second).erase(consumerId) == 1,
                             StatusCode::K_SC_CONSUMER_NOT_FOUND,
                             FormatString("[S:%s, C:%s] Consumer not belong to Stream", streamName, consumerId));
    const bool lastConsumerGone = std::get<K_CONSUMER_ID>(accessor->second).empty();
    if (lastConsumerGone) {
        (void)streamConsumers_.erase(accessor);
    }
    if (mapEmpty) {
        *mapEmpty = streamConsumers_.empty();
    }
    return Status::OK();
}
/**
 * Drop the whole entry for streamName (exclusive lock on consumerMutex_).
 * If mapEmpty is set, it receives whether the map is now empty.
 * @return K_SC_STREAM_NOT_FOUND if the stream is unknown.
 */
Status RemoteConsumerMap::DeleteStream(const std::string &streamName, Optional<bool> &mapEmpty)
{
    std::unique_lock<std::shared_timed_mutex> lock(consumerMutex_);
    RemoteStreamInfoTbbMap::accessor accessor;
    bool find = streamConsumers_.find(accessor, streamName);
    CHECK_FAIL_RETURN_STATUS(find, StatusCode::K_SC_STREAM_NOT_FOUND,
                             FormatString("Can not find stream:<%s>", streamName));
    (void)streamConsumers_.erase(accessor);
    if (!mapEmpty) {
        return Status::OK();
    }
    *mapEmpty = streamConsumers_.empty();
    return Status::OK();
}
/**
 * @return true if streamName has an entry, i.e. at least one remote consumer.
 */
bool RemoteConsumerMap::HasRemoteConsumers(const std::string &streamName)
{
    std::shared_lock<std::shared_timed_mutex> lock(consumerMutex_);
    RemoteStreamInfoTbbMap::const_accessor accessor;
    const bool found = streamConsumers_.find(accessor, streamName);
    return found;
}
/**
 * Set the blocked flag of streamName.
 * @return K_SC_STREAM_NOT_FOUND if the stream is unknown. The INFO log line is emitted either way.
 */
Status RemoteConsumerMap::ToggleStreamBlocking(const std::string &streamName, bool enable)
{
    std::shared_lock<std::shared_timed_mutex> lock(consumerMutex_);
    const char *action = enable ? "Block" : "Unblock";
    LOG(INFO) << action << " Producer for stream " << streamName;
    RemoteStreamInfoTbbMap::accessor accessor;
    bool find = streamConsumers_.find(accessor, streamName);
    CHECK_FAIL_RETURN_STATUS(find, StatusCode::K_SC_STREAM_NOT_FOUND,
                             FormatString("Can not find stream:<%s>", streamName));
    std::get<K_BLOCKED>(accessor->second) = enable;
    return Status::OK();
}
/**
 * @return true only if streamName is tracked and its blocked flag is set.
 */
bool RemoteConsumerMap::IsStreamSendBlocked(const std::string &streamName)
{
    std::shared_lock<std::shared_timed_mutex> lock(consumerMutex_);
    RemoteStreamInfoTbbMap::const_accessor accessor;
    if (!streamConsumers_.find(accessor, streamName)) {
        return false;
    }
    return std::get<K_BLOCKED>(accessor->second);
}
/**
 * Read the last contiguous ack cursor of streamName.
 *
 * This is a read-only lookup, so it uses a const_accessor (shared element lock) instead of an exclusive
 * accessor. Concurrent readers therefore no longer serialize against each other on the element.
 *
 * @param[out] cursor Set only on success.
 * @return K_SC_CONSUMER_NOT_FOUND if the stream is unknown.
 */
Status RemoteConsumerMap::GetStreamLastAckCursor(const std::string &streamName, uint64_t &cursor)
{
    std::shared_lock<std::shared_timed_mutex> lock(consumerMutex_);
    RemoteStreamInfoTbbMap::const_accessor accessor;
    if (streamConsumers_.find(accessor, streamName)) {
        cursor = std::get<K_ACK>(accessor->second).GetStreamLastAckCursor();
        return Status::OK();
    }
    RETURN_STATUS(K_SC_CONSUMER_NOT_FOUND, FormatString("Can not find stream:<%s>", streamName));
}
/**
 * Apply an optional ack range to the ack bookkeeping of the element held by accessor.
 * The caller already holds the element through accessor, so no map-level lock is taken here.
 */
void RemoteConsumerMap::SyncStreamLastAckCursor(RemoteStreamInfoTbbMap::accessor &accessor,
                                                Optional<RemoteAckInfo::AckRange> ackRange,
                                                const std::string &logPrefix)
{
    RemoteAckInfo &ackInfo = std::get<K_ACK>(accessor->second);
    ackInfo.SyncStreamLastAckCursor(ackRange, logPrefix);
}
/**
 * @return true when no stream is tracked.
 *
 * This is a read-only query, so consumerMutex_ is taken shared, like the class's other read-only queries.
 * Erasures (DeleteConsumer/DeleteStream) hold it exclusively and so cannot run concurrently with this check.
 */
bool RemoteConsumerMap::Empty() const
{
    std::shared_lock<std::shared_timed_mutex> lock(consumerMutex_);
    return streamConsumers_.empty();
}
/**
 * @return The window count stored for streamName, or 1 if the stream is not tracked.
 *
 * This is a const, read-only lookup, so it uses a const_accessor. That selects the const overload of find() and
 * takes only a shared element lock, instead of the write accessor used previously.
 */
uint64_t RemoteConsumerMap::GetMaxWindowCount(const std::string &streamName) const
{
    std::shared_lock<std::shared_timed_mutex> lock(consumerMutex_);
    RemoteStreamInfoTbbMap::const_accessor accessor;
    if (streamConsumers_.find(accessor, streamName)) {
        return std::get<K_WINDOW_COUNT>(accessor->second);
    }
    return 1;
}
/**
 * Acquire a write accessor for streamName.
 * consumerMutex_ is held shared only during the lookup. The element lock stays with the caller's accessor after
 * return.
 * @return K_SC_STREAM_NOT_FOUND (tagged with logPrefix) if the stream is unknown.
 */
Status RemoteConsumerMap::GetAccessor(const std::string &streamName, RemoteStreamInfoTbbMap::accessor &accessor,
                                      const std::string &logPrefix)
{
    std::shared_lock<std::shared_timed_mutex> lock(consumerMutex_);
    const bool success = streamConsumers_.find(accessor, streamName);
    CHECK_FAIL_RETURN_STATUS(success, K_SC_STREAM_NOT_FOUND,
                             FormatString("[%s, S:%s] Stream not found", logPrefix, streamName));
    return Status::OK();
}
}
}
}