/**
 * Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/**
 * Description: Test interface to HashRing
 */
#include <google/protobuf/util/message_differencer.h>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <string>
#include <thread>
#include "common.h"
#include "datasystem/common/inject/inject_point.h"
#include "datasystem/common/util/container_util.h"
#include "datasystem/common/util/uuid_generator.h"
#include "datasystem/common/util/wait_post.h"
#include "datasystem/common/log/log.h"
#include "datasystem/worker/cluster_manager/etcd_cluster_manager.h"
#include "datasystem/worker/hash_ring/hash_ring.h"
#include "datasystem/worker/worker_cli.h"
using namespace datasystem::worker;
DS_DECLARE_string(etcd_address);
DS_DECLARE_string(master_address);
DS_DECLARE_uint32(add_node_wait_time_s);
DS_DECLARE_uint32(node_dead_timeout_s);
DS_DECLARE_bool(auto_del_dead_node);
namespace datasystem {
namespace st {
namespace {
// A pair of boundary tokens on the hash ring; CheckRange probes both boundaries of each Range.
struct Range {
    uint32_t start;  // first boundary token
    uint32_t end;    // second boundary token
};
// Builds a WorkerPb that carries the given hash tokens (in the given order) and the given state.
// The state defaults to WorkerPb::ACTIVE.
WorkerPb MakeWorkerPb(std::initializer_list<uint32_t> &&tokens, WorkerPb::StatePb state = WorkerPb::ACTIVE)
{
    WorkerPb worker;
    worker.set_state(state);
    auto *hashTokens = worker.mutable_hash_tokens();
    for (const uint32_t token : tokens) {
        hashTokens->Add(token);
    }
    return worker;
}
// Stamps workerPb with a uuid from GetStringUuid() and inserts it into the ring's worker map under id.
void InsertWorker(HashRingPb &pb, const std::string &id, WorkerPb &&workerPb)
{
    workerPb.set_worker_uuid(GetStringUuid());
    auto *workerTable = pb.mutable_workers();
    workerTable->insert({ id, workerPb });
}
}
// Worker counts handed to InitRing by the tests below.
static constexpr int HASH_RING_NUM_TWO = 2;
static constexpr int HASH_RING_NUM_THREE = 3;
// Value the fixture assigns to FLAGS_add_node_wait_time_s in SetClusterSetupOptions.
static constexpr int DEFAULT_ADD_NODE_WAIT_TIME_S = 3;
// Sleep, in milliseconds, between two IsRunning() polls.
static constexpr int CHECK_INTERVAL_MS = 10;
class TestHashRing : public HashRing {
public:
* @brief Get the primary worker addr by consistent hash algorithm.
* @param[in] key Use the key to calculate the uint32 hash value, and then find the first node that is greater than
* the hash value on the consistent hash ring.
* @param[out] outWorkerAddr the outWorkerAddr is calc by consistent hash algorithm when enable consistent
* hash(enable_distribute_master is true and etcd_address is valid);
* @return Status of the call.
*/
Status GetPrimaryWorkerAddr(const std::string &key, std::string &outWorkerAddr) const;
* @brief Get the hash tokens of the hash ring.
* @return Return the hash tokens of the hash ring.
*/
std::vector<uint32_t> GetHashTokens() const;
* @brief Get the primary worker by consistent hash algorithm.
* @param[in] keyHash Find the first node that is greater than the keyHash on the consistent hash ring.
* @param[out] outWorkerAddr the outWorkerAddr is calc by consistent hash algorithm when enable consistent
* @return Status of the call.
*/
Status GetPrimaryWorkerAddr(uint32_t keyHash, std::string &outWorkerAddr) const;
};
// Key-based lookup: hashes the key with hashFunction_ and delegates to the hash-based lookup of HashRing.
Status TestHashRing::GetPrimaryWorkerAddr(const std::string &key, std::string &outWorkerAddr) const
{
    const uint32_t keyHash = hashFunction_(key);
    return HashRing::GetPrimaryWorkerAddr(keyHash, outWorkerAddr);
}
// Returns the keys of tokenMap_ in iteration order, or an empty vector while the ring state is not RUNNING.
std::vector<uint32_t> TestHashRing::GetHashTokens() const
{
    // The state is checked before the lock is taken.
    if (state_.load() != RUNNING) {
        return {};
    }
    std::vector<uint32_t> tokens;
    std::shared_lock<std::shared_timed_mutex> guard(mutex_);
    for (const auto &entry : tokenMap_) {
        tokens.push_back(entry.first);
    }
    return tokens;
}
// Hash-based lookup: forwards unchanged to HashRing::GetPrimaryWorkerAddr.
Status TestHashRing::GetPrimaryWorkerAddr(uint32_t keyHash, std::string &outWorkerAddr) const
{
    Status rc = HashRing::GetPrimaryWorkerAddr(keyHash, outWorkerAddr);
    return rc;
}
class HashRingTest : public ExternalClusterTest {
protected:
HashRingTest()
{
}
~HashRingTest() = default;
void SetClusterSetupOptions(ExternalClusterOptions &opts) override
{
opts.numEtcd = 1;
opts.numMasters = 0;
opts.numWorkers = 0;
FLAGS_v = 1;
FLAGS_add_node_wait_time_s = DEFAULT_ADD_NODE_WAIT_TIME_S;
}
void TearDown() override
{
exit_ = true;
for (auto &cm : etcdCMs_) {
if (cm) {
cm->Shutdown();
cm.reset();
}
}
for (auto &db : etcdStores_) {
if (db) {
db->Shutdown();
db.reset();
}
}
ExternalClusterTest::TearDown();
}
protected:
void InitTestEtcdInstance();
void PutRingToEtcd(const std::string &jsonRing);
void InitRing(uint32_t workerNum);
void ResetRing();
void RestartRing(int workerIndex);
void CheckAllRunning();
void CheckAllRunningAndTokens(const std::vector<uint32_t> &expectTokens);
void CheckRange(const std::vector<std::vector<Range>> &workerRange);
void CheckRingInEtcd(const std::string &jsonRing, int timeoutMs);
TestHashRing *ConstructAndGetRing(const std::string &addrStr)
{
HostPort addr;
addr.ParseString(addrStr);
etcdStores_.emplace_back(std::make_unique<EtcdStore>(FLAGS_etcd_address));
etcdStores_.back()->Init();
etcdCMs_.emplace_back(std::make_unique<EtcdClusterManager>(addr, addr, etcdStores_.back().get(), nullptr));
rings_.emplace_back(static_cast<TestHashRing *>(etcdCMs_.back()->GetHashRing()));
ClusterInfo clusterInfo;
DS_EXPECT_OK(EtcdClusterManager::ConstructClusterInfoViaEtcd(etcdStores_.back().get(), clusterInfo));
DS_EXPECT_OK(etcdCMs_.back()->Init(clusterInfo));
etcdCMs_.back()->SetWorkerReady();
return static_cast<TestHashRing *>(etcdCMs_.back()->GetHashRing());
}
std::unique_ptr<EtcdStore> db_;
std::vector<std::unique_ptr<EtcdClusterManager>> etcdCMs_;
std::vector<std::unique_ptr<EtcdStore>> etcdStores_;
std::unique_ptr<ThreadPool> threadPool_{ nullptr };
std::vector<std::future<Status>> futures_;
std::vector<TestHashRing *> rings_;
std::vector<std::string> workerIds_;
std::atomic<bool> exit_{ false };
std::vector<uint32_t> threeWorkerFourVirtualNodeRingTokens_{ 357913941, 715827882, 1073741823, 1431655764,
1789569705, 2147483646, 2505397587, 2863311528,
3221225469, 3579139410, 3937053351, 4294967292 };
std::vector<std::vector<Range>> threeWorkerFourVirtualNodeWorkerRange_{ { { 4294967292, 357913941 },
{ 1073741823, 1431655764 },
{ 2147483646, 2505397587 },
{ 3221225469, 3579139410 } },
{ { 357913941, 715827882 },
{ 1431655764, 1789569705 },
{ 2505397587, 2863311528 },
{ 3579139410, 3937053351 } },
{ { 715827882, 1073741823 },
{ 1789569705, 2147483646 },
{ 2863311528, 3221225469 },
{ 3937053351, 4294967292 } } };
};
void HashRingTest::InitTestEtcdInstance()
{
std::string etcdAddress;
for (size_t i = 0; i < cluster_->GetEtcdNum(); ++i) {
std::pair<HostPort, HostPort> addrs;
cluster_->GetEtcdAddrs(i, addrs);
if (!etcdAddress.empty()) {
etcdAddress += ",";
}
etcdAddress += addrs.first.ToString();
}
FLAGS_etcd_address = etcdAddress;
LOG(INFO) << "The etcd address is:" << FLAGS_etcd_address << std::endl;
db_ = std::make_unique<EtcdStore>(etcdAddress);
if ((db_ != nullptr) && (db_->Init().IsOk())) {
db_->DropTable(ETCD_RING_PREFIX);
(void)db_->CreateTable(ETCD_RING_PREFIX, ETCD_RING_PREFIX);
}
}
void HashRingTest::PutRingToEtcd(const std::string &jsonRing)
{
HashRingPb hashRing;
auto rc = google::protobuf::util::JsonStringToMessage(jsonRing, &hashRing);
ASSERT_TRUE(rc.ok()) << rc.ToString();
ASSERT_EQ(db_->Put(ETCD_RING_PREFIX, "", hashRing.SerializeAsString()), Status::OK());
}
// Polls the ring stored in etcd until it equals the ring described by jsonRing, or fails the calling
// test once timeoutMs milliseconds have elapsed.
// @param jsonRing   JSON form of the expected HashRingPb.
// @param timeoutMs  Upper bound, in milliseconds, on how long to keep polling.
void HashRingTest::CheckRingInEtcd(const std::string &jsonRing, int timeoutMs)
{
    HashRingPb hashRing;
    auto rc = google::protobuf::util::JsonStringToMessage(jsonRing, &hashRing);
    ASSERT_TRUE(rc.ok()) << rc.ToString();

    constexpr int pollIntervalMs = 200;
    Timer timer;
    HashRingPb currRing;
    while (timer.ElapsedMilliSecond() < timeoutMs) {
        std::string ringStr;
        (void)db_->Get(ETCD_RING_PREFIX, "", ringStr);
        if (currRing.ParseFromString(ringStr)
            && google::protobuf::util::MessageDifferencer::Equals(hashRing, currRing)) {
            return;
        }
        // Sleep on every miss, including an unparsable value, so the loop never spins against etcd.
        std::this_thread::sleep_for(std::chrono::milliseconds(pollIntervalMs));
    }
    ASSERT_TRUE(false) << "The ring in etcd is not as expected in " << timeoutMs << "ms."
                       << "hashring:" << currRing.DebugString();
}
void HashRingTest::InitRing(uint32_t workerNum)
{
rings_.resize(workerNum);
workerIds_.resize(workerNum);
threadPool_ = std::make_unique<ThreadPool>(workerNum);
for (uint32_t i = 0; i < workerNum; i++) {
HostPort addr;
addr.ParseString("127.0.0.1:" + std::to_string(i));
workerIds_[i] = addr.ToString();
LOG(INFO) << "Ready to init for " << workerIds_[i];
etcdStores_.emplace_back(std::make_unique<EtcdStore>(FLAGS_etcd_address));
etcdStores_.back()->Init();
etcdCMs_.emplace_back(std::make_unique<EtcdClusterManager>(addr, addr, etcdStores_.back().get(), nullptr));
rings_[i] = static_cast<TestHashRing *>(etcdCMs_.back()->GetHashRing());
futures_.emplace_back(threadPool_->Submit([this, i]() {
ClusterInfo clusterInfo;
RETURN_IF_NOT_OK(EtcdClusterManager::ConstructClusterInfoViaEtcd(etcdStores_[i].get(), clusterInfo));
RETURN_IF_NOT_OK_PRINT_ERROR_MSG(etcdCMs_[i]->Init(clusterInfo), "etcd cm init failed.");
etcdCMs_[i]->SetWorkerReady();
while (!this->rings_[i]->IsRunning() && !this->exit_.load()) {
std::this_thread::sleep_for(std::chrono::milliseconds(CHECK_INTERVAL_MS));
}
LOG(INFO) << workerIds_[i] << " finished init and is running ?" << this->rings_[i]->IsRunning();
return Status::OK();
}));
}
}
// Stops the polling tasks, waits for all of them, then drops the futures, the ring pointers and the
// thread pool. The exit flag is lowered again at the end so the fixture can be reused.
void HashRingTest::ResetRing()
{
    exit_.store(true);
    for (auto &pending : futures_) {
        (void)pending.get();
    }
    futures_.clear();
    rings_.clear();
    threadPool_.reset();
    exit_.store(false);
}
void HashRingTest::RestartRing(int workerIndex)
{
CHECK_LT(workerIndex, (int)rings_.size());
CHECK_EQ(rings_.size(), workerIds_.size());
LOG(INFO) << "Start to shutdown worker " << workerIds_[workerIndex];
rings_[workerIndex] = nullptr;
etcdCMs_[workerIndex].reset();
std::this_thread::sleep_for(std::chrono::seconds(1));
LOG(INFO) << "Start to restart worker " << workerIds_[workerIndex];
HostPort addr;
addr.ParseString("127.0.0.1:" + std::to_string(workerIndex));
etcdStores_[workerIndex].reset();
etcdStores_[workerIndex] = std::make_unique<EtcdStore>(FLAGS_etcd_address);
etcdStores_[workerIndex]->Init();
etcdCMs_.emplace(etcdCMs_.begin() + workerIndex,
std::make_unique<EtcdClusterManager>(addr, addr, etcdStores_[workerIndex].get(), nullptr));
const auto &cm = etcdCMs_[workerIndex];
ClusterInfo clusterInfo;
DS_ASSERT_OK(EtcdClusterManager::ConstructClusterInfoViaEtcd(etcdStores_[workerIndex].get(), clusterInfo));
DS_ASSERT_OK(cm->Init(clusterInfo));
cm->SetWorkerReady();
rings_[workerIndex] = static_cast<TestHashRing *>(cm->GetHashRing());
futures_.emplace_back(threadPool_->Submit([this, workerIndex]() {
while (!this->rings_[workerIndex]->IsRunning() && !this->exit_.load()) {
std::this_thread::sleep_for(std::chrono::milliseconds(CHECK_INTERVAL_MS));
}
LOG(INFO) << workerIds_[workerIndex] << " finished init and is running ?"
<< this->rings_[workerIndex]->IsRunning();
return Status::OK();
}));
}
void HashRingTest::CheckAllRunning()
{
for (auto &future : futures_) {
auto status = future.get();
EXPECT_EQ(status, Status::OK());
}
futures_.clear();
}
void HashRingTest::CheckAllRunningAndTokens(const std::vector<uint32_t> &expectTokens)
{
CheckAllRunning();
for (auto ring : rings_) {
ASSERT_TRUE(ring->IsRunning());
std::vector<uint32_t> outTokens = ring->GetHashTokens();
EXPECT_EQ(outTokens, expectTokens);
}
}
void HashRingTest::CheckRange(const std::vector<std::vector<Range>> &workerRange)
{
for (size_t i = 0; i < workerRange.size(); i++) {
for (size_t j = 0; j < workerRange[i].size(); j++) {
int start = workerRange[i][j].start;
int end = workerRange[i][j].end;
std::string &workerId = workerIds_[i];
auto &ring = rings_[i];
std::string addr;
DS_ASSERT_OK(ring->GetPrimaryWorkerAddr(start, addr));
EXPECT_EQ(addr, workerId);
DS_ASSERT_OK(ring->GetPrimaryWorkerAddr(end, addr));
EXPECT_NE(addr, workerId);
if (end > 0) {
DS_ASSERT_OK(ring->GetPrimaryWorkerAddr(end - 1, addr));
EXPECT_EQ(addr, workerId);
}
if (start > end) {
DS_ASSERT_OK(ring->GetPrimaryWorkerAddr(UINT32_MAX, addr));
EXPECT_EQ(addr, workerId);
DS_ASSERT_OK(ring->GetPrimaryWorkerAddr(0, addr));
EXPECT_EQ(addr, workerId);
}
}
}
}
// Starts three in-process workers on a freshly recreated ring table, then checks that every ring is
// running, that each reports the expected twelve tokens, and that the range boundaries in
// threeWorkerFourVirtualNodeWorkerRange_ resolve to the expected workers.
TEST_F(HashRingTest, SetInitWorkerNumBeforeStart)
{
InitTestEtcdInstance();
InitRing(HASH_RING_NUM_THREE);
CheckAllRunningAndTokens(threeWorkerFourVirtualNodeRingTokens_);
CheckRange(threeWorkerFourVirtualNodeWorkerRange_);
}
// Restarting worker 0 must leave both the token list and every ring's local worker uuid unchanged.
TEST_F(HashRingTest, WillNotChangeWhenRestartWorker)
{
    InitTestEtcdInstance();
    InitRing(HASH_RING_NUM_THREE);
    CheckAllRunningAndTokens(threeWorkerFourVirtualNodeRingTokens_);

    // Remember the uuid each ring reports before the restart.
    std::vector<std::string> uuidsBefore;
    for (TestHashRing *ring : rings_) {
        uuidsBefore.emplace_back(ring->GetLocalWorkerUuid());
    }

    RestartRing(0);
    CheckAllRunningAndTokens(threeWorkerFourVirtualNodeRingTokens_);

    int workerIdx = 0;
    while (workerIdx < HASH_RING_NUM_THREE) {
        ASSERT_EQ(rings_[workerIdx]->GetLocalWorkerUuid(), uuidsBefore[workerIdx]);
        workerIdx++;
    }
}
// Brings up two workers, adds a third one (127.0.0.1:4) and expects the first ring and the new ring to
// report the same twelve tokens once the new ring is running.
TEST_F(HashRingTest, AddNodeToWorkingRing)
{
    FLAGS_add_node_wait_time_s = 0;
    datasystem::inject::Set("HashRing.SubmitScaleUpTask.skip", "return(1)");
    InitTestEtcdInstance();
    InitRing(HASH_RING_NUM_TWO);
    for (auto &pending : futures_) {
        const Status initRc = pending.get();
        ASSERT_EQ(initRc, Status::OK());
    }
    futures_.clear();

    const char *joiningAddr = "127.0.0.1:4";
    TestHashRing *joiningRing = ConstructAndGetRing(joiningAddr);
    // Wait on a pool thread until the new ring is running (or the fixture asks the task to stop).
    auto joined = threadPool_->Submit([this, joiningRing, joiningAddr]() {
        while (!joiningRing->IsRunning() && !this->exit_.load()) {
            sleep(1);
        }
        LOG(INFO) << joiningAddr << " finished init and is running ?" << joiningRing->IsRunning();
        return Status::OK();
    });
    const Status joinRc = joined.get();
    ASSERT_EQ(joinRc, Status::OK());

    EXPECT_EQ(rings_[0]->GetHashTokens(), joiningRing->GetHashTokens());
    EXPECT_EQ(rings_[0]->GetHashTokens().size(), 12U);
}
// Brings up two workers, then adds four more at two-second intervals while the add-node wait time is
// five seconds. Afterwards every ring must report the same token list, must be able to resolve a
// master for a key, and the ring must hold four tokens per worker.
TEST_F(HashRingTest, LEVEL2_AddNodesInBatch)
{
    constexpr int waitTime = 5;
    constexpr int addInterval = 2;
    // Tokens owned by each worker, as in the three-worker / twelve-token fixture data.
    constexpr size_t tokensPerWorker = 4;
    FLAGS_add_node_wait_time_s = waitTime;
    datasystem::inject::Set("HashRing.SubmitScaleUpTask.skip", "return(1)");
    InitTestEtcdInstance();
    InitRing(HASH_RING_NUM_TWO);
    for (auto &future : futures_) {
        auto status = future.get();
        ASSERT_EQ(status, Status::OK());
    }
    futures_.clear();

    auto workers = { "127.0.0.1:4", "127.0.0.1:5", "127.0.0.1:6", "127.0.0.1:3" };
    for (auto &newWorker : workers) {
        auto ring = ConstructAndGetRing(newWorker);
        futures_.emplace_back(threadPool_->Submit(([this, ring, newWorker]() {
            while (!ring->IsRunning() && !this->exit_.load()) {
                std::this_thread::sleep_for(std::chrono::milliseconds(CHECK_INTERVAL_MS));
            }
            LOG(INFO) << newWorker << " finished init and is running ?" << ring->IsRunning();
            return Status::OK();
        })));
        sleep(addInterval);
    }
    for (auto &future : futures_) {
        auto status = future.get();
        ASSERT_EQ(status, Status::OK());
    }
    // The futures above have been consumed; do not leave them behind in the fixture.
    futures_.clear();

    sleep(1);
    auto expectTokens = rings_[0]->GetHashTokens();
    HostPort master;
    for (auto &ring : rings_) {
        ASSERT_EQ(ring->GetHashTokens(), expectTokens);
        DS_ASSERT_OK(ring->GetMasterAddr("any_key", master));
    }
    // rings_ holds one entry per worker (two initial workers plus the four added ones).
    EXPECT_EQ(expectTokens.size(), rings_.size() * tokensPerWorker);
}
// Seeds etcd with a ring holding three ACTIVE workers, one JOINING worker and one INITIAL worker, then
// starts rings for an ACTIVE address, an address unknown to the ring, the INITIAL address and the
// JOINING address. Only the ACTIVE one is expected to be running; the others must still be workable.
// All of them must resolve the same master for the probe key.
TEST_F(HashRingTest, InitInDifferentState)
{
    InitTestEtcdInstance();

    HashRingPb seededRing;
    seededRing.set_cluster_id("");
    seededRing.set_cluster_has_init(true);
    InsertWorker(seededRing, "127.0.0.1:19562", MakeWorkerPb({ 357913941, 1431655764, 2505397587, 3579139410 }));
    InsertWorker(seededRing, "127.0.0.1:25428", MakeWorkerPb({ 715827882, 1789569705, 2863311528, 3937053351 }));
    // NOTE(review): 3221225469 is listed twice for this worker - confirm whether the last token was
    // meant to be a different value.
    InsertWorker(seededRing, "127.0.0.1:42753", MakeWorkerPb({ 1073741823, 2147483646, 3221225469, 3221225469 }));
    InsertWorker(seededRing, "127.0.0.1:22746",
                 MakeWorkerPb({ 196852666, 554766608, 912680549, 1270594490 }, WorkerPb::JOINING));
    InsertWorker(seededRing, "127.0.0.1:33333", MakeWorkerPb({}, WorkerPb::INITIAL));
    ASSERT_EQ(db_->Put(ETCD_RING_PREFIX, "", seededRing.SerializeAsString()), Status::OK());

    const char *probeKey = "redirect_test_4";
    const char *expectedMaster = "127.0.0.1:19562";
    HostPort master;

    // Worker that is already ACTIVE in the seeded ring.
    auto activeRing = ConstructAndGetRing("127.0.0.1:42753");
    ASSERT_TRUE(activeRing->IsRunning());
    DS_ASSERT_OK(activeRing->GetMasterAddr(probeKey, master));
    ASSERT_EQ(master.ToString(), expectedMaster);

    // Worker whose address does not appear in the seeded ring.
    auto unknownRing = ConstructAndGetRing("127.0.0.1:44444");
    ASSERT_FALSE(unknownRing->IsRunning());
    ASSERT_TRUE(unknownRing->IsWorkable());
    DS_ASSERT_OK(unknownRing->GetMasterAddr(probeKey, master));
    ASSERT_EQ(master.ToString(), expectedMaster);

    // Worker recorded as INITIAL.
    auto initialRing = ConstructAndGetRing("127.0.0.1:33333");
    ASSERT_FALSE(initialRing->IsRunning());
    ASSERT_TRUE(initialRing->IsWorkable());
    DS_ASSERT_OK(initialRing->GetMasterAddr(probeKey, master));
    ASSERT_EQ(master.ToString(), expectedMaster);

    // Worker recorded as JOINING.
    auto joiningRing = ConstructAndGetRing("127.0.0.1:22746");
    ASSERT_FALSE(joiningRing->IsRunning());
    ASSERT_TRUE(joiningRing->IsWorkable());
    DS_ASSERT_OK(joiningRing->GetMasterAddr(probeKey, master));
    ASSERT_EQ(master.ToString(), expectedMaster);
}
// With two workers running, the probe key resolves to 127.0.0.1:0 and needs no redirect. After a third
// worker (127.0.0.1:4) is added, ring 0 must report that the key needs a redirect to the new worker,
// both five seconds into the join and once the new ring is running.
TEST_F(HashRingTest, LEVEL1_TestNeedRedirect)
{
    constexpr int waitTime = 3;
    FLAGS_add_node_wait_time_s = waitTime;
    datasystem::inject::Set("HashRing.SubmitScaleUpTask.skip", "return(10)");
    datasystem::inject::Set("MurmurHash3", "100*return()");
    InitTestEtcdInstance();
    InitRing(HASH_RING_NUM_TWO);
    for (auto &future : futures_) {
        auto status = future.get();
        ASSERT_EQ(status, Status::OK());
    }
    futures_.clear();

    std::string key = "a_key_hash_to_1073741824";
    HostPort masterAddr;
    DS_ASSERT_OK(rings_[0]->GetMasterAddr(key, masterAddr));
    EXPECT_EQ(masterAddr.ToString(), "127.0.0.1:0");
    EXPECT_FALSE(rings_[0]->NeedRedirect(key, masterAddr));

    auto newWorker = "127.0.0.1:4";
    auto ring = ConstructAndGetRing(newWorker);
    auto future = threadPool_->Submit([this, ring, newWorker]() {
        while (!ring->IsRunning() && !this->exit_.load()) {
            sleep(1);
        }
        LOG(INFO) << newWorker << " finished init and is running ?" << ring->IsRunning();
        return Status::OK();
    });

    using namespace std::chrono_literals;
    // Only used as a bounded pause; the outcome of the wait itself is deliberately not inspected.
    (void)future.wait_for(5s);
    EXPECT_TRUE(rings_[0]->NeedRedirect(key, masterAddr));
    EXPECT_EQ(masterAddr.ToString(), newWorker);

    auto status = future.get();
    ASSERT_EQ(status, Status::OK());
    EXPECT_TRUE(rings_[0]->NeedRedirect(key, masterAddr));
    EXPECT_EQ(masterAddr.ToString(), newWorker);
}
// After three workers are running, every ring must resolve the key "userDataKey" to 127.0.0.1:0.
// The "HashRing.UpdateRing.sleep" inject point is set to sleep(2000) while the rings start up.
TEST_F(HashRingTest, GetPrimaryWorker)
{
    InitTestEtcdInstance();
    InitRing(HASH_RING_NUM_THREE);
    inject::Set("HashRing.UpdateRing.sleep", "sleep(2000)");
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::string userDataKey{ "userDataKey" };
    // Same wait-and-expect-OK loop as before, but the consumed futures are also cleared.
    CheckAllRunning();
    for (auto &r : rings_) {
        std::string outWorkerAddr;
        // Report a failed lookup as such instead of only tripping over the empty address below.
        DS_EXPECT_OK(r->GetPrimaryWorkerAddr(userDataKey, outWorkerAddr));
        EXPECT_EQ(outWorkerAddr, "127.0.0.1:0");
    }
}
// Ten in-process workers must all finish their start-up task with Status::OK().
TEST_F(HashRingTest, LEVEL1_10Worker)
{
    constexpr uint32_t tenWorkers = 10;
    InitTestEtcdInstance();
    InitRing(tenWorkers);
    CheckAllRunning();
}
// Seeds etcd with an initialised three-worker ring, starts the three workers and expects the
// modRevision of the ring entry to be the same before and after they come up.
TEST_F(HashRingTest, RestartDonotModify)
{
    InitTestEtcdInstance();

    HashRingPb seededRing;
    seededRing.set_cluster_has_init(true);
    InsertWorker(seededRing, "127.0.0.1:0", MakeWorkerPb({ 357913941, 1431655764, 2505397587, 3579139410 }));
    InsertWorker(seededRing, "127.0.0.1:1", MakeWorkerPb({ 715827882, 1789569705, 2863311528, 3937053351 }));
    InsertWorker(seededRing, "127.0.0.1:2", MakeWorkerPb({ 1073741823, 2147483646, 3221225469, 4294967292 }));
    DS_ASSERT_OK(db_->Put(ETCD_RING_PREFIX, "", seededRing.SerializeAsString()));

    RangeSearchResult beforeStart;
    DS_ASSERT_OK(db_->Get(ETCD_RING_PREFIX, "", beforeStart));

    constexpr uint32_t seededWorkers = 3;
    InitRing(seededWorkers);
    CheckAllRunning();

    RangeSearchResult afterStart;
    DS_ASSERT_OK(db_->Get(ETCD_RING_PREFIX, "", afterStart));
    ASSERT_EQ(beforeStart.modRevision, afterStart.modRevision);
}
// Starts two workers while the ring in etcd still carries a delNodeInfo entry for 127.0.0.1:0 and
// expects ring 0 not to be running after one second. The ring is then replaced by one without
// delNodeInfo (and without 127.0.0.1:0); all start-up tasks must finish and ring 0 must report a
// local worker uuid that passes Validator::ValidateUuid.
TEST_F(HashRingTest, StartRingDuringScalingDown)
{
    InitTestEtcdInstance();
    FLAGS_add_node_wait_time_s = 0;

    const std::string scalingDownRing = R"({
"clusterHasInit": true,
"workers": {
"127.0.0.1:0": {"hashTokens": [90000, 270000], "workerUuid": "dXVpZDA=", "state": "ACTIVE"},
"127.0.0.1:1": {"hashTokens": [60000, 120000], "workerUuid": "dXVpZDE=", "state": "ACTIVE"}
},
"delNodeInfo": {
"127.0.0.1:0": {"changedRanges": [{"workerId": "not-exist-worker", "from": 120000, "end": 180000}]}
}
})";
    PutRingToEtcd(scalingDownRing);
    InitRing(HASH_RING_NUM_TWO);
    sleep(1);
    ASSERT_FALSE(rings_[0]->IsRunning());

    const std::string scaledDownRing = R"({
"clusterHasInit": true,
"workers": {
"127.0.0.1:1": {"hashTokens": [90000, 270000], "workerUuid": "dXVpZDA=", "state": "ACTIVE"}
}
})";
    PutRingToEtcd(scaledDownRing);
    CheckAllRunning();
    ASSERT_TRUE(Validator::ValidateUuid("WorkerUuid", rings_[0]->GetLocalWorkerUuid()));
}
// Starts one worker on a five-worker ring, then writes two rings to etcd 200 ms apart: the first adds
// a delNodeInfo entry for 127.0.0.1:4, the second adds one for 127.0.0.1:3 as well. Within five
// seconds ring 0 must report neither worker in GetWorkersInDelNodeInfo().
TEST_F(HashRingTest, StartScaleTasks)
{
    FLAGS_v = 2;
    datasystem::inject::Set("ClearDataDelay", "1*sleep(3000)");
    datasystem::inject::Set("notExcuteClearData", "call()");
    datasystem::inject::Set("notExcuteClearData.skip", "return()");
    InitTestEtcdInstance();
    FLAGS_add_node_wait_time_s = 0;
    std::string hashRingJsonStr = R"({
"clusterHasInit": true,
"workers": {
"127.0.0.1:0": {"hashTokens": [90000, 270000], "workerUuid": "dXVpZDA=", "state": "ACTIVE"},
"127.0.0.1:1": {"hashTokens": [60000, 120000], "workerUuid": "dXVpZDE=", "state": "ACTIVE"},
"127.0.0.1:2": {"hashTokens": [0, 180000], "workerUuid": "cXdl", "state": "ACTIVE"},
"127.0.0.1:3": {"hashTokens": [230000, 350000], "workerUuid": "Cnd3cWU=", "state": "ACTIVE"},
"127.0.0.1:4": {"hashTokens": [320000, 380000], "workerUuid": "cXdlcm0=", "state": "ACTIVE"}
},
})";
    PutRingToEtcd(hashRingJsonStr);
    std::string hashRingJsonStr2 = R"({
"clusterHasInit": true,
"workers": {
"127.0.0.1:0": {"hashTokens": [90000, 270000], "workerUuid": "dXVpZDA=", "state": "ACTIVE"},
"127.0.0.1:1": {"hashTokens": [60000, 120000], "workerUuid": "dXVpZDE=", "state": "ACTIVE"},
"127.0.0.1:2": {"hashTokens": [0, 180000], "workerUuid": "cXdl", "state": "ACTIVE"},
"127.0.0.1:3": {"hashTokens": [230000, 350000], "workerUuid": "Cnd3cWU=", "state": "ACTIVE"},
"127.0.0.1:4": {"hashTokens": [320000, 380000], "workerUuid": "cXdlcm0=", "state": "ACTIVE"}
},
"delNodeInfo": {
"127.0.0.1:4": {"changedRanges": [{"workerId": "127.0.0.1:0", "from": 20000, "end": 30000}]}
}
})";
    std::string hashRingJsonStr3 = R"({
"clusterHasInit": true,
"workers": {
"127.0.0.1:0": {"hashTokens": [90000, 270000], "workerUuid": "dXVpZDA=", "state": "ACTIVE"},
"127.0.0.1:1": {"hashTokens": [60000, 120000], "workerUuid": "dXVpZDE=", "state": "ACTIVE"},
"127.0.0.1:2": {"hashTokens": [0, 180000], "workerUuid": "cXdl", "state": "ACTIVE"},
"127.0.0.1:3": {"hashTokens": [230000, 350000], "workerUuid": "Cnd3cWU=", "state": "ACTIVE"},
"127.0.0.1:4": {"hashTokens": [320000, 380000], "workerUuid": "cXdlcm0=", "state": "ACTIVE"}
},
"delNodeInfo": {
"127.0.0.1:4": {"changedRanges": [{"workerId": "127.0.0.1:0", "from": 20000, "end": 30000}]},
"127.0.0.1:3": {"changedRanges": [{"workerId": "127.0.0.1:0", "from": 20000, "end": 50000}]}
}
})";
    std::vector<std::string> hashRings = { hashRingJsonStr2, hashRingJsonStr3 };
    InitRing(1);
    CheckAllRunning();
    for (const auto &ring : hashRings) {
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        PutRingToEtcd(ring);
    }

    constexpr int waitLimitS = 5;
    Timer timer;
    bool success = false;
    while (timer.ElapsedSecond() < waitLimitS) {
        auto workers = rings_[0]->GetWorkersInDelNodeInfo();
        if (workers.find("127.0.0.1:4") == workers.end() && workers.find("127.0.0.1:3") == workers.end()) {
            success = true;
            break;
        }
        // Pause between polls instead of spinning on the ring for up to five seconds.
        std::this_thread::sleep_for(std::chrono::milliseconds(CHECK_INTERVAL_MS));
    }
    ASSERT_TRUE(success);
}
// Starts one worker on a five-worker ring, then writes four rings to etcd 200 ms apart. All four carry
// the same delNodeInfo entry for 127.0.0.1:4; the second and third additionally carry an entry for
// 127.0.0.1:6 and 127.0.0.1:7 respectively. Within five seconds ring 0 must no longer report
// 127.0.0.1:4 in GetWorkersInDelNodeInfo().
TEST_F(HashRingTest, StartScaleTaskNotExcuteMultipleTimes)
{
    FLAGS_v = 2;
    datasystem::inject::Set("ClearDataDelay", "1*sleep(3000)");
    datasystem::inject::Set("notExcuteClearData", "call()");
    datasystem::inject::Set("notExcuteClearData.skip", "1*return()");
    InitTestEtcdInstance();
    FLAGS_add_node_wait_time_s = 0;
    std::string hashRingJsonStr = R"({
"clusterHasInit": true,
"workers": {
"127.0.0.1:0": {"hashTokens": [90000, 270000], "workerUuid": "dXVpZDA=", "state": "ACTIVE"},
"127.0.0.1:1": {"hashTokens": [60000, 120000], "workerUuid": "dXVpZDE=", "state": "ACTIVE"},
"127.0.0.1:2": {"hashTokens": [0, 180000], "workerUuid": "cXdl", "state": "ACTIVE"},
"127.0.0.1:3": {"hashTokens": [230000, 350000], "workerUuid": "Cnd3cWU=", "state": "ACTIVE"},
"127.0.0.1:4": {"hashTokens": [320000, 380000], "workerUuid": "cXdlcm0=", "state": "ACTIVE"}
},
})";
    PutRingToEtcd(hashRingJsonStr);
    std::string hashRingJsonStr1 = R"({
"clusterHasInit": true,
"workers": {
"127.0.0.1:0": {"hashTokens": [90000, 270000], "workerUuid": "dXVpZDA=", "state": "ACTIVE"},
"127.0.0.1:1": {"hashTokens": [60000, 120000], "workerUuid": "dXVpZDE=", "state": "ACTIVE"},
"127.0.0.1:2": {"hashTokens": [0, 180000], "workerUuid": "cXdl", "state": "ACTIVE"},
"127.0.0.1:3": {"hashTokens": [230000, 350000], "workerUuid": "Cnd3cWU=", "state": "ACTIVE"},
"127.0.0.1:4": {"hashTokens": [320000, 380000], "workerUuid": "cXdlcm0=", "state": "ACTIVE"}
},
"delNodeInfo": {
"127.0.0.1:4": {"changedRanges": [{"workerId": "127.0.0.1:0", "from": 20000, "end": 30000}]}
}
})";
    std::string hashRingJsonStr2 = R"({
"clusterHasInit": true,
"workers": {
"127.0.0.1:0": {"hashTokens": [90000, 270000], "workerUuid": "dXVpZDA=", "state": "ACTIVE"},
"127.0.0.1:1": {"hashTokens": [60000, 120000], "workerUuid": "dXVpZDE=", "state": "ACTIVE"},
"127.0.0.1:2": {"hashTokens": [0, 180000], "workerUuid": "cXdl", "state": "ACTIVE"},
"127.0.0.1:3": {"hashTokens": [230000, 350000], "workerUuid": "Cnd3cWU=", "state": "ACTIVE"},
"127.0.0.1:4": {"hashTokens": [320000, 380000], "workerUuid": "cXdlcm0=", "state": "ACTIVE"}
},
"delNodeInfo": {
"127.0.0.1:4": {"changedRanges": [{"workerId": "127.0.0.1:0", "from": 20000, "end": 30000}]},
"127.0.0.1:6": {"changedRanges": [{"workerId": "127.0.0.1:1", "from": 20000, "end": 30000}]}
}
})";
    std::string hashRingJsonStr3 = R"({
"clusterHasInit": true,
"workers": {
"127.0.0.1:0": {"hashTokens": [90000, 270000], "workerUuid": "dXVpZDA=", "state": "ACTIVE"},
"127.0.0.1:1": {"hashTokens": [60000, 120000], "workerUuid": "dXVpZDE=", "state": "ACTIVE"},
"127.0.0.1:2": {"hashTokens": [0, 180000], "workerUuid": "cXdl", "state": "ACTIVE"},
"127.0.0.1:3": {"hashTokens": [230000, 350000], "workerUuid": "Cnd3cWU=", "state": "ACTIVE"},
"127.0.0.1:4": {"hashTokens": [320000, 380000], "workerUuid": "cXdlcm0=", "state": "ACTIVE"}
},
"delNodeInfo": {
"127.0.0.1:4": {"changedRanges": [{"workerId": "127.0.0.1:0", "from": 20000, "end": 30000}]},
"127.0.0.1:7": {"changedRanges": [{"workerId": "127.0.0.1:1", "from": 20000, "end": 30000}]}
}
})";
    std::string hashRingJsonStr4 = R"({
"clusterHasInit": true,
"workers": {
"127.0.0.1:0": {"hashTokens": [90000, 270000], "workerUuid": "dXVpZDA=", "state": "ACTIVE"},
"127.0.0.1:1": {"hashTokens": [60000, 120000], "workerUuid": "dXVpZDE=", "state": "ACTIVE"},
"127.0.0.1:2": {"hashTokens": [0, 180000], "workerUuid": "cXdl", "state": "ACTIVE"},
"127.0.0.1:3": {"hashTokens": [230000, 350000], "workerUuid": "Cnd3cWU=", "state": "ACTIVE"},
"127.0.0.1:4": {"hashTokens": [320000, 380000], "workerUuid": "cXdlcm0=", "state": "ACTIVE"}
},
"delNodeInfo": {
"127.0.0.1:4": {"changedRanges": [{"workerId": "127.0.0.1:0", "from": 20000, "end": 30000}]}
}
})";
    std::vector<std::string> hashRings = { hashRingJsonStr1, hashRingJsonStr2, hashRingJsonStr3, hashRingJsonStr4 };
    InitRing(1);
    CheckAllRunning();
    for (const auto &ring : hashRings) {
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        PutRingToEtcd(ring);
    }

    constexpr int waitLimitS = 5;
    Timer timer;
    bool success = false;
    while (timer.ElapsedSecond() < waitLimitS) {
        auto workers = rings_[0]->GetWorkersInDelNodeInfo();
        if (workers.find("127.0.0.1:4") == workers.end()) {
            success = true;
            break;
        }
        // Pause between polls instead of spinning on the ring for up to five seconds.
        std::this_thread::sleep_for(std::chrono::milliseconds(CHECK_INTERVAL_MS));
    }
    ASSERT_TRUE(success);
}
// Seeds a ring with one ACTIVE worker and one INITIAL worker that is never started, enables automatic
// removal of dead nodes with short timeouts, starts the ACTIVE worker and expects the ring in etcd to
// contain only that worker within five seconds.
TEST_F(HashRingTest, RemoveInitNode)
{
    datasystem::inject::Set("EtcdClusterManager.CheckWaitNodeTableComplete.waitTime", "call(2)");
    InitTestEtcdInstance();
    FLAGS_add_node_wait_time_s = 0;
    FLAGS_node_timeout_s = 2;
    FLAGS_node_dead_timeout_s = 3;
    FLAGS_auto_del_dead_node = true;

    const std::string seededRing = R"({
"clusterHasInit": true,
"workers": {
"127.0.0.1:0": {"hashTokens": [90000, 270000], "workerUuid": "dXVpZDA=", "state": "ACTIVE"},
"127.0.0.1:1": {"workerUuid": "dXVpZDE=", "state": "INITIAL"}
},
})";
    PutRingToEtcd(seededRing);
    InitRing(1);
    CheckAllRunning();
    etcdCMs_[0]->CheckWaitNodeTableComplete();

    const std::string ringWithoutInitNode = R"({
"clusterHasInit": true,
"workers": {
"127.0.0.1:0": {"hashTokens": [90000, 270000], "workerUuid": "dXVpZDA=", "state": "ACTIVE"}
}
})";
    CheckRingInEtcd(ringWithoutInitNode, 5'000);
}
// Starts worker 0 on a ring where 127.0.0.1:1 is JOINING, sets the "EtcdWatch.StoreEvents.IgnoreEvent"
// inject point to sleep(2000), then writes two ring updates back to back. Within eight seconds the
// ring in etcd must equal the expected ring below (worker 0 LEAVING with the single token 90000).
TEST_F(HashRingTest, EtcdWatchGet)
{
    TimerQueue::GetInstance()->Initialize();
    datasystem::inject::Set("EtcdClusterManager.CheckWaitNodeTableComplete.waitTime", "call(2)");
    datasystem::inject::Set("EtcdWatch.RetrieveEventPassively.RetrieveEventQuickly", "call(100)");
    InitTestEtcdInstance();
    FLAGS_add_node_wait_time_s = 0;

    // Ring at start-up: worker 1 is JOINING and has pending addNodeInfo.
    const std::string joiningRing = R"({
"clusterHasInit": true,
"workers": {
"127.0.0.1:0": {"hashTokens": [90000, 270000], "workerUuid": "dXVpZDA=", "state": "ACTIVE"},
"127.0.0.1:1": {"hashTokens": [120000, 130000], "workerUuid": "dXVpZDE=", "state": "JOINING"},
},
"addNodeInfo": {
"127.0.0.1:1": {"changedRanges": [{"workerId": "127.0.0.1:0", "from": 20000, "end": 25000},
{"workerId": "127.0.0.1:0", "from": 25000, "end": 30000}]}
}
})";
    // First update: both workers ACTIVE, worker 0 flagged needScaleDown.
    const std::string scaleDownRequestedRing = R"({
"clusterHasInit": true,
"workers": {
"127.0.0.1:0": {"hashTokens": [90000, 270000], "workerUuid": "dXVpZDA=", "state": "ACTIVE",
"needScaleDown": true},
"127.0.0.1:1": {"hashTokens": [120000, 130000], "workerUuid": "dXVpZDE=", "state": "ACTIVE"}
}
})";
    // Second update: worker 0 LEAVING, with addNodeInfo handing its range to worker 1.
    const std::string leavingRing = R"({
"clusterHasInit": true,
"workers": {
"127.0.0.1:0": {"hashTokens": [90000, 270000], "workerUuid": "dXVpZDA=", "state": "LEAVING",
"needScaleDown": true},
"127.0.0.1:1": {"hashTokens": [120000, 130000], "workerUuid": "dXVpZDE=", "state": "ACTIVE"}
},
"addNodeInfo": {
"127.0.0.1:1": {"changedRanges": [{"workerId": "127.0.0.1:0", "from": 90000, "end": 270000}]}
}
})";

    PutRingToEtcd(joiningRing);
    InitRing(1);
    CheckAllRunning();
    etcdCMs_[0]->CheckWaitNodeTableComplete();

    datasystem::inject::Set("EtcdWatch.StoreEvents.IgnoreEvent", "sleep(2000)");
    PutRingToEtcd(scaleDownRequestedRing);
    PutRingToEtcd(leavingRing);

    const std::string finalRing = R"({
"clusterHasInit": true,
"workers": {
"127.0.0.1:0": {"hashTokens": [90000], "workerUuid": "dXVpZDA=", "state": "LEAVING", "needScaleDown": true},
"127.0.0.1:1": {"hashTokens": [120000, 130000], "workerUuid": "dXVpZDE=", "state": "ACTIVE"}
}
})";
    CheckRingInEtcd(finalRing, 8'000);
}
// Starts two workers on a ring where worker 0 is flagged needScaleDown, writes a ring that adds
// addNodeInfo for worker 1, and sets "SendRpc.Failed.isKeepAliveTimeoutHandler" to
// return(K_RETRY_IF_LEAVING) for five seconds. After the inject point is cleared, the ring in etcd
// must, within ten seconds, contain only ACTIVE workers and no pending addNodeInfo / delNodeInfo.
TEST_F(HashRingTest, CasRetry)
{
    InitTestEtcdInstance();
    datasystem::inject::Set("EtcdClusterManager.CheckWaitNodeTableComplete.waitTime", "call(2)");
    datasystem::inject::Set("hashring.finishaddnodeinfo", "sleep(3000)");
    datasystem::inject::Set("waitUntilHealth", "return()");
    std::string hashRingJsonStr = R"({
"clusterHasInit": true,
"workers": {
"127.0.0.1:0": {"hashTokens": [90000, 270000], "workerUuid": "dXVpZDA", "state": "ACTIVE",
"needScaleDown": true},
"127.0.0.1:1": {"hashTokens": [120000, 130000], "workerUuid": "dXVpZDE=", "state": "ACTIVE"}
}
})";
    std::string hashRingJsonStr1 = R"({
"clusterHasInit": true,
"workers": {
"127.0.0.1:0": {"hashTokens": [90000, 270000], "workerUuid": "dXVpZDA", "state": "ACTIVE",
"needScaleDown": true},
"127.0.0.1:1": {"hashTokens": [120000, 130000], "workerUuid": "dXVpZDE=", "state": "ACTIVE"}
},
"addNodeInfo": {
"127.0.0.1:1": {"changedRanges": [{"workerId": "127.0.0.1:0", "from": 90000, "end": 270000},
{"workerId": "127.0.0.1:0", "from": 0, "end": 90000}]}
}
})";
    PutRingToEtcd(hashRingJsonStr);
    InitRing(2);
    CheckAllRunning();
    etcdCMs_[0]->CheckWaitNodeTableComplete();
    PutRingToEtcd(hashRingJsonStr1);
    sleep(1);
    datasystem::inject::Set("SendRpc.Failed.isKeepAliveTimeoutHandler", "return(K_RETRY_IF_LEAVING)");
    datasystem::inject::Set("notExit", "return()");
    sleep(5);
    datasystem::inject::Clear("SendRpc.Failed.isKeepAliveTimeoutHandler");

    // True when no worker in the ring is in a state other than ACTIVE.
    const auto allWorkersActive = [](const HashRingPb &pb) {
        for (const auto &worker : pb.workers()) {
            if (worker.second.state() != WorkerPb::ACTIVE) {
                return false;
            }
        }
        return true;
    };

    constexpr int waitLimitMs = 10000;
    constexpr int pollIntervalMs = 200;
    Timer timer;
    HashRingPb ring;
    bool success = false;
    // Stop at the first poll that satisfies every condition instead of always waiting out the limit.
    while (!success && timer.ElapsedMilliSecond() < waitLimitMs) {
        std::string ringStr;
        std::this_thread::sleep_for(std::chrono::milliseconds(pollIntervalMs));
        (void)db_->Get(ETCD_RING_PREFIX, "", ringStr);
        if (!ring.ParseFromString(ringStr)) {
            continue;
        }
        // The worker-state condition has to gate the result; a `continue` inside the loop over the
        // workers would only skip to the next worker and never affect the outcome.
        success = allWorkersActive(ring) && ring.add_node_info().empty() && ring.del_node_info().empty();
    }
    ASSERT_TRUE(success) << ring.DebugString();
}
// Stores a one-worker ring in etcd, saves it with cli::SaveHashRingToFile and expects the worker
// address to appear in the written file.
TEST_F(HashRingTest, HashRingToJsonFile)
{
    InitTestEtcdInstance();

    WorkerPb storedWorker;
    storedWorker.set_worker_uuid("5d941f6a-dd77-43ee-b3cf-40a1cf77489e");
    storedWorker.set_state(WorkerPb::ACTIVE);
    storedWorker.set_need_scale_down(true);
    const std::vector<uint32_t> workerTokens = { 703003027, 1320750813, 2986027838, 1537794891 };
    auto *hashTokens = storedWorker.mutable_hash_tokens();
    for (const uint32_t token : workerTokens) {
        hashTokens->Add(token);
    }

    HashRingPb storedRing;
    storedRing.mutable_workers()->insert({ "127.0.0.1:9999", storedWorker });
    storedRing.set_cluster_has_init(true);
    DS_ASSERT_OK(db_->Put(ETCD_RING_PREFIX, "", storedRing.SerializeAsString()));

    const auto jsonPath = GetTestCaseDataDir() + "/ring.json";
    DS_ASSERT_OK(cli::SaveHashRingToFile(jsonPath));
    std::string fileContent;
    DS_ASSERT_OK(ReadFileToString(jsonPath, fileContent));
    ASSERT_TRUE(fileContent.find("127.0.0.1:9999") != std::string::npos);
}
// Writes a three-worker ring with one addNodeInfo entry to a JSON file, loads it with
// cli::UpdateHashRingFromFile and expects the ring read back from etcd to contain three workers and
// one addNodeInfo entry.
TEST_F(HashRingTest, LoadHashRingFromJsonFile)
{
    InitTestEtcdInstance();
    const auto jsonPath = GetTestCaseDataDir() + "/ring.json";
    const std::string ringJson = R"(
{
"clusterId": "127.0.0.1:32044",
"workers": {
"127.0.0.1:47805": {
"hashTokens": [715827882, 1789569705, 2863311528, 3937053351],
"workerUuid": "OTJlYjE0NzctZDA3Yi00Y2NjLTk5MDEtZTVmYjg1ZjEzZjlh",
"state": "ACTIVE"
},
"127.0.0.1:53257": {
"hashTokens": [1073741823, 2147483646, 3221225469, 4294967292],
"workerUuid": "YWRjNzQyNGEtNGM1MC00ZmVhLTk5YTYtYzFkNDlkZDAxMmJi",
"state": "ACTIVE"
},
"127.0.0.1:14964": {
"hashTokens": [357913941, 1431655764, 2505397587, 3579139410],
"workerUuid": "ZTBmYWRlMDItMDIyOS00YzU3LWIwMmMtYzdhMjRhN2MwZDVm",
"state": "LEAVING",
"needScaleDown": true
}
},
"clusterHasInit": true,
"addNodeInfo": {
"127.0.0.1:53257": {
"changedRanges": [
{
"workerId": "127.0.0.1:14964",
"from": 715827882,
"end": 1073741823
},
{
"workerId": "127.0.0.1:14964",
"from": 1789569705,
"end": 2147483646
},
{
"workerId": "127.0.0.1:14964",
"from": 2863311528,
"end": 3221225469
},
{
"workerId": "127.0.0.1:14964",
"from": 3937053351,
"end": 4294967292
}
]
}
}
}
)";
    // Write the JSON to disk first; the loader takes a file path.
    std::ofstream jsonFile(jsonPath);
    ASSERT_TRUE(jsonFile.is_open());
    jsonFile << ringJson;
    jsonFile.close();
    DS_ASSERT_OK(cli::UpdateHashRingFromFile(jsonPath));

    HashRingPb loadedRing;
    std::string serializedRing;
    DS_ASSERT_OK(db_->Get(ETCD_RING_PREFIX, "", serializedRing));
    ASSERT_TRUE(loadedRing.ParseFromString(serializedRing));
    const size_t expectedWorkers = 3;
    ASSERT_EQ(loadedRing.workers_size(), expectedWorkers);
    ASSERT_EQ(loadedRing.add_node_info_size(), 1);
}
}
}