* Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "datasystem/worker/hash_ring/hash_ring.h"
#include <algorithm>
#include <csignal>
#include <cstdint>
#include <iterator>
#include <shared_mutex>
#include <sstream>
#include <string>
#include <thread>
#include <unordered_set>
#include <utility>
#include <google/protobuf/util/message_differencer.h>
#include "datasystem/common/iam/tenant_auth_manager.h"
#include "datasystem/common/inject/inject_point.h"
#include "datasystem/common/kvstore/etcd/etcd_constants.h"
#include "datasystem/common/log/log_helper.h"
#include "datasystem/common/log/trace.h"
#include "datasystem/common/log/spdlog/provider.h"
#include "datasystem/common/util/container_util.h"
#include "datasystem/common/util/format.h"
#include "datasystem/common/util/hash_algorithm.h"
#include "datasystem/common/util/meta_route_tool.h"
#include "datasystem/common/util/net_util.h"
#include "datasystem/common/util/random_data.h"
#include "datasystem/common/util/status_helper.h"
#include "datasystem/common/util/strings_util.h"
#include "datasystem/common/util/uuid_generator.h"
#include "datasystem/common/util/validator.h"
#include "datasystem/master/meta_addr_info.h"
#include "datasystem/protos/hash_ring.pb.h"
#include "datasystem/utils/status.h"
#include "datasystem/worker/cluster_manager/worker_health_check.h"
#include "datasystem/worker/hash_ring/hash_ring_allocator.h"
#include "datasystem/worker/hash_ring/hash_ring_event.h"
#include "datasystem/worker/hash_ring/hash_ring_tools.h"
// Flags read in this file through their FLAGS_<name> variables.
// NOTE(review): the definitions (and defaults) live outside this file.
// Backend address used when metastore_address is empty (see InitWithEtcd / TryGetOldRing).
DS_DECLARE_string(etcd_address);
// Preferred backend address; when non-empty it is used instead of etcd_address.
DS_DECLARE_string(metastore_address);
// Master address; InitMasterAddress() fills it in when it is empty.
DS_DECLARE_string(master_address);
// Copied into enableDistributedMaster_ by the HashRing constructor.
DS_DECLARE_bool(enable_distributed_master);
// Wait time handed to TryGenerateHashRange() while the ring is in PRE_RUNNING.
DS_DECLARE_uint32(add_node_wait_time_s);
// NOTE(review): not read directly in the visible part of this file — presumably consumed by
// EnableAutoDelDeadNode(); confirm.
DS_DECLARE_bool(auto_del_dead_node);
namespace datasystem {
namespace worker {
// Upper bound on how many workers, taken from the front of the sorted worker-address set,
// are allowed to generate the initial hash tokens in TryFirstInit().
static constexpr int MAX_CANDIDATE_WORKER_NUM = 5;
// Hash function applied to keys before they are located on the ring.
const HashRing::HashFunction HashRing::hashFunction_ = MurmurHash3_32;
// Creates the ring handle for the worker at workerAddr.
// The ring starts in the INIT state and snapshots FLAGS_enable_distributed_master at construction.
// etcdStore is kept as a raw pointer.
// NOTE(review): ownership presumably stays with the caller — confirm.
HashRing::HashRing(std::string workerAddr, EtcdStore *etcdStore)
    : workerAddr_(std::move(workerAddr)),
      etcdStore_(etcdStore),
      state_(INIT),
      enableDistributedMaster_(FLAGS_enable_distributed_master)
{
}
// Raises exitFlag_ (polled by TryGetOldRing() to stop retrying) and logs the shutdown.
HashRing::~HashRing()
{
    exitFlag_ = true;
    LOG(INFO) << "HashRing exit";
}
// Resolves FLAGS_master_address when it has not been configured.
// The worker first tries to publish its own address under MASTER_ADDRESS_KEY with a CAS call;
// when that call reports an error, the address already stored in etcd is read instead.
// Returns an error only when both the CAS and the follow-up Get fail.
Status HashRing::InitMasterAddress()
{
    // A pre-configured master address is used as-is.
    if (!FLAGS_master_address.empty()) {
        return Status::OK();
    }

    Status casStatus = etcdStore_->CAS(ETCD_MASTER_ADDRESS_TABLE, MASTER_ADDRESS_KEY, "", workerAddr_);
    if (!casStatus.IsError()) {
        // This worker's address was written, so it becomes the master address.
        FLAGS_master_address = workerAddr_;
    } else {
        RETURN_IF_NOT_OK_PRINT_ERROR_MSG(
            etcdStore_->Get(ETCD_MASTER_ADDRESS_TABLE, MASTER_ADDRESS_KEY, FLAGS_master_address),
            "Failed to get master address from etcd, and cas error:" + casStatus.ToString());
    }

    LOG(INFO) << FormatString("The master address:%s of worker:%s", FLAGS_master_address, workerAddr_);
    return Status::OK();
}
// Re-submits scaling work recorded in the cached ring.
// Does nothing unless IsWorkable() holds. Under a shared lock it first re-triggers data migration
// when the local state is PRE_LEAVING, then hands the ring to the task executor if it still
// carries add_node_info or del_node_info entries.
// @param isRestartScenario forwarded unchanged to HashRingTaskExecutor::RestoreScalingTask.
void HashRing::RestoreScalingTaskIfNeeded(bool isRestartScenario)
{
    if (!IsWorkable()) {
        return;
    }

    std::shared_lock<std::shared_timed_mutex> readLock(mutex_);
    if (state_ == PRE_LEAVING) {
        SubmitMigrateDataTaskIfNeed(ringInfo_);
    }

    const bool hasPendingScaling = !ringInfo_.add_node_info().empty() || !ringInfo_.del_node_info().empty();
    if (hasPendingScaling) {
        taskExecutor_->RestoreScalingTask(ringInfo_, isRestartScenario);
    }
}
// Initializes the ring from a serialized HashRingPb instead of reading it from etcd.
// The local worker must already be present in that ring; its uuid is taken from there.
// @param hashRing serialized HashRingPb.
// @return K_RUNTIME_ERROR when parsing fails or the local worker is missing from the ring,
//         otherwise the status of UpdateWhenNodeRestart().
Status HashRing::InitWithoutEtcd(const std::string &hashRing)
{
    FLAGS_master_address = workerAddr_;
    startUpState_ = StartUpState::RESTART;

    HashRingPb persistedRing;
    if (!persistedRing.ParseFromString(hashRing)) {
        return Status(K_RUNTIME_ERROR, "Failed to parse HashRingPb from string");
    }

    const auto &persistedWorkers = persistedRing.workers();
    auto self = persistedWorkers.find(workerAddr_);
    CHECK_FAIL_RETURN_STATUS(
        self != persistedWorkers.end(), K_RUNTIME_ERROR,
        "The etcd is not writable and the local worker is not in the persistent ring. Please check the etcd.");
    workerUuid_ = self->second.worker_uuid();

    taskExecutor_ = std::make_unique<HashRingTaskExecutor>(workerAddr_, workerUuid_, etcdStore_);
    RETURN_IF_NOT_OK(UpdateWhenNodeRestart(hashRing, persistedRing));
    RestoreScalingTaskIfNeeded(true);

    // The health check is only started when the ring is actually in use.
    const bool ringInUse = state_.load() != NO_INIT;
    if (ringInUse) {
        hashRingHealthCheck_ = std::make_unique<HashRingHealthCheck>(this);
        hashRingHealthCheck_->Init();
    }
    return Status::OK();
}
// Initializes the ring through the etcd/metastore backend.
// Steps: resolve the master address, register this worker in the ring via a CAS driven by
// InitRing() (retried while it reports K_TRY_AGAIN and the wait budget is not exhausted),
// record the resulting mod revision as baseline, create the task executor, restore pending
// scaling tasks and start the health check when the ring is in use.
// Returns OK without doing anything when neither backend address flag is set.
Status HashRing::InitWithEtcd()
{
    INJECT_POINT("HashRing.Init.ChangeDefaultHashTokenNum", [](int hashTokenNum) {
        HashRingAllocator::defaultHashTokenNum = hashTokenNum;
        return Status::OK();
    });
    LOG(INFO) << "HashRing start to init for worker:" << workerAddr_;
    if (FLAGS_etcd_address.empty() && FLAGS_metastore_address.empty()) {
        return Status::OK();
    }
    RETURN_IF_NOT_OK(InitMasterAddress());

    Timer timer;
    int waitTimeMs = 60'000;
    INJECT_POINT_NO_RETURN("HashRing.InitWithEtcd.MotifyWaitTimeMs",
                           [&waitTimeMs](int timeoutMs) { waitTimeMs = timeoutMs; });
    const int retryIntervalMs = 200;

    std::string oldVersionRingVal;
    TryGetOldRing(oldVersionRingVal);

    // The handler keeps its own copy of the old-version ring value.
    auto initRingHandler = [this, oldVersionRingVal](const std::string &oldValue,
                                                     std::unique_ptr<std::string> &newValue, bool &retry) {
        return InitRing(oldValue, newValue, retry, oldVersionRingVal);
    };

    Status status;
    RangeSearchResult res;
    for (;;) {
        status = etcdStore_->CAS(ETCD_RING_PREFIX, "", initRingHandler, res);
        const bool shouldRetry = status.GetCode() == K_TRY_AGAIN && timer.ElapsedMilliSecond() < waitTimeMs;
        if (!shouldRetry) {
            break;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(retryIntervalMs));
    }
    RETURN_IF_NOT_OK_PRINT_ERROR_MSG(status, "InitRing failed");

    baselineModRevisionOfRing_ = res.modRevision;
    taskExecutor_ = std::make_unique<HashRingTaskExecutor>(workerAddr_, workerUuid_, etcdStore_);
    timer_ = std::make_unique<Timer>();

    // Best-effort removal of the ring entry keyed by the backend address (the one TryGetOldRing reads).
    const std::string &backendAddress = FLAGS_metastore_address.empty() ? FLAGS_etcd_address : FLAGS_metastore_address;
    (void)etcdStore_->Delete(ETCD_RING_PREFIX, backendAddress);

    if (IsCentralized()) {
        state_ = NO_INIT;
    }

    bool isRestart = false;
    RETURN_IF_NOT_OK(IsRestart(isRestart));
    LOG(INFO) << "HashRing finished init for worker:" << workerAddr_ << ", isRestart:" << isRestart
              << ", isRunning: " << IsRunning() << ", isWorkable:" << IsWorkable()
              << ", baseline: " << baselineModRevisionOfRing_.load() << ", workerId: " << workerUuid_;

    RestoreScalingTaskIfNeeded(true);
    const bool ringInUse = state_.load() != NO_INIT;
    if (ringInUse) {
        hashRingHealthCheck_ = std::make_unique<HashRingHealthCheck>(this);
        hashRingHealthCheck_->Init();
    }
    return Status::OK();
}
// Force-applies a previously stored ring to the local state.
// Only acts when that ring says the cluster has been initialized or when running centralized.
// @param oldValue serialized form of oldRing, passed to UpdateRing().
// @param oldRing  parsed ring, used only to check cluster_has_init().
Status HashRing::UpdateWhenNodeRestart(const std::string &oldValue, const HashRingPb &oldRing)
{
    const bool shouldRestore = oldRing.cluster_has_init() || IsCentralized();
    if (!shouldRestore) {
        return Status::OK();
    }
    RETURN_IF_NOT_OK(UpdateRing(oldValue, -1, true));
    LOG(INFO) << "The hash ring started successfully after restoration.";
    return Status::OK();
}
// Reads the ring value stored under the backend address key into oldVersionRingVal.
// Retries every 200 ms until the Get succeeds, reports K_NOT_FOUND, or exitFlag_ is raised.
void HashRing::TryGetOldRing(std::string &oldVersionRingVal)
{
    constexpr int retryIntervalMs = 200;
    const std::string &backendAddress = FLAGS_metastore_address.empty() ? FLAGS_etcd_address : FLAGS_metastore_address;
    while (!exitFlag_) {
        auto rc = etcdStore_->Get(ETCD_RING_PREFIX, backendAddress, oldVersionRingVal);
        // Both "found" and "definitely absent" end the polling.
        if (rc.IsOk() || rc.GetCode() == K_NOT_FOUND) {
            return;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(retryIntervalMs));
    }
}
// CAS handler that registers the local worker in the ring during start-up.
// @param oldValue          serialized ring currently stored; when empty, oldVersionRingVal is parsed instead.
// @param newValue          set to the serialized new ring only if it differs from the old one.
// @param retry             unused.
// @param oldVersionRingVal fallback serialized ring (see TryGetOldRing).
// @return K_TRY_AGAIN while this worker is still listed in del_node_info, K_RUNTIME_ERROR on parse
//         failure or an empty worker uuid, otherwise OK.
Status HashRing::InitRing(const std::string &oldValue, std::unique_ptr<std::string> &newValue, bool &retry,
                          const std::string &oldVersionRingVal)
{
    (void)retry;
    HashRingPb oldRing;
    CHECK_FAIL_RETURN_STATUS(oldRing.ParseFromString(oldValue.empty() ? oldVersionRingVal : oldValue), K_RUNTIME_ERROR,
                             "ParseFromString failed");
    auto oldRingLines = SplitRingJson(FormatString("InitRing for worker %s, the old ring is ", workerAddr_), oldRing);
    for (const std::string &line : oldRingLines) {
        VLOG(1) << line;
    }

    static const int INTERVAL_MS = 1000;
    const auto &pendingRemovals = oldRing.del_node_info();
    if (pendingRemovals.find(workerAddr_) != pendingRemovals.end()) {
        // A scale-down of this very node is still in flight: back off and ask the caller to retry.
        LOG(INFO) << "The scale down of this node is being executed, retry...";
        auto backoffMs = INTERVAL_MS;
        INJECT_POINT("HashRing.InitRing.CheckQuickly", [&](uint32_t timeMs) {
            backoffMs = timeMs;
            return Status::OK();
        });
        std::this_thread::sleep_for(std::chrono::milliseconds(backoffMs));
        return Status(K_TRY_AGAIN, "The scale down of this node is being executed, retry...");
    }

    RETURN_IF_NOT_OK(UpdateWhenNodeRestart(oldValue, oldRing));

    HashRingPb newRing = oldRing;
    INJECT_POINT("worker.InitRing", [this, &newRing] {
        newRing.mutable_workers()->erase(workerAddr_);
        return Status::OK();
    });
    newRing.set_cluster_id("");

    auto self = newRing.mutable_workers()->find(workerAddr_);
    const bool alreadyRegistered = self != newRing.mutable_workers()->end();
    if (alreadyRegistered) {
        // Known worker: reuse its uuid and cancel a scale-in that was requested but not started.
        startUpState_ = StartUpState::RESTART;
        workerUuid_ = self->second.worker_uuid();
        if (self->second.need_scale_down() && self->second.state() == WorkerPb::ACTIVE) {
            LOG(INFO) << "Cancel the unstarted scale-in task of " << workerAddr_;
            self->second.set_need_scale_down(false);
        }
    } else {
        // First start: create a fresh uuid and add the worker in INITIAL state.
        startUpState_ = StartUpState::START;
        workerUuid_ = GetStringUuid();
        INJECT_POINT("HashRing.InitRing.CustomWorkerId", [this](const std::string &customizedWorkerId) {
            workerUuid_ = customizedWorkerId;
            return Status::OK();
        });
        WorkerPb freshWorker;
        freshWorker.set_worker_uuid(workerUuid_);
        freshWorker.set_state(WorkerPb::INITIAL);
        (void)newRing.mutable_workers()->insert({ workerAddr_, freshWorker });
    }

    if (workerUuid_.empty()) {
        // Dump the old ring at INFO level to help diagnose the missing uuid.
        for (const std::string &line : oldRingLines) {
            LOG(INFO) << line;
        }
        RETURN_STATUS(K_RUNTIME_ERROR, "worker id can not be empty!");
    }

    if (newRing.workers().at(workerAddr_).state() == WorkerPb::INITIAL) {
        isNewNode_ = true;
    }

    // Only write back when something actually changed.
    const bool ringChanged = !google::protobuf::util::MessageDifferencer::Equals(newRing, oldRing);
    if (ringChanged) {
        newValue = std::make_unique<std::string>(newRing.SerializeAsString());
        auto newRingLines = SplitRingJson(FormatString("InitRing for worker %s, the new ring is", workerAddr_), newRing);
        for (const std::string &line : newRingLines) {
            VLOG(1) << line;
        }
    }
    return Status::OK();
}
// Tells whether the ring holds exactly defaultHashTokenNum tokens per worker in total.
// Only the total is compared; how the tokens are spread across workers is not checked.
bool HashRing::HashTokensIsReady(const HashRingPb &ring) const
{
    int totalTokens = 0;
    for (const auto &entry : ring.workers()) {
        totalTokens += entry.second.hash_tokens_size();
    }
    const int workerCount = ring.workers_size();
    const int expectedTokens = workerCount * HashRingAllocator::defaultHashTokenNum;
    return totalTokens == expectedTokens;
}
// Appends the mapped value of every workerAddr2UuidMap_ entry to activeDbNames.
// Existing elements of activeDbNames are kept. Runs under a shared lock.
void HashRing::GetActiveWorkersDbNames(std::vector<std::string> &activeDbNames)
{
    std::shared_lock<std::shared_timed_mutex> readLock(mutex_);
    for (const auto &addrAndUuid : workerAddr2UuidMap_) {
        activeDbNames.emplace_back(addrAndUuid.second);
    }
}
void HashRing::TryFirstInit()
{
if (HashTokensIsReady(ringInfo_)) {
return;
}
VLOG(LOG_LEVEL) << "Trying to generate hash tokens.";
auto status = etcdStore_->CAS(
ETCD_RING_PREFIX, "",
[this](const std::string &oldValue, std::unique_ptr<std::string> &newValue, bool & ) {
HashRingPb oldRing;
if (!oldRing.ParseFromString(oldValue)) {
LOG(WARNING) << "Failed to parse HashRingPb from string. give up and wait for next time if needed.";
return Status::OK();
}
if (oldRing.cluster_has_init()) {
VLOG(LOG_LEVEL) << "cluster has init.";
return Status::OK();
}
std::set<std::string> sortedWorkers = GetKeysFromPairsContainer(oldRing.workers());
if (sortedWorkers.empty()) {
LOG(WARNING) << "Not found worker from HashRingPb. give up and wait for next time if needed.";
return Status::OK();
}
std::set<std::string> candidateWorkers(
sortedWorkers.begin(),
std::next(sortedWorkers.begin(),
std::min(sortedWorkers.size(), static_cast<size_t>(MAX_CANDIDATE_WORKER_NUM))));
if (!ContainsKey(candidateWorkers, workerAddr_)) {
return Status::OK();
}
HashRingPb newRing = HashRingAllocator::GenerateAllHashTokens(oldRing, sortedWorkers);
newValue = std::make_unique<std::string>(newRing.SerializeAsString());
VLOG(LOG_LEVEL) << "TryFirstInit generates new ring.";
return Status::OK();
});
if (status.IsError()) {
LOG(ERROR) << "Update hash ring failed: " << status.GetMsg();
return;
}
return;
}
// Tries to add pending workers to the working ring by calling AddNode() on the cached ring.
// The resulting status is handed to LOG_IF_ERROR instead of being propagated (the function is void).
void HashRing::TryAdd()
{
    // Test injection hook with an empty handler.
    INJECT_POINT("worker.HashRing.TryAdd", [] {});
    LOG_IF_ERROR(AddNode(ringInfo_), "Add node to working hash ring failed.");
}
// Serializes a copy of oldRing from which every worker that is LEAVING and owns no hash tokens
// has been removed. oldRing itself is not modified; newValue is always assigned.
void HashRing::EraseUnFinishMigrateDataWorker(HashRingPb &oldRing, std::unique_ptr<std::string> &newValue)
{
    HashRingPb trimmedRing = oldRing;
    auto *trimmedWorkers = trimmedRing.mutable_workers();
    for (const auto &entry : oldRing.workers()) {
        const auto &workerPb = entry.second;
        const bool leavingWithoutTokens = workerPb.hash_tokens().empty() && workerPb.state() == WorkerPb::LEAVING;
        if (leavingWithoutTokens) {
            trimmedWorkers->erase(entry.first);
        }
    }
    newValue = std::make_unique<std::string>(trimmedRing.SerializeAsString());
}
// Writes the ring changes needed for voluntary scale-down of the workers returned by
// GetLeavingWorkers(). The update is done through a CAS on the ring key; any CAS error is passed
// to HASH_RING_LOG_IF_ERROR.
// NOTE(review): the only caller visible in this file (InspectAndProcessPeriodically) already holds a
// shared lock on mutex_, and the CAS handler below takes another one. If the handler runs on the
// calling thread this is a recursive shared lock — confirm how EtcdStore::CAS invokes it.
void HashRing::GenerateVoluntaryScaleDownChangingInfo()
{
    // Test injection hook.
    INJECT_POINT("InspectAndProcessPeriodically.skip", []() { return; });
    std::unordered_set<std::string> allScaleDownWorkers;
    uint32_t runningWorkerSize = 0;
    // Computed from the locally cached ring; the handler later verifies the cache matches etcd.
    auto needRemoveWorkers = GetLeavingWorkers(ringInfo_, allScaleDownWorkers, runningWorkerSize);
    if (needRemoveWorkers.empty()) {
        return;
    }
    auto funcHandler = [&](const std::string &oldValue, std::unique_ptr<std::string> &newValue, bool &) {
        HashRingPb oldRing;
        CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(oldRing.ParseFromString(oldValue), K_RUNTIME_ERROR,
                                             "Failed to parse HashRingPb from string");
        std::shared_lock<std::shared_timed_mutex> lock(mutex_);
        google::protobuf::util::MessageDifferencer differencer;
        differencer.set_repeated_field_comparison(google::protobuf::util::MessageDifferencer::AS_SET);
        // Skip this round (without writing) when the cached ring differs from the stored one.
        if (!differencer.Compare(ringInfo_, oldRing)) {
            LOG(WARNING) << "local hashring not update, wait and try again";
            return Status::OK();
        }
        // Every worker counted as running has asked to scale down: nobody is left to take over.
        if (runningWorkerSize == allScaleDownWorkers.size()) {
            LOG(WARNING) << "Can't find standby worker, there is only one worker left in the cluster, shutdown now";
            taskExecutor_->ClearVoluntaryTaskId();
            EraseUnFinishMigrateDataWorker(oldRing, newValue);
            allWorkersVoluntaryScaleDown_ = true;
            voluntaryScaleDownDone_ = true;
            return Status::OK();
        }
        HashRingAllocator allocator(oldRing);
        for (const auto &workerId : needRemoveWorkers) {
            auto &worker = oldRing.mutable_workers()->at(workerId);
            if (worker.state() == WorkerPb::ACTIVE) {
                LOG(INFO) << "Begin to generate voluntary scale add_node_info for " << workerId;
                worker.set_state(WorkerPb::LEAVING);
            } else if (worker.state() == WorkerPb::LEAVING) {
                INJECT_POINT("hashring.regenerate.sleep");
                LOG(INFO) << "Regenerate voluntary scale add_node_info for " << workerId;
            }
            RETURN_IF_NOT_OK_PRINT_ERROR_MSG(
                allocator.RemoveNodeVoluntarily(workerId, allScaleDownWorkers, oldRing),
                "RemoveNodeVoluntarily failed");
            // workerId is compared against the local worker address here.
            if (workerId == workerAddr_) {
                taskExecutor_->ClearVoluntaryTaskId();
            }
        }
        // oldRing was modified in place above and is now the new ring content.
        newValue = std::make_unique<std::string>(oldRing.SerializeAsString());
        return Status::OK();
    };
    HASH_RING_LOG_IF_ERROR(etcdStore_->CAS(ETCD_RING_PREFIX, "", funcHandler), " generate voluntary info failed");
    return;
}
// Tells whether workerAddr is present in the cached ring with need_scale_down set.
// Uses a keyed lookup instead of scanning (and copying) every worker entry.
// @param workerAddr address of the worker to query.
// @return true only if the worker exists in the ring and is flagged for scale-down.
bool HashRing::IsPreLeaving(const std::string &workerAddr)
{
    std::shared_lock<std::shared_timed_mutex> lock(mutex_);
    const auto &workers = ringInfo_.workers();
    const auto iter = workers.find(workerAddr);
    return iter != workers.end() && iter->second.need_scale_down();
}
// Tells whether workerAddr is present in the cached ring in the LEAVING state.
// Uses a keyed lookup instead of scanning (and copying) every worker entry.
// @param workerAddr address of the worker to query.
// @return true only if the worker exists in the ring and its state is WorkerPb::LEAVING.
bool HashRing::IsLeaving(const std::string &workerAddr)
{
    std::shared_lock<std::shared_timed_mutex> lock(mutex_);
    const auto &workers = ringInfo_.workers();
    const auto iter = workers.find(workerAddr);
    return iter != workers.end() && iter->second.state() == WorkerPb::LEAVING;
}
void HashRing::InspectAndProcessPeriodically()
{
if (state_.load() == NO_INIT) {
return;
}
INJECT_POINT("HashRing.HealthProbe", []() { ResetHealthProbe(); });
std::shared_lock<std::shared_timed_mutex> lock(mutex_, std::defer_lock);
bool rec = lock.try_lock();
if (!rec) {
return;
}
bool hasInit = ringInfo_.cluster_has_init();
if (state_.load() == INIT && !hasInit) {
int startWaitTime = 5;
INJECT_POINT("test.start.notWait", [&startWaitTime](int waitTime) { startWaitTime = waitTime; });
TryGenerateHashRange(startWaitTime, std::bind(&HashRing::TryFirstInit, this));
return;
}
if (state_.load() == INIT && hasInit) {
TryAdd();
}
if (state_.load() == PRE_RUNNING) {
TryGenerateHashRange(FLAGS_add_node_wait_time_s, std::bind(&HashRing::TryAdd, this));
}
if ((state_.load() == RUNNING || state_.load() == PRE_LEAVING) && needVoluntaryScaleDown_) {
HostPort migrateDbAddr;
std::string migrateDbName;
auto status =
HashRingEvent::GetDbPrimaryLocation::GetInstance().NotifyAll(workerAddr_, migrateDbAddr, migrateDbName);
if (status.IsOk() && migrateDbAddr.ToString() != workerAddr_) {
LOG(INFO) << "wait voluntary scale down node change primary replica to local";
return;
}
GenerateVoluntaryScaleDownChangingInfo();
SubmitMigrateDataTaskIfNeed(ringInfo_);
}
}
// Invokes func once timer_ exists and at least waitTime seconds have elapsed on it.
// @param waitTime minimum elapsed seconds on timer_ before func is run.
// @param func     action to run.
void HashRing::TryGenerateHashRange(int waitTime, std::function<void()> func)
{
    const bool waitedLongEnough = timer_ != nullptr && !(timer_->ElapsedSecond() < waitTime);
    if (waitedLongEnough) {
        func();
    }
}
// Collects the workers that can be added to the ring now: those in INITIAL state without tokens.
// Returns an empty set while another scaling operation is pending (add_node_info present, or,
// unless needForceJoin_ is set, del_node_info present / a JOINING worker / a LEAVING worker that
// still owns tokens).
std::set<std::string> HashRing::GetAddingWorkers(const HashRingPb &ring) const
{
    if (ring.add_node_info_size() != 0) {
        return {};
    }
    if (!needForceJoin_ && ring.del_node_info_size() != 0) {
        return {};
    }

    std::set<std::string> joinCandidates;
    for (const auto &entry : ring.workers()) {
        const auto &workerPb = entry.second;
        const bool ownsTokens = workerPb.hash_tokens_size() != 0;
        if (!needForceJoin_) {
            if (workerPb.state() == WorkerPb::JOINING) {
                return {};
            }
            if (workerPb.state() == WorkerPb::LEAVING && ownsTokens) {
                return {};
            }
        }
        if (workerPb.state() == WorkerPb::INITIAL && !ownsTokens) {
            joinCandidates.emplace(entry.first);
        }
    }
    return joinCandidates;
}
// Determines which workers should be processed for voluntary scale-down.
// @param ring                ring to inspect.
// @param allScaleDownWorkers receives every ACTIVE/LEAVING worker flagged need_scale_down (appended).
// @param runningWorkersSize  incremented once per worker whose state is not INITIAL (not reset here).
// @return empty when scale-down is already done or add/del node info is pending; otherwise, if any
//         worker is neither ACTIVE nor LEAVING, the LEAVING workers that still own tokens; else the
//         flagged workers that still own tokens.
std::unordered_set<std::string> HashRing::GetLeavingWorkers(const HashRingPb &ring,
                                                            std::unordered_set<std::string> &allScaleDownWorkers,
                                                            uint32_t &runningWorkersSize) const
{
    if (voluntaryScaleDownDone_) {
        return {};
    }
    const bool scalingInProgress = ring.del_node_info_size() != 0 || ring.add_node_info_size() != 0;
    if (scalingInProgress) {
        return {};
    }

    std::unordered_set<std::string> flaggedWithTokens;
    std::unordered_set<std::string> leavingWithTokens;
    bool sawOtherState = false;
    for (const auto &entry : ring.workers()) {
        const auto &workerPb = entry.second;
        if (workerPb.state() != WorkerPb::INITIAL) {
            ++runningWorkersSize;
        }
        const bool isActive = workerPb.state() == WorkerPb::ACTIVE;
        const bool isLeaving = workerPb.state() == WorkerPb::LEAVING;
        if (!isActive && !isLeaving) {
            sawOtherState = true;
            continue;
        }
        const bool ownsTokens = !workerPb.hash_tokens().empty();
        if (isLeaving && ownsTokens) {
            leavingWithTokens.emplace(entry.first);
        }
        if (workerPb.need_scale_down()) {
            allScaleDownWorkers.emplace(entry.first);
            if (ownsTokens) {
                flaggedWithTokens.emplace(entry.first);
            }
        }
    }
    if (sawOtherState) {
        return leavingWithTokens;
    }
    return flaggedWithTokens;
}
// Builds a ring in which every worker from GetAddingWorkers() gets hash tokens, the JOINING state
// and matching add_node_info, then writes it via CAS — but only if the stored ring still equals
// currRing (repeated fields compared as sets).
// @param currRing ring snapshot the new ring is derived from.
// @return error from the allocator or from the CAS; OK otherwise (including when nothing is written).
Status HashRing::AddNode(const HashRingPb &currRing)
{
    auto joiningAddrs = GetAddingWorkers(currRing);
    if (joiningAddrs.empty()) {
        return Status::OK();
    }
    LOG(INFO) << "add nodes to working hash ring: " << VectorToString(joiningAddrs);

    HashRingPb newRing = currRing;
    HashRingAllocator allocator(currRing);
    for (const auto &addr : joiningAddrs) {
        std::vector<uint32_t> newNodeTokens;
        RETURN_IF_NOT_OK(allocator.AddNode(addr, HashRingAllocator::defaultHashTokenNum, newNodeTokens));
        auto &joiningWorker = (*newRing.mutable_workers())[addr];
        for (const uint32_t token : newNodeTokens) {
            joiningWorker.mutable_hash_tokens()->Add(token);
        }
        joiningWorker.set_state(WorkerPb::JOINING);
    }

    std::map<std::string, ChangeNodePb> addNodeInfos;
    allocator.GetAddNodeInfo(addNodeInfos);
    auto *ringAddNodeInfo = newRing.mutable_add_node_info();
    for (auto &info : addNodeInfos) {
        (*ringAddNodeInfo)[info.first] = std::move(info.second);
    }

    // Write newRing only if nobody changed the stored ring since currRing was read.
    auto writeIfUnchanged = [&newRing, &currRing](const std::string &oldValue, std::unique_ptr<std::string> &newValue,
                                                  bool &) {
        HashRingPb storedRing;
        if (!storedRing.ParseFromString(oldValue)) {
            LOG(WARNING) << "Failed to parse HashRingPb from string. give up and wait for next time if needed.";
            return Status::OK();
        }
        google::protobuf::util::MessageDifferencer differencer;
        differencer.set_repeated_field_comparison(google::protobuf::util::MessageDifferencer::AS_SET);
        if (!differencer.Compare(storedRing, currRing)) {
            VLOG(1) << "ring has changed. give up and wait for next time if needed.";
            return Status::OK();
        }
        newValue = std::make_unique<std::string>(newRing.SerializeAsString());
        return Status::OK();
    };

    RangeSearchResult res;
    RETURN_IF_NOT_OK(etcdStore_->CAS(ETCD_RING_PREFIX, "", writeIfUnchanged, res));
    LOG_IF(INFO, res.modRevision > 0) << "[Ring] Write the scale up task of " << VectorToString(joiningAddrs)
                                      << " into etcd success with version " << res.modRevision;
    return Status::OK();
}
// Resolves the uuid of the worker that owns key on the ring.
// @param key           key to route; hashed with hashFunction_.
// @param outWorkerUuid receives the owning worker's uuid.
// @param routeInfo     when engaged, filled by GetPrimaryWorkerAddrNoLock().
Status HashRing::GetPrimaryWorkerUuid(const std::string &key, std::string &outWorkerUuid,
                                      std::optional<RouteInfo> &routeInfo) const
{
    const uint32_t keyHash = hashFunction_(key);
    std::shared_lock<std::shared_timed_mutex> readLock(mutex_);
    std::string primaryAddr;
    RETURN_IF_NOT_OK(GetPrimaryWorkerAddrNoLock(keyHash, primaryAddr, routeInfo));
    return GetUuidByWorkerAddrNoLock(primaryAddr, outWorkerUuid);
}
// Locked variant of GetPrimaryWorkerAddrNoLock() for callers that need no route information.
// @param keyHash       already computed hash of the key.
// @param outWorkerAddr receives the address of the owning worker.
Status HashRing::GetPrimaryWorkerAddr(uint32_t keyHash, std::string &outWorkerAddr) const
{
    std::shared_lock<std::shared_timed_mutex> readLock(mutex_);
    // Left disengaged so the lookup does not fill in any route data.
    std::optional<RouteInfo> noRouteInfo;
    return GetPrimaryWorkerAddrNoLock(keyHash, outWorkerAddr, noRouteInfo);
}
// Finds the worker owning keyHash: the first token strictly greater than keyHash, wrapping to the
// smallest token when there is none. The caller is expected to hold mutex_.
// @param keyHash       hash to route.
// @param outWorkerAddr receives the owning worker's address.
// @param routeInfo     when engaged, receives the current ring version and the pair
//                      (preceding token, selected token).
// @return K_NOT_READY when the token map is empty.
Status HashRing::GetPrimaryWorkerAddrNoLock(uint32_t keyHash, std::string &outWorkerAddr,
                                            std::optional<RouteInfo> &routeInfo) const
{
    if (tokenMap_.empty()) {
        RETURN_STATUS(K_NOT_READY, "HashRing not ready, the token map is empty.");
    }

    auto owner = tokenMap_.upper_bound(keyHash);
    if (owner == tokenMap_.end()) {
        // Past the largest token: wrap around to the start of the ring.
        owner = tokenMap_.begin();
    }

    if (routeInfo) {
        // The token before the first one is the last one on the ring.
        const auto precedingToken = owner == tokenMap_.begin() ? tokenMap_.rbegin()->first : std::prev(owner)->first;
        routeInfo->currHashRingVersion = currHashRingVersion_;
        routeInfo->payload = std::make_pair(precedingToken, owner->first);
    }
    VLOG(1) << "GetPrimaryWorker for hash:" << keyHash << ", the result token:" << owner->first
            << ", the result worker is:" << owner->second;
    outWorkerAddr = owner->second;
    return Status::OK();
}
// Handles a watch event on the ring key. DELETE events are ignored.
// The health check (if any) always receives the new value; the local ring is only updated when
// the event key equals prefix + "/".
Status HashRing::HandleRingEvent(const mvccpb::Event &event, const std::string &prefix)
{
    if (event.type() == mvccpb::Event_EventType::Event_EventType_DELETE) {
        return Status::OK();
    }

    const auto &kv = event.kv();
    if (hashRingHealthCheck_ != nullptr) {
        hashRingHealthCheck_->UpdateRing(kv.value(), kv.mod_revision());
    }

    const bool isRingKey = kv.key() == (prefix + "/");
    if (isRingKey) {
        RETURN_IF_NOT_OK_APPEND_MSG(UpdateRing(kv.value(), kv.mod_revision()), "UpdateRing failed when WatchKey");
    }
    return Status::OK();
}
// Returns true when there is no add_node_info, when workerAddr itself has an add_node_info entry,
// or when every unfinished changed range belongs to a worker listed in failedWorkers.
// @param workerAddr    worker being checked.
// @param failedWorkers ids compared against each range's workerid().
bool HashRing::CheckAllAddNodeFinishWhenSrcFailed(const std::string &workerAddr,
                                                  const std::unordered_set<std::string> &failedWorkers)
{
    std::shared_lock<std::shared_timed_mutex> readLock(mutex_);
    const auto &addNodeInfo = ringInfo_.add_node_info();
    if (addNodeInfo.empty() || addNodeInfo.find(workerAddr) != addNodeInfo.end()) {
        return true;
    }

    for (const auto &entry : addNodeInfo) {
        for (const auto &range : entry.second.changed_ranges()) {
            if (range.finished()) {
                continue;
            }
            // An unfinished range whose worker has not failed is still pending.
            if (failedWorkers.count(range.workerid()) == 0) {
                return false;
            }
        }
    }
    return true;
}
// Forwards to HashInRange(): tells whether the hash of objKey falls into any of the given ranges.
bool HashRing::IsInRange(const HashRange &ranges, const std::string &objKey)
{
    return HashInRange(ranges, objKey);
}
bool HashRing::HashInRange(const HashRange &ranges, const std::string &objKey)
{
uint32_t hash = hashFunction_(objKey);
for (auto &range : ranges) {
if (range.first > range.second) {
if (hash > range.first || hash <= range.second) {
return true;
}
} else {
if (hash > range.first && hash <= range.second) {
return true;
}
}
}
return false;
}
// Re-submits the scale-up task of node if the cached ring still has an add_node_info entry for it.
void HashRing::RecoverMigrationTask(const std::string &node)
{
    std::shared_lock<std::shared_timed_mutex> readLock(mutex_);
    const auto &addNodeInfo = ringInfo_.add_node_info();
    const auto pending = addNodeInfo.find(node);
    if (pending == addNodeInfo.end()) {
        return;
    }
    LOG(INFO) << "Recover the scale up task for " << node;
    taskExecutor_->SubmitOneScaleUpTask({ node, pending->second }, true);
}
// Tells whether workerAddr is gone from, or being removed from, the cached ring.
// @param workerAddr address of the worker to query.
// @return true when the worker is absent from workers() or has an entry in del_node_info().
bool HashRing::CheckWorkerIsScaleDown(const std::string &workerAddr) const
{
    std::shared_lock<std::shared_timed_mutex> lock(mutex_);
    const auto &workers = ringInfo_.workers();
    const auto &delNodeInfo = ringInfo_.del_node_info();
    return workers.find(workerAddr) == workers.end() || delNodeInfo.find(workerAddr) != delNodeInfo.end();
}
// Reacts to a ring change while the local state is RUNNING (see the dispatch in UpdateRing()).
// At most one action is taken, in this priority order: new add_node_info -> scale-up task,
// new del_node_info -> scale-down task, add_node_info cleared -> voluntary recovery task.
// @param oldRing ring before the update.
// @param newRing ring after the update.
// @return status of the submitted scale-up/scale-down task, otherwise OK.
Status HashRing::ProcessUpdateRingEventIfRunning(const HashRingPb &oldRing, const HashRingPb &newRing)
{
    if (IncrementalAddNodeInfo(oldRing, newRing)) {
        return taskExecutor_->SubmitScaleUpTask(newRing);
    }
    /*
     * Scale down begins.
     * The scale-down task must be handled before the scale-up-finish case below; recovery may fail
     * if scale down begins when scale up finishes.
     */
    if (IncrementalDelNodeInfo(oldRing, newRing)) {
        return taskExecutor_->SubmitScaleDownTask(newRing);
    }
    // add_node_info went from non-empty to empty.
    if (!oldRing.add_node_info().empty() && newRing.add_node_info().empty()) {
        LOG(INFO) << "Update tokens due to scale up finish: "
                  << VectorToString(GetKeysFromPairsContainer(oldRing.add_node_info()));
        taskExecutor_->SubmitVoluntaryRecoveryAsyncTask(oldRing, newRing);
    }
    return Status::OK();
}
// Reacts to a ring change while the local state is INIT or PRE_RUNNING (see UpdateRing()).
// New del_node_info triggers a scale-down task; otherwise a cleared add_node_info triggers the
// voluntary recovery task.
// @param oldRing ring before the update.
// @param newRing ring after the update.
// @return status of the submitted scale-down task, otherwise OK.
Status HashRing::ProcessUpdateRingEventIfPreparing(const HashRingPb &oldRing, const HashRingPb &newRing)
{
    /*
     * Scale down begins.
     * The scale-down task must be handled before the scale-up-finish case below; recovery may fail
     * if scale down begins when scale up finishes.
     */
    if (IncrementalDelNodeInfo(oldRing, newRing)) {
        return taskExecutor_->SubmitScaleDownTask(newRing);
    }
    // add_node_info went from non-empty to empty.
    if (!oldRing.add_node_info().empty() && newRing.add_node_info().empty()) {
        taskExecutor_->SubmitVoluntaryRecoveryAsyncTask(oldRing, newRing);
    }
    return Status::OK();
}
// Tells whether the cached ring holds an add_node_info entry for workerAddr.
bool HashRing::CheckReceiveMigrateInfo(const std::string &workerAddr)
{
    INJECT_POINT("hashRing.noNeedToCheckForTest", []() { return true; });
    std::shared_lock<std::shared_timed_mutex> readLock(mutex_);
    const auto &addNodeInfo = ringInfo_.add_node_info();
    return addNodeInfo.find(workerAddr) != addNodeInfo.end();
}
// Renders, for every entry of nodeInfo, the worker ids of its unfinished changed ranges as
// "{key:[id,id,]}, " and concatenates the pieces. Used for log output.
static std::string GetUnProcessedNodeInfo(const google::protobuf::Map<std::string, datasystem::ChangeNodePb> &nodeInfo)
{
    std::string summary;
    for (const auto &entry : nodeInfo) {
        std::string unfinishedIds;
        for (const auto &range : entry.second.changed_ranges()) {
            if (range.finished()) {
                continue;
            }
            unfinishedIds += range.workerid() + ",";
        }
        summary += FormatString("{%s:[%s]}, ", entry.first, unfinishedIds);
    }
    return summary;
}
// Builds a one-line summary of ring for logging: worker counts per state, the addresses of
// JOINING and LEAVING workers, and the unfinished add/del node info.
std::string HashRing::SummarizeHashRing(const HashRingPb &ring)
{
    int activeCount = 0;
    int joiningCount = 0;
    int initialCount = 0;
    int leavingCount = 0;
    std::string joiningAddrs;
    std::string leavingAddrs;
    for (const auto &entry : ring.workers()) {
        switch (entry.second.state()) {
            case WorkerPb::ACTIVE:
                ++activeCount;
                break;
            case WorkerPb::JOINING:
                ++joiningCount;
                joiningAddrs += entry.first + ',';
                break;
            case WorkerPb::INITIAL:
                ++initialCount;
                break;
            case WorkerPb::LEAVING:
                ++leavingCount;
                leavingAddrs += entry.first + ',';
                break;
            default:
                // Any other state is not counted.
                break;
        }
    }
    return FormatString(
        "Ring summarize: total:%d, active:%d, joining:%d(%s), initial:%d, leaving:%d(%s) add_node_info: %d(%s), "
        "del_node_info: %d(%s)",
        ring.workers_size(), activeCount, joiningCount, joiningAddrs, initialCount, leavingCount, leavingAddrs,
        ring.add_node_info_size(), GetUnProcessedNodeInfo(ring.add_node_info()), ring.del_node_info_size(),
        GetUnProcessedNodeInfo(ring.del_node_info()));
}
// Starts the data-migration task once the local worker is LEAVING and owns no hash tokens in ring.
// Nothing happens while a migrate-data task is already running or when the worker is not in ring.
// The task is only submitted after the DataMigrationReady notification succeeds.
void HashRing::SubmitMigrateDataTaskIfNeed(const HashRingPb &ring)
{
    if (taskExecutor_->IsMigrateDataTaskRunning()) {
        return;
    }
    const auto &workers = ring.workers();
    auto self = workers.find(workerAddr_);
    if (self == workers.end()) {
        return;
    }
    const bool metaMigrated = self->second.hash_tokens().empty() && self->second.state() == WorkerPb::LEAVING;
    if (!metaMigrated) {
        return;
    }

    const int logInterval = 10;
    LOG_EVERY_T(INFO, logInterval) << "Worker " << workerAddr_
                                   << " meta data migrate finish, begin to trigger data migration.";
    if (!HashRingEvent::DataMigrationReady::GetInstance().NotifyAll().IsOk()) {
        return;
    }
    dataMigrationStarted_ = true;
    LOG_IF_ERROR(taskExecutor_->SubmitMigrateDataTask(), "Migrate data task failed");
}
void HashRing::ProcessUpdateRingEventIfLeaving(const HashRingPb &oldRing, const HashRingPb &newRing)
{
if (newRing.workers().find(workerAddr_) == newRing.workers().end()) {
int waitTimeMs = 1000;
std::this_thread::sleep_for(std::chrono::milliseconds(waitTimeMs));
needVoluntaryScaleDown_ = false;
voluntaryScaleDownDone_ = true;
}
* scale down begin.
* scale down task must before scale up finish, recover may be failed if scale down begin
* when scale up finish.
*/
if (IncrementalDelNodeInfo(oldRing, newRing)) {
HASH_RING_LOG_IF_ERROR(taskExecutor_->SubmitScaleDownTask(newRing), "scale down task failed");
}
if (IncrementalAddNodeInfo(oldRing, newRing)) {
LOG(INFO) << "Begin to migrate voluntary scale down meta data.";
INJECT_POINT("hashRing.voluntary.sleep", [](int time) {
std::this_thread::sleep_for(std::chrono::seconds(time));
return;
});
HASH_RING_LOG_IF_ERROR(taskExecutor_->SubmitScaleUpTask(newRing), "scale up task failed");
}
SubmitMigrateDataTaskIfNeed(newRing);
}
// Decides whether an incoming ring update must be ignored.
// A non-forced update is skipped when its version is not newer than the baseline revision; the
// first accepted non-forced update resets the baseline to INT64_MIN. Any update is skipped once
// the local state is FAIL.
// @return true when the update should be skipped.
bool HashRing::SkipUpdateRing(const HashRingPb &newRing, int64_t version, bool forceUpdate)
{
    if (!forceUpdate) {
        if (version <= baselineModRevisionOfRing_.load()) {
            LOG(INFO) << "Ignore ring update of version " << version
                      << " because it's out-of-date. Ring summarize is: " << SummarizeHashRing(newRing);
            return true;
        }
        baselineModRevisionOfRing_ = INT64_MIN;
    }

    if (state_ != FAIL) {
        return false;
    }
    LOG(INFO) << "The node has failed. Wait for exiting, will not update ring anymore. Version: " << version;
    return true;
}
// Replaces the cached ring with newRing, then rebuilds the token map and re-evaluates the local
// state, in that order.
// NOTE(review): the caller visible in this file (UpdateRing) holds mutex_ exclusively — confirm
// for any other callers.
void HashRing::UpdateLocalRing(const HashRingPb &newRing, bool forceUpdate)
{
    ringInfo_.CopyFrom(newRing);
    UpdateTokenMap();
    UpdateLocalState(forceUpdate);
}
// Applies a serialized ring to the local state and triggers the follow-up actions.
// @param newSerializedRingInfo serialized HashRingPb.
// @param version               revision of this ring value, checked by SkipUpdateRing().
// @param forceUpdate           when true the version check is bypassed and no state-specific
//                              event processing is done after the local update.
// @return K_RUNTIME_ERROR on parse failure; otherwise the status of the state-specific handler
//         (OK when the update is skipped or no handler runs).
Status HashRing::UpdateRing(const std::string &newSerializedRingInfo, int64_t version, bool forceUpdate)
{
    INJECT_POINT("HashRing.UpdateRing.sleep");
    HashRingPb newRing;
    if (!newRing.ParseFromString(newSerializedRingInfo)) {
        return Status(K_RUNTIME_ERROR, "Failed to parse RingInfoPb");
    }
    HashRingPb oldRing;
    {
        // Exclusive section: swap in the new ring and refresh derived state.
        std::lock_guard<std::shared_timed_mutex> lock(mutex_);
        RETURN_OK_IF_TRUE(SkipUpdateRing(newRing, version, forceUpdate));
        currHashRingVersion_++;
        LOG(INFO) << "Update ring of version " << version << ". " << SummarizeHashRing(newRing);
        auto lines = SplitRingJson(FormatString("Worker %s update local hash ring to", workerAddr_), newRing);
        std::for_each(lines.begin(), lines.end(), [](const std::string &line) { LOG(INFO) << line; });
        // Test injection hook. Note that oldRing is still default-constructed at this point.
        INJECT_POINT("HashRing.NotReceiveUpdate", [&oldRing, &newRing](uint32_t time_s) {
            if (oldRing.add_node_info().empty() && !newRing.add_node_info().empty()) {
                std::this_thread::sleep_for(std::chrono::seconds(time_s));
            }
            return Status::OK();
        });
        // Keep the previous ring for the diff-based processing below.
        oldRing = std::move(ringInfo_);
        UpdateLocalRing(newRing, forceUpdate);
    }
    // Everything below runs without holding mutex_.
    if (!forceUpdate && !oldRing.cluster_has_init() && newRing.cluster_has_init()) {
        ProcessForClusterInit();
    }
    // The Status returned by ProcessForScaleDownFinish is not checked here.
    ProcessForScaleDownFinish(oldRing, newRing);
    ProcessForScaleupFinish(oldRing, newRing);
    if (forceUpdate || state_ == FAIL || state_ == NO_INIT) {
        return Status::OK();
    }
    if (EnableAutoDelDeadNode()) {
        HashRingEvent::SyncClusterNodes::GetInstance().NotifyAll(GetKeysFromPairsContainer(newRing.workers()));
    }
    Status status = Status::OK();
    // state_ is read again here, after the early return above.
    switch (state_) {
        case NO_INIT:
            break;
        case PRE_LEAVING:
            ProcessUpdateRingEventIfLeaving(oldRing, newRing);
            break;
        case RUNNING:
            status = ProcessUpdateRingEventIfRunning(oldRing, newRing);
            break;
        case INIT:
        case PRE_RUNNING:
            status = ProcessUpdateRingEventIfPreparing(oldRing, newRing);
            break;
        case FAIL:
            break;
    }
    return status;
}
void HashRing::ProcessForClusterInit()
{
std::string nextWorkerUuid;
Status rc = GetNextWorker(workerUuid_, nextWorkerUuid);
if (rc.IsError()) {
LOG(ERROR) << "Get next worker failed:" << rc.ToString();
}
LOG(INFO) << "Cluster init finish, current worker uuid:" << workerUuid_ << ", next worker uuid:" << nextWorkerUuid;
HashRingEvent::ClusterInitFinish::GetInstance().NotifyAll(workerUuid_, nextWorkerUuid);
}
// Publishes ScaleDownFinish with the uuids of all workers that were in oldRing but are no longer
// in newRing. Workers with an empty uuid are skipped with a warning. Always returns OK.
Status HashRing::ProcessForScaleDownFinish(HashRingPb &oldRing, HashRingPb &newRing)
{
    std::vector<std::string> removedDbNames;
    auto previousWorkers = GetWorkersFromHashRingPb(oldRing);
    for (const auto &addrAndUuid : previousWorkers) {
        const bool stillPresent = newRing.workers().contains(addrAndUuid.first);
        if (stillPresent) {
            continue;
        }
        const std::string &dbName = addrAndUuid.second;
        if (dbName.empty()) {
            LOG(WARNING) << "The uuid is empty for " << addrAndUuid.first;
            continue;
        }
        removedDbNames.emplace_back(dbName);
    }

    if (removedDbNames.empty()) {
        return Status::OK();
    }
    HashRingEvent::ScaleDownFinish::GetInstance().NotifyAll(removedDbNames);
    return Status::OK();
}
// Publishes a ScaleupFinish event for every worker whose state was JOINING in oldRing and is ACTIVE in newRing.
// Workers that were JOINING but are missing from newRing are only logged.
void HashRing::ProcessForScaleupFinish(const HashRingPb &oldRing, const HashRingPb &newRing)
{
    const auto &updatedWorkers = newRing.workers();
    for (const auto &previous : oldRing.workers()) {
        const bool wasJoining = previous.second.state() == WorkerPb::JOINING;
        if (!wasJoining) {
            continue;
        }
        auto current = updatedWorkers.find(previous.first);
        if (current == updatedWorkers.end()) {
            LOG(WARNING) << "worker " << previous.first << " not exists in new ring.";
            continue;
        }
        if (current->second.state() != WorkerPb::ACTIVE) {
            continue;
        }
        LOG(INFO) << "Worker " << previous.first << " uuid:" << current->second.worker_uuid() << " scaleup finish.";
        HashRingEvent::ScaleupFinish::GetInstance().NotifyAll(current->second.worker_uuid());
    }
}
// Switches the local hash ring state to newState.
// Returns true only when the state actually changed. Returns false when newState equals the current state,
// or when the current state is FAIL (which is never left).
// `log` is appended to the transition log line.
bool HashRing::ChangeStateTo(HashState newState, std::string log)
{
    static auto stateName = [](HashState state) {
        switch (state) {
            case NO_INIT:
                return "NO_INIT";
            case INIT:
                return "INIT";
            case PRE_RUNNING:
                return "PRE_RUNNING";
            case RUNNING:
                return "RUNNING";
            case PRE_LEAVING:
                return "PRE_LEAVING";
            case FAIL:
                return "FAIL";
            default:
                return "UNKNOWN";
        }
    };
    // Nothing to do when the requested state is already in effect.
    if (newState == state_.load()) {
        return false;
    }
    if (state_ == FAIL) {
        LOG(ERROR) << FormatString(
            "Skip convert hash ring local state from %s to %s because FAIL state is the terminate state.",
            stateName(state_.load()), stateName(newState));
        return false;
    }
    LOG(INFO) << FormatString("Convert hash ring local state from %s to %s. %s", stateName(state_.load()),
                              stateName(newState), log);
    state_ = newState;
    return true;
}
// Derives the local state (state_) from the entry of this worker in ringInfo_.
// - Centralized mode always maps to NO_INIT.
// - FAIL is never left.
// - A worker missing from the ring stays PRE_LEAVING if it was leaving, becomes PRE_RUNNING on a forced update
//   of an initialized cluster, and FAIL otherwise.
// - A worker listed in del_node_info becomes FAIL.
// - Otherwise the state recorded in the ring decides; need_scale_down additionally arms
//   needVoluntaryScaleDown_ unless this is a forced update of an ACTIVE worker.
void HashRing::UpdateLocalState(bool forceUpdate)
{
    if (IsCentralized()) {
        ChangeStateTo(NO_INIT);
        return;
    }
    if (state_ == FAIL) {
        LOG(INFO) << "Skip update local state because this node is ready to quit.";
        return;
    }
    const auto &workers = ringInfo_.workers();
    auto self = workers.find(workerAddr_);
    if (self == workers.end()) {
        if (state_ == PRE_LEAVING) {
            LOG(INFO) << "voluntary remove from hash ring success.";
        } else if (forceUpdate && ringInfo_.cluster_has_init()) {
            ChangeStateTo(PRE_RUNNING);
        } else {
            ChangeStateTo(FAIL, "The current node cannot find in the hash ring anymore.");
        }
        return;
    }
    if (ContainsKey(ringInfo_.del_node_info(), workerAddr_)) {
        ChangeStateTo(FAIL, "There is a running scale down task on this node.");
        return;
    }
    const auto stateInEtcd = self->second.state();
    switch (stateInEtcd) {
        case WorkerPb::ACTIVE:
            ChangeStateTo(RUNNING);
            break;
        case WorkerPb::LEAVING:
            ChangeStateTo(PRE_LEAVING);
            break;
        case WorkerPb::INITIAL:
        case WorkerPb::JOINING:
            // Joining an already initialized cluster goes through PRE_RUNNING, otherwise through INIT.
            ChangeStateTo(ringInfo_.cluster_has_init() ? PRE_RUNNING : INIT);
            break;
        default:
            ChangeStateTo(INIT);
            break;
    }
    if (!self->second.need_scale_down()) {
        return;
    }
    if (forceUpdate && stateInEtcd == WorkerPb::ACTIVE) {
        LOG(WARNING) << "Scale-in task that have not started will no longer be executed after restart";
    } else {
        needVoluntaryScaleDown_ = true;
    }
}
void HashRing::UpdateTokenMap()
{
decltype(tokenMap_) tmpTokenMap;
decltype(workerUuidHashMap_) tmpWorkerUuidHashMap;
decltype(workerUuid2AddrMap_) tmpWorkerId2AddrMap;
decltype(relatedWorkerMap_) tmpRelatedWorkerMap;
decltype(workerAddr2UuidMap_) tmpAddrToWorkerIdMap;
for (const auto &kv : ringInfo_.workers()) {
if (kv.second.state() != WorkerPb::ACTIVE && kv.second.state() != WorkerPb::LEAVING) {
continue;
}
const auto &workerId = kv.first;
for (auto token : kv.second.hash_tokens()) {
tmpTokenMap.insert({ token, workerId });
}
if (kv.second.worker_uuid().empty()) {
continue;
}
uint32_t hash = hashFunction_(kv.second.worker_uuid());
tmpWorkerUuidHashMap.insert({ hash, workerId });
}
GenerateHashRingUuidMap(ringInfo_, tmpWorkerId2AddrMap, tmpAddrToWorkerIdMap, tmpRelatedWorkerMap);
std::stringstream log;
if (tokenMap_ != tmpTokenMap) {
tokenMap_ = std::move(tmpTokenMap);
log << "Update token map to size: " << tokenMap_.size() << ". ";
SaveHashRange();
}
if (workerUuidHashMap_ != tmpWorkerUuidHashMap) {
workerUuidHashMap_ = std::move(tmpWorkerUuidHashMap);
log << "Update workerid hash map to size: " << workerUuidHashMap_.size() << ". ";
}
if (tmpAddrToWorkerIdMap != workerAddr2UuidMap_ || tmpWorkerId2AddrMap != workerUuid2AddrMap_
|| tmpRelatedWorkerMap != relatedWorkerMap_) {
workerAddr2UuidMap_ = std::move(tmpAddrToWorkerIdMap);
workerUuid2AddrMap_ = std::move(tmpWorkerId2AddrMap);
relatedWorkerMap_ = std::move(tmpRelatedWorkerMap);
log << "Update {workerAddr, workerId} map to size: " << workerAddr2UuidMap_.size() << ". "
<< "Update {workerId, workerAddr} map for route to size:" << workerUuid2AddrMap_.size() << ". ";
}
if (log.rdbuf()->in_avail() > 0) {
LOG(INFO) << log.str();
}
}
bool HashRing::NeedRedirect(const std::string &objKey, HostPort &masterAddr)
{
if (IsCentralized()) {
return false;
}
{
HashRange ranges;
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
for (auto &i : ringInfo_.add_node_info()) {
for (auto &r : i.second.changed_ranges()) {
ranges.emplace_back(r.from(), r.end());
}
if (IsInRange(ranges, objKey)) {
LOG_IF_ERROR(masterAddr.ParseString(i.first), "parse failed: " + i.first);
VLOG(1) << "adding node, return new master: " << masterAddr.ToString();
return true;
}
ranges.clear();
}
}
HostPort currMetaAddr;
auto status = GetMasterAddr(objKey, currMetaAddr);
if (status.IsError() || currMetaAddr.ToString() == workerAddr_) {
LOG_IF_ERROR(status, "The server fails to check the master address and does not redirect.");
return false;
}
VLOG(1) << "add node finish, return new master";
masterAddr = currMetaAddr;
return true;
}
// Resolves workerUuid to an address through workerUuid2AddrMap_ using the addressing variant of the
// free-function lookup. mutex_ is held in shared mode for the duration of the lookup.
Status HashRing::GetWorkerAddrByUuidForAddressing(const std::string &workerUuid, HostPort &workerAddr)
{
    std::shared_lock<std::shared_timed_mutex> readGuard(mutex_);
    return ::worker::GetWorkerAddrByUuidForAddressing(workerUuid2AddrMap_, workerUuid, workerAddr);
}
// Resolves workerUuid to an address through workerUuid2AddrMap_ using the metadata variant of the
// free-function lookup. mutex_ is held in shared mode for the duration of the lookup.
Status HashRing::GetWorkerAddrByUuidForMetadata(const std::string &workerUuid, HostPort &workerAddr)
{
    std::shared_lock<std::shared_timed_mutex> readGuard(mutex_);
    return ::worker::GetWorkerAddrByUuidForMetadata(workerUuid2AddrMap_, workerUuid, workerAddr);
}
// Locked wrapper around GetUuidByWorkerAddrNoLock: takes mutex_ in shared mode, then looks up the uuid
// registered for workerAddr.
Status HashRing::GetUuidByWorkerAddr(const std::string &workerAddr, std::string &uuid)
{
    std::shared_lock<std::shared_timed_mutex> readGuard(mutex_);
    return GetUuidByWorkerAddrNoLock(workerAddr, uuid);
}
// Looks up workerAddr in workerAddr2UuidMap_ without taking any lock.
// On success stores the uuid in `uuid`; fails with K_NOT_FOUND when the address is unknown.
Status HashRing::GetUuidByWorkerAddrNoLock(const std::string &workerAddr, std::string &uuid) const
{
    const auto found = workerAddr2UuidMap_.find(workerAddr);
    CHECK_FAIL_RETURN_STATUS(found != workerAddr2UuidMap_.end(), K_NOT_FOUND,
                             "Cannot find node with addr " + workerAddr);
    uuid = found->second;
    return Status::OK();
}
// Returns a copy of the uuid stored in workerUuid_. No lock is taken here.
std::string HashRing::GetLocalWorkerUuid() const
{
return workerUuid_;
}
// Resolves the master address for objKey in two steps: key -> master uuid (GetMasterUuid), then
// uuid -> address (GetWorkerAddrByUuidForAddressing). The first failing step's status is returned.
Status HashRing::GetMasterAddr(const std::string &objKey, HostPort &masterAddr)
{
    std::string ownerUuid;
    RETURN_IF_NOT_OK(GetMasterUuid(objKey, ownerUuid));
    return GetWorkerAddrByUuidForAddressing(ownerUuid, masterAddr);
}
// Resolves the uuid of the master for objKey. Waits (bounded, see WaitWorkable) for the ring to become
// workable first; the route information produced by GetPrimaryWorkerUuid is discarded.
Status HashRing::GetMasterUuid(const std::string &objKey, std::string &masterUuid)
{
    RETURN_IF_NOT_OK(WaitWorkable());
    std::optional<RouteInfo> unusedRouteInfo;
    return GetPrimaryWorkerUuid(objKey, masterUuid, unusedRouteInfo);
}
// Recomputes the hash ranges of the local worker and caches them in hashRange_.
// The ranges are computed before hashRangeMutex_ is taken; only the assignment happens under that mutex.
void HashRing::SaveHashRange()
{
    auto localRanges = BuildHashRangeForWorkerNoLock(workerAddr_);
    std::lock_guard<std::mutex> guard(hashRangeMutex_);
    hashRange_ = std::move(localRanges);
}
// Locked wrapper around BuildHashRangeForWorkerNoLock: holds mutex_ in shared mode while the ranges of
// workerAddr are computed.
HashRange HashRing::GetHashRangeByWorker(const std::string &workerAddr)
{
    std::shared_lock<std::shared_timed_mutex> readGuard(mutex_);
    return BuildHashRangeForWorkerNoLock(workerAddr);
}
// Builds the list of hash ranges attributed to workerAddr by walking tokenMap_ in iteration order.
// No lock is taken here; GetHashRangeByWorker holds mutex_ in shared mode around its call.
// For every token mapped to workerAddr a pair (previous token, token - 1) is emitted, and a trailing
// (last token, UINT32_MAX) pair is added when the worker owned a token seen while `curr` was still 0.
// NOTE(review): whether the pair bounds are inclusive or exclusive is not visible here - confirm against
// the consumers of HashRange (e.g. IsInRange).
HashRange HashRing::BuildHashRangeForWorkerNoLock(const std::string &workerAddr) const
{
// Token recorded by the previous iteration; stays 0 until one has been recorded.
HashPosition curr = 0;
// Set when an owned token is visited while curr == 0; triggers the trailing range after the loop.
bool needExtra = false;
HashRange ranges;
for (auto iter = tokenMap_.begin(); iter != tokenMap_.end(); ++iter) {
if (iter->second == workerAddr) {
needExtra |= (curr == 0);
auto beg = curr;
// Guard against unsigned wrap-around when the token itself is 0.
auto end = iter->first == 0 ? 0 : iter->first - 1;
if (beg == 0 && end == 0) {
// Degenerate (0, 0) pair is not emitted. Note that this `continue` also skips the
// `curr = iter->first` update below, so curr stays 0 for the next token.
continue;
}
ranges.emplace_back(beg, end);
}
curr = iter->first;
}
if (needExtra) {
// curr now holds the last token recorded by the loop.
ranges.emplace_back(curr, UINT32_MAX);
}
return ranges;
}
// Decides whether this node should go on to write a scale-down task for workerAddr.
// When workerAddr is the local address, the node marks itself unhealthy and raises a signal against its own
// process (SIGTERM when the local state is PRE_LEAVING or voluntaryScaleDownDone_ is set, SIGKILL otherwise);
// false is returned on that path.
// For any other worker, false is returned when the worker is absent from ringInfo_.workers() or already has a
// del_node_info entry; otherwise the result of CheckAllAddNodeFinishWhenSrcFailed is returned.
bool HashRing::NeedToTryRemoveWorker(const std::string &workerAddr, const std::unordered_set<std::string> &failWorkers)
{
if (workerAddr == workerAddr_) {
LOG_FIRST_N(ERROR, 1)
<< "Maybe suffered a passive scale down due to network connectivity issues or current worker not "
"exists in hash ring. Restart.";
SetUnhealthy();
if (state_.load() == PRE_LEAVING || voluntaryScaleDownDone_) {
// NOTE(review): presumably a test hook that lets tests return before the signal is raised - confirm.
INJECT_POINT("notExit", [] { return false; });
voluntaryScaleDownDone_ = true;
(void)raise(SIGTERM);
} else {
// Snapshot the local ring entry under the shared lock for the diagnostic message below.
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
auto workerInRing = ringInfo_.workers().find(workerAddr_);
auto workerInRingStr =
workerInRing == ringInfo_.workers().end() ? "NOT_FOUND" : workerInRing->second.ShortDebugString();
LOG(ERROR) << FormatString(
"Worker is ready for passive scaling down, worker state_ is: %d, worker in ring: %s, del_node_info: "
"%s",
state_.load(), workerInRingStr, MapToString(ringInfo_.del_node_info()));
// Release the lock and flush logs before the process is killed.
lock.unlock();
Provider::Instance().FlushLogs();
(void)raise(SIGKILL);
}
return false;
}
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
if (!ContainsKey(ringInfo_.workers(), workerAddr)) {
LOG(INFO) << workerAddr << " no longer exist in hash ring, try to remove from the cluster node table";
lock.unlock();
return false;
}
// A del_node_info entry already exists for this worker, so no new task is needed.
if (ContainsKey(ringInfo_.del_node_info(), workerAddr)) {
return false;
}
// Called with the shared lock on mutex_ still held.
return CheckAllAddNodeFinishWhenSrcFailed(workerAddr, failWorkers);
}
void HashRing::RemoveWorkers(const std::unordered_set<std::string> &workers)
{
if (!EnableAutoDelDeadNode()) {
return;
}
bool expected = true;
(void)needForceJoin_.compare_exchange_strong(expected, false);
Status rc;
if (state_ == FAIL || (voluntaryScaleDownDone_ && !allWorkersVoluntaryScaleDown_)) {
rc = RemoveWorker(workerAddr_, workers);
if (rc.GetCode() == StatusCode::K_INVALID) {
LOG(WARNING) << "Remove worker " << workerAddr_ << " failed, detail: " << rc.ToString();
return;
}
}
for (auto &i : workers) {
rc = RemoveWorker(i, workers);
WARN_IF_ERROR(
rc, FormatString("Remove failed worker %s failed. All failed workers: %s", i, VectorToString(workers)));
if (rc.GetCode() == StatusCode::K_INVALID) {
LOG(WARNING) << "Remove worker " << i << " failed, detail: " << rc.ToString();
return;
}
}
}
// Logs `log` at INFO level unless the same text was already logged for the current `flag`.
// The set of remembered texts is reset whenever `flag` differs from the one seen on the previous call.
// State is kept in function-local statics.
static void LogOnce(const std::string &flag, const std::string &log)
{
    static std::string activeFlag;
    static std::vector<std::string> emittedLogs;
    if (activeFlag != flag) {
        // New flag: forget what was logged for the previous one.
        activeFlag = flag;
        emittedLogs.clear();
    } else if (ContainsKey(emittedLogs, log)) {
        return;
    }
    emittedLogs.push_back(log);
    LOG(INFO) << log;
}
// Reads the worker_uuid recorded for workerAddr in ringInfo_.workers() (under a shared lock on mutex_).
// Returns K_NOT_FOUND, naming the address, when the worker is not part of the ring.
Status HashRing::GetWorkerUuidFromRing(const std::string &workerAddr, std::string &uuid)
{
    std::shared_lock<std::shared_timed_mutex> lock(mutex_);
    auto iter = ringInfo_.workers().find(workerAddr);
    if (iter == ringInfo_.workers().end()) {
        // The format string needs a %s placeholder, otherwise workerAddr never reaches the message.
        return Status(K_NOT_FOUND, FormatString("Can't found the worker %s on ring", workerAddr));
    }
    uuid = iter->second.worker_uuid();
    return Status::OK();
}
// Tries to write a scale-down task (del_node_info) for workerAddr into the ring stored under
// ETCD_RING_PREFIX, using a compare-and-swap on etcd.
// Returns OK without writing when NeedToTryRemoveWorker declines, when the worker has no entry in the local
// ring, or when the local worker is not among the workers selected to process the removal.
// Otherwise returns the status of the CAS call; the callback reports K_INVALID when the locally cached ring
// differs from the value read from etcd.
Status HashRing::RemoveWorker(const std::string &workerAddr, const std::unordered_set<std::string> &failWorkers)
{
RETURN_OK_IF_TRUE(!NeedToTryRemoveWorker(workerAddr, failWorkers));
std::string uuidOfRemoveWorker;
if (GetWorkerUuidFromRing(workerAddr, uuidOfRemoveWorker).IsError()) {
return Status::OK();
}
std::set<std::string> processWorkers;
RETURN_IF_NOT_OK(GetProcessWorkerForRemoveWorker(uuidOfRemoveWorker, failWorkers, processWorkers));
// NOTE(review): presumably a test hook that forces the local worker into the selected set - confirm.
INJECT_POINT("ProcessWorkerIsLocal", [&processWorkers, this]() {
processWorkers.emplace(workerAddr_);
return Status::OK();
});
// Only the workers selected above write the task; every other node just logs (once) and returns.
if (!ContainsKey(processWorkers, workerAddr_)) {
LogOnce(VectorToString(failWorkers),
FormatString("select [%s] to write the del_node_info of [%s]. skip..(failed workers: %s)",
VectorToString(std::vector<std::string>(processWorkers.begin(), processWorkers.end())),
workerAddr, VectorToString(failWorkers)));
return Status::OK();
}
VLOG(1) << "Try to write the scale down task of " << workerAddr << " into etcd";
RangeSearchResult res;
auto ret = etcdStore_->CAS(
ETCD_RING_PREFIX, "",
[this, &workerAddr](const std::string &oldValue, std::unique_ptr<std::string> &newValue, bool &retry) {
HashRingPb currRing;
if (!currRing.ParseFromString(oldValue)) {
return Status(K_RUNTIME_ERROR, "Failed to parse HashRingPb from string");
}
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
// Refuse to act on a stale local view: the cached ring must match what etcd holds
// (repeated fields are compared as sets).
google::protobuf::util::MessageDifferencer differencer;
differencer.set_repeated_field_comparison(google::protobuf::util::MessageDifferencer::AS_SET);
if (!differencer.Compare(ringInfo_, currRing)) {
retry = false;
return Status(K_INVALID, "ringInfo is not latest");
}
auto worker = currRing.mutable_workers()->find(workerAddr);
// Nothing to write when the worker is gone or already has a del_node_info entry.
if (worker == currRing.mutable_workers()->end()
|| currRing.del_node_info().find(workerAddr) != currRing.del_node_info().end()) {
return Status::OK();
}
if (worker->second.state() == WorkerPb::INITIAL) {
// An INITIAL worker gets an empty del_node_info entry; the allocator is not involved.
currRing.mutable_del_node_info()->insert({ workerAddr, {} });
} else {
auto rc = HashRingAllocator(currRing).RemoveNode(workerAddr, currRing);
if (rc.IsError()) {
LOG(WARNING) << FormatString("Remove worker %s failed: %s.", workerAddr, rc.ToString());
needForceJoin_ = true;
if (needVoluntaryScaleDown_) {
LOG(WARNING) << "Give up voluntary scale down due to no available nodes.";
voluntaryScaleDownDone_ = true;
}
// newValue is left unset on this path and OK is returned.
return Status::OK();
}
}
if (google::protobuf::util::MessageDifferencer::Equals(ringInfo_, currRing)) {
LOG(INFO) << "skip write the repeat ring into etcd";
return Status::OK();
}
newValue = std::make_unique<std::string>(currRing.SerializeAsString());
INJECT_POINT("hashRing.removeWorker");
return Status::OK();
},
res);
LOG_IF(INFO, res.modRevision > 0) << "[Ring] Write the scale down task of " << workerAddr
<< " into etcd success with version " << res.modRevision << ".";
return ret;
}
// Counts the workers in the ring that have no del_node_info entry and stores the count in workerNum.
// In centralized mode workerNum is set to -1. Fails with K_NOT_READY when the ring is not workable, or when
// the local worker is missing from the ring and the request is not from another AZ.
Status HashRing::GetHashRingWorkerNum(int &workerNum, bool isFromOtherAz) const
{
    workerNum = 0;
    if (IsCentralized()) {
        static const int DEFAULT_NUM_CENTRALIZED = -1;
        workerNum = DEFAULT_NUM_CENTRALIZED;
        return Status::OK();
    }
    CHECK_FAIL_RETURN_STATUS(IsWorkable(), K_NOT_READY, "HashRing is not workable. Call again later.");
    std::shared_lock<std::shared_timed_mutex> readGuard(mutex_);
    const auto &workers = ringInfo_.workers();
    const auto &deletingWorkers = ringInfo_.del_node_info();
    auto findLocalWorker = workers.find(workerAddr_) != workers.end();
    if (!isFromOtherAz) {
        CHECK_FAIL_RETURN_STATUS(
            findLocalWorker, K_NOT_READY,
            FormatString("Local worker not in hashring, worker not ready. workerAddr: %s", workerAddr_));
    }
    for (auto &entry : workers) {
        const bool beingDeleted = deletingWorkers.find(entry.first) != deletingWorkers.end();
        if (!beingDeleted) {
            workerNum++;
        }
    }
    return Status::OK();
}
// Returns the addresses of all ring workers that have no del_node_info entry (read under a shared lock).
std::set<std::string> HashRing::GetValidWorkersInHashRing() const
{
    std::set<std::string> validWorkers;
    std::shared_lock<std::shared_timed_mutex> readGuard(mutex_);
    const auto &deletingWorkers = ringInfo_.del_node_info();
    for (auto &entry : ringInfo_.workers()) {
        const bool beingDeleted = deletingWorkers.find(entry.first) != deletingWorkers.end();
        if (beingDeleted) {
            continue;
        }
        validWorkers.emplace(entry.first);
    }
    return validWorkers;
}
// Returns the addresses of ring workers that are ACTIVE, not flagged need_scale_down and have no
// del_node_info entry (read under a shared lock).
std::set<std::string> HashRing::GetActiveWorkersInHashRing() const
{
    std::set<std::string> activeWorkers;
    std::shared_lock<std::shared_timed_mutex> readGuard(mutex_);
    const auto &deletingWorkers = ringInfo_.del_node_info();
    for (const auto &entry : ringInfo_.workers()) {
        if (deletingWorkers.find(entry.first) != deletingWorkers.end()) {
            continue;
        }
        const bool serving = entry.second.state() == WorkerPb::ACTIVE && !entry.second.need_scale_down();
        if (serving) {
            activeWorkers.emplace(entry.first);
        }
    }
    return activeWorkers;
}
// Returns the keys of ringInfo_.del_node_info(), i.e. the workers that currently have a scale-down entry
// (read under a shared lock).
std::set<std::string> HashRing::GetWorkersInDelNodeInfo() const
{
    std::set<std::string> deletingWorkers;
    std::shared_lock<std::shared_timed_mutex> readGuard(mutex_);
    for (auto &entry : ringInfo_.del_node_info()) {
        deletingWorkers.emplace(entry.first);
    }
    return deletingWorkers;
}
// Resolves workerAddr to its uuid and returns the standby worker computed for that uuid.
// Fails with the lookup status when the address is unknown.
Status HashRing::GetStandbyWorkerByAddr(const std::string &workerAddr, std::string &nextWorker)
{
    std::string workerUuid;
    RETURN_IF_NOT_OK(GetUuidByWorkerAddr(workerAddr, workerUuid));
    return GetStandbyWorkerByUuid(workerUuid, nextWorker);
}
// Finds the standby worker for `uuid`: the entry of workerUuidHashMap_ with the smallest hash strictly greater
// than hash(uuid), wrapping around to the first entry of the map when no such entry exists.
// Returns K_RUNTIME_ERROR when the map is empty or the only wrap-around candidate carries the same hash as
// `uuid` itself. mutex_ is held in shared mode during the lookup.
Status HashRing::GetStandbyWorkerByUuid(const std::string &uuid, std::string &standbyWorker)
{
    std::shared_lock<std::shared_timed_mutex> lock(mutex_);
    uint32_t hash = hashFunction_(uuid);
    auto candidate = workerUuidHashMap_.upper_bound(hash);
    if (candidate == workerUuidHashMap_.end()) {
        // Wrap around to the first entry. begin() is used rather than upper_bound(0) so that an entry whose
        // hash is exactly 0 is not skipped (same wrap-around rule as GetStandbyWorkerExceptNoLock).
        candidate = workerUuidHashMap_.begin();
        if (candidate == workerUuidHashMap_.end() || candidate->first == hash) {
            return Status(K_RUNTIME_ERROR,
                          FormatString("Standby worker not found on the ring, localWorker is (%s).", workerAddr_));
        }
    }
    standbyWorker = candidate->second;
    return Status::OK();
}
// Convenience overload: returns the standby worker of the local worker (identified by workerUuid_).
Status HashRing::GetStandbyWorker(std::string &standbyWorker)
{
    const std::string &localUuid = workerUuid_;
    return GetStandbyWorkerByUuid(localUuid, standbyWorker);
}
// Picks up to `num` remote workers that are ACTIVE, not flagged need_scale_down and not listed in
// del_node_info, and stores their addresses in activeWorkers (cleared first).
// The scan starts at a random position in the worker map and wraps around to the beginning if needed.
// Returns K_INVALID when num is 0; returns OK with fewer (possibly zero) workers when not enough qualify.
Status HashRing::GetActiveWorkers(uint32_t num, std::vector<std::string> &activeWorkers)
{
    activeWorkers.clear();
    if (num == 0) {
        RETURN_STATUS(StatusCode::K_INVALID, "required number is 0");
    }
    std::shared_lock<std::shared_timed_mutex> readGuard(mutex_);
    const auto &workers = ringInfo_.workers();
    if (workers.empty()) {
        constexpr int logInterval = 30;
        LOG_EVERY_T(WARNING, logInterval) << "worker in hash ring is empty";
        return Status::OK();
    }
    auto pivot = workers.begin();
    uint32_t randomStep = RandomData().GetRandomUint32(0, ringInfo_.workers_size() - 1);
    std::advance(pivot, randomStep);

    // A worker qualifies when it is remote, ACTIVE, not scaling down and has no del_node_info entry.
    auto qualifies = [this](const auto &entry) {
        if (entry.first == workerAddr_) {
            return false;
        }
        if (entry.second.state() != WorkerPb::ACTIVE || entry.second.need_scale_down()) {
            return false;
        }
        return ringInfo_.del_node_info().find(entry.first) == ringInfo_.del_node_info().end();
    };
    // Appends qualifying workers from [first, last); returns true once `num` workers have been collected.
    auto collect = [&activeWorkers, &qualifies, num](auto first, auto last) {
        for (auto it = first; it != last; ++it) {
            if (!qualifies(*it)) {
                continue;
            }
            activeWorkers.emplace_back(it->first);
            if (activeWorkers.size() >= num) {
                return true;
            }
        }
        return false;
    };
    if (!collect(pivot, workers.end())) {
        // Not enough workers after the random start: continue from the beginning up to the start position.
        (void)collect(workers.begin(), pivot);
    }
    return Status::OK();
}
// Finds a standby worker for `uuid` while skipping the addresses in excludeAddrs. No lock is taken here.
// The search starts at the first entry of workerUuidHashMap_ whose hash is greater than hash(uuid), walks the
// map circularly, and accepts the first entry that is not excluded and does not carry the same hash as `uuid`.
// Returns K_RUNTIME_ERROR when the map is empty or a full cycle finds no acceptable entry.
Status HashRing::GetStandbyWorkerExceptNoLock(const std::string &uuid,
                                              const std::unordered_set<std::string> &excludeAddrs,
                                              std::string &standbyWorker)
{
    Status err =
        Status(K_RUNTIME_ERROR, FormatString("Standby worker not found on the ring, workerUuid is (%s).", uuid));
    // With an empty map begin() == end(); the loop below would dereference end(), so bail out first.
    if (workerUuidHashMap_.empty()) {
        return err;
    }
    uint32_t hash = hashFunction_(uuid);
    auto iter = workerUuidHashMap_.upper_bound(hash);
    if (iter == workerUuidHashMap_.end()) {
        iter = workerUuidHashMap_.begin();
    }
    const auto startIter = iter;
    do {
        if (excludeAddrs.find(iter->second) == excludeAddrs.end() && iter->first != hash) {
            standbyWorker = iter->second;
            return Status::OK();
        }
        ++iter;
        // Wrap around so that the whole map is visited exactly once.
        if (iter == workerUuidHashMap_.end()) {
            iter = workerUuidHashMap_.begin();
        }
    } while (iter != startIter);
    return err;
}
// Selects the workers that are allowed to write the scale-down task for the worker identified by `uuid`.
// Up to MAX_CANDIDATE_WORKER_NUM successive "next" workers (in any of the ACTIVE/LEAVING/JOINING/INITIAL states
// and not in failedWorkers) are collected, each lookup starting from the previously found worker.
// If none is found, the first ring worker satisfying the same state/failure filter is used as a fallback.
// Returns OK when at least one worker was selected, otherwise the status of the last lookup.
Status HashRing::GetProcessWorkerForRemoveWorker(const std::string &uuid,
                                                 const std::unordered_set<std::string> &failedWorkers,
                                                 std::set<std::string> &processWorkerAddrs)
{
    const std::unordered_set<WorkerPb::StatePb> states{ WorkerPb::ACTIVE, WorkerPb::LEAVING, WorkerPb::JOINING,
                                                        WorkerPb::INITIAL };
    std::string cursorUuid = uuid;
    std::string candidateAddr;
    Status rc;
    for (int attempt = 0; attempt < MAX_CANDIDATE_WORKER_NUM; ++attempt) {
        // cursorUuid is both the starting point and the output, so each round advances along the ring.
        rc = GetRelatedWorkerImpl(cursorUuid, states, failedWorkers, true, cursorUuid, candidateAddr);
        if (rc.IsError()) {
            break;
        }
        processWorkerAddrs.emplace(candidateAddr);
    }
    if (processWorkerAddrs.empty()) {
        std::shared_lock<std::shared_timed_mutex> readGuard(mutex_);
        const auto &workers = ringInfo_.workers();
        const auto fallback = std::find_if(workers.begin(), workers.end(), [&](const auto &worker) {
            return ContainsKey(states, worker.second.state()) && !ContainsKey(failedWorkers, worker.first);
        });
        if (fallback != workers.end()) {
            processWorkerAddrs.emplace(fallback->first);
        }
    }
    if (processWorkerAddrs.empty()) {
        return rc;
    }
    return Status::OK();
}
// Requests a voluntary scale-down of the local worker by setting need_scale_down on its entry in the ring
// stored under ETCD_RING_PREFIX (compare-and-swap on etcd).
// When the local worker is not in the ring nothing is written and the callback still reports OK.
Status HashRing::VoluntaryScaleDown()
{
    INJECT_POINT("worker.VoluntaryScaleDown");
    LOG(INFO) << "Begin to process voluntary scale down task";
    auto markLocalWorker = [this](const std::string &oldValue, std::unique_ptr<std::string> &newValue,
                                  bool & /* retry */) {
        HashRingPb ring;
        CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(ring.ParseFromString(oldValue), K_RUNTIME_ERROR,
                                             "Failed to parse HashRingPb from string");
        auto self = ring.mutable_workers()->find(workerAddr_);
        if (self == ring.workers().end()) {
            LOG(INFO) << "Can't find the worker in the hash ring, give up VoluntaryScaleDown.";
            return Status::OK();
        }
        self->second.set_need_scale_down(true);
        newValue = std::make_unique<std::string>(ring.SerializeAsString());
        return Status::OK();
    };
    return etcdStore_->CAS(ETCD_RING_PREFIX, "", markLocalWorker);
}
// Automatic removal of dead nodes is enabled only when the distributed master is enabled and the
// auto_del_dead_node flag is set.
bool HashRing::EnableAutoDelDeadNode() const
{
    if (!enableDistributedMaster_) {
        return false;
    }
    return FLAGS_auto_del_dead_node;
}
// Walks relatedWorkerMap_ from currWorkerUuid (or from the first entry when currWorkerUuid is empty) in the
// requested direction and returns the first worker whose ring state is in `states` and whose address is not
// in failedWorkers. At most relatedWorkerMap_.size() + 1 steps are taken.
// Outputs: outWorkerUuid / outWorkerAddr of the accepted worker.
// Returns K_NOT_FOUND when the map is empty, the start uuid is unknown, or no worker qualifies.
Status HashRing::GetRelatedWorkerImpl(const std::string &currWorkerUuid,
                                      const std::unordered_set<WorkerPb::StatePb> &states,
                                      const std::unordered_set<std::string> &failedWorkers, bool getNextNode,
                                      std::string &outWorkerUuid, std::string &outWorkerAddr)
{
    std::shared_lock<std::shared_timed_mutex> readGuard(mutex_);
    CHECK_FAIL_RETURN_STATUS(relatedWorkerMap_.size() >= 1, K_NOT_FOUND, "Insufficient number of workers.");
    auto cursor = currWorkerUuid.empty() ? relatedWorkerMap_.begin() : relatedWorkerMap_.find(currWorkerUuid);
    if (cursor == relatedWorkerMap_.end()) {
        return Status(K_NOT_FOUND, FormatString("The worker %s not found on the ring.", currWorkerUuid));
    }
    const auto &ringWorkers = ringInfo_.workers();
    for (size_t step = 0; step <= relatedWorkerMap_.size(); ++step) {
        cursor = getNextNode ? LoopNext(relatedWorkerMap_, cursor) : LoopPrev(relatedWorkerMap_, cursor);
        if (cursor == relatedWorkerMap_.end()) {
            continue;
        }
        std::string workerAddr = cursor->second.ToString();
        auto ringEntry = ringWorkers.find(workerAddr);
        if (ringEntry == ringWorkers.end()) {
            LOG(WARNING) << "Worker " << workerAddr << " not exists in ringInfo_";
            continue;
        }
        const bool stateAccepted = states.find(ringEntry->second.state()) != states.end();
        const bool knownFailed = failedWorkers.count(workerAddr) != 0;
        if (!stateAccepted || knownFailed) {
            continue;
        }
        outWorkerUuid = cursor->first;
        outWorkerAddr = std::move(workerAddr);
        return Status::OK();
    }
    return Status(K_NOT_FOUND, FormatString("The %s node of worker %s not found on the ring.",
                                            (getNextNode ? "next" : "prev"), currWorkerUuid));
}
// Returns in nextWorkerUuid the uuid of the next ACTIVE or LEAVING worker after currWorkerUuid.
// No worker is treated as failed; the address reported by the lookup is discarded.
Status HashRing::GetNextWorker(const std::string &currWorkerUuid, std::string &nextWorkerUuid)
{
    const std::unordered_set<WorkerPb::StatePb> acceptedStates{ WorkerPb::ACTIVE, WorkerPb::LEAVING };
    const std::unordered_set<std::string> noFailedWorkers;
    std::string unusedAddr;
    return GetRelatedWorkerImpl(currWorkerUuid, acceptedStates, noFailedWorkers, true, nextWorkerUuid, unusedAddr);
}
// Returns in prevWorkerUuid the uuid of the previous ACTIVE or LEAVING worker before currWorkerUuid.
// No worker is treated as failed; the address reported by the lookup is discarded.
Status HashRing::GetPrevWorker(const std::string &currWorkerUuid, std::string &prevWorkerUuid)
{
    const std::unordered_set<WorkerPb::StatePb> acceptedStates{ WorkerPb::ACTIVE, WorkerPb::LEAVING };
    const std::unordered_set<std::string> noFailedWorkers;
    std::string unusedAddr;
    return GetRelatedWorkerImpl(currWorkerUuid, acceptedStates, noFailedWorkers, false, prevWorkerUuid, unusedAddr);
}
// Waits for the hash ring to become workable.
// Returns OK immediately when it already is; otherwise polls up to 3 times with a 200 ms sleep before each
// check and returns K_NOT_READY if the ring is still not workable after the last attempt.
Status HashRing::WaitWorkable()
{
    RETURN_OK_IF_TRUE(IsWorkable());
    static const int SLEEP_TIME_MS = 200;
    const int retryNumMax = 3;
    for (int retryNum = 1; retryNum <= retryNumMax; ++retryNum) {
        std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_TIME_MS));
        if (IsWorkable()) {
            break;
        }
        // Re-checked on purpose: only give up when the ring is still not workable on the final attempt.
        if (!IsWorkable() && retryNum == retryNumMax) {
            RETURN_STATUS(K_NOT_READY, FormatString("Hash ring not ready[state: %d], get workerId failed", state_));
        }
    }
    return Status::OK();
}
}
}