* Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "datasystem/worker/hash_ring/hash_ring_tools.h"
#include <google/protobuf/util/json_util.h>
#include "datasystem/common/util/container_util.h"
namespace datasystem {
namespace worker {
void GenerateHashRingUuidMap(const HashRingPb &ringInfo, std::map<std::string, HostPort> &workerUuid2AddrMap,
std::map<std::string, std::string> &workerAddr2UuidMap,
std::map<std::string, HostPort> &relatedWorkerMap)
{
workerUuid2AddrMap.clear();
workerAddr2UuidMap.clear();
Status rc;
for (const auto &kv : ringInfo.workers()) {
const auto &workerAddr = kv.first;
HostPort workerHostPort;
rc = workerHostPort.ParseString(workerAddr);
if (rc.IsError()) {
LOG(ERROR) << "Failed to parse address " << workerAddr;
continue;
}
if (kv.second.worker_uuid().empty()) {
continue;
}
workerUuid2AddrMap[kv.second.worker_uuid()] = workerHostPort;
relatedWorkerMap[kv.second.worker_uuid()] = workerHostPort;
workerAddr2UuidMap[workerAddr] = kv.second.worker_uuid();
}
}
/**
 * @brief Look up the address registered for a worker uuid.
 * @param[in] workerUuid2AddrMap The worker uuid -> address table to search.
 * @param[in] workerUuid The uuid to look for.
 * @param[out] workerAddr Set to the matching address on success; left unmodified when the uuid is absent.
 * @return Status::OK() if the uuid is present, otherwise a K_NOT_FOUND status naming the uuid.
 */
Status GetWorkerAddrByUuidForAddressing(const std::map<std::string, HostPort> &workerUuid2AddrMap,
                                        const std::string &workerUuid, HostPort &workerAddr)
{
    const auto found = workerUuid2AddrMap.find(workerUuid);
    if (found == workerUuid2AddrMap.end()) {
        return Status(K_NOT_FOUND, FormatString("Can not find the address of workerUuid %s", workerUuid));
    }
    workerAddr = found->second;
    return Status::OK();
}
/**
 * @brief Look up the address registered for a worker uuid.
 * @param[in] workerUuid2AddrMap The worker uuid -> address table to search.
 * @param[in] workerUuid The uuid to look for.
 * @param[out] workerAddr Set to the matching address on success; left unmodified when the uuid is absent.
 * @return Status::OK() if the uuid is present, otherwise a K_NOT_FOUND status naming the uuid.
 */
Status GetWorkerAddrByUuidForMetadata(const std::map<std::string, HostPort> &workerUuid2AddrMap,
                                      const std::string &workerUuid, HostPort &workerAddr)
{
    const auto found = workerUuid2AddrMap.find(workerUuid);
    if (found == workerUuid2AddrMap.end()) {
        return Status(K_NOT_FOUND, FormatString("Can not find the address of workerUuid %s", workerUuid));
    }
    workerAddr = found->second;
    return Status::OK();
}
bool IncrementalAddNodeInfo(const HashRingPb &oldRing, const HashRingPb &newRing)
{
if (newRing.add_node_info().empty()) {
return false;
}
if (oldRing.add_node_info().empty()) {
return true;
}
int oldAddRangesSize{ 0 };
for (auto &r : oldRing.add_node_info()) {
oldAddRangesSize += r.second.changed_ranges_size();
};
int newAddRangesSize{ 0 };
for (auto &r : newRing.add_node_info()) {
newAddRangesSize += r.second.changed_ranges_size();
};
return (newAddRangesSize > oldAddRangesSize);
}
/**
 * @brief Tell whether newRing lists at least one del_node_info key that oldRing does not.
 * @param[in] oldRing The previously known ring.
 * @param[in] newRing The freshly received ring.
 * @return false when newRing has no del_node_info; true when newRing has some and oldRing has none;
 * otherwise true only if some key of newRing.del_node_info() is missing from oldRing.del_node_info().
 */
bool IncrementalDelNodeInfo(const HashRingPb &oldRing, const HashRingPb &newRing)
{
    const auto &newDelInfo = newRing.del_node_info();
    const auto &oldDelInfo = oldRing.del_node_info();
    if (newDelInfo.empty()) {
        return false;
    }
    if (oldDelInfo.empty()) {
        return true;
    }

    // NOTE(review): std::set_difference needs both key ranges sorted; this relies on
    // GetKeysFromPairsContainer returning an ordered collection - confirm against its definition.
    const auto oldKeys = GetKeysFromPairsContainer(oldDelInfo);
    const auto newKeys = GetKeysFromPairsContainer(newDelInfo);

    std::set<std::string> onlyInNew;
    std::set_difference(newKeys.begin(), newKeys.end(), oldKeys.begin(), oldKeys.end(),
                        std::inserter(onlyInNew, onlyInNew.begin()));
    return !onlyInNew.empty();
}
/**
 * @brief Collect the del_node_info keys that exist in oldRing but are gone from newRing.
 * @param[in] oldRing The previously known ring.
 * @param[in] newRing The freshly received ring.
 * @param[out] diff Receives the keys present only in oldRing.del_node_info(). It is not cleared here,
 * so anything the caller already stored in it is kept and also counts towards the return value.
 * @return false (leaving diff untouched) when oldRing has no del_node_info; otherwise whether diff is
 * non-empty after the missing keys have been inserted.
 */
bool DecrementalDelNodeInfo(const HashRingPb &oldRing, const HashRingPb &newRing, std::set<std::string> &diff)
{
    const auto &oldDelInfo = oldRing.del_node_info();
    if (oldDelInfo.empty()) {
        return false;
    }

    // NOTE(review): std::set_difference needs both key ranges sorted; this relies on
    // GetKeysFromPairsContainer returning an ordered collection - confirm against its definition.
    const auto oldKeys = GetKeysFromPairsContainer(oldDelInfo);
    const auto newKeys = GetKeysFromPairsContainer(newRing.del_node_info());

    std::set_difference(oldKeys.begin(), oldKeys.end(), newKeys.begin(), newKeys.end(),
                        std::inserter(diff, diff.begin()));
    return !diff.empty();
}
/**
 * @brief Resolve the worker uuid recorded in the ring for a worker address.
 * @param[in] currRing The ring whose workers() map is searched.
 * @param[in] addr The worker address used as the lookup key.
 * @param[out] workerId Set to the worker's uuid on success.
 * @return Status::OK() on success. When the address is absent or its uuid is empty, the
 * CHECK_FAIL_RETURN_STATUS macro (defined elsewhere) is given K_NOT_FOUND and a message that includes
 * the ring's ShortDebugString(); it is expected to return that status from this function.
 */
Status GetWorkeridByWorkerAddr(const HashRingPb &currRing, const std::string &addr, std::string &workerId)
{
    const auto &ringWorkers = currRing.workers();
    const auto found = ringWorkers.find(addr);
    // The message stays inside the macro arguments so the ring dump is only produced on failure.
    CHECK_FAIL_RETURN_STATUS(
        found != ringWorkers.end() && !found->second.worker_uuid().empty(), K_NOT_FOUND,
        FormatString("Cannot find the workerid of %s in this ring %s.", addr, currRing.ShortDebugString()));
    workerId = found->second.worker_uuid();
    return Status::OK();
}
/**
 * @brief Serialize a hash ring protobuf message to a JSON string.
 * @param[in] ring The ring to serialize.
 * @return The string filled in by MessageToJsonString. If the conversion reports failure a warning is
 * logged and the output string is still returned as-is.
 */
std::string HashRingToJsonString(const HashRingPb &ring)
{
    std::string json;
    const auto convertStatus = google::protobuf::util::MessageToJsonString(ring, &json);
    if (convertStatus.ok()) {
        return json;
    }
    LOG(WARNING) << "Pb to Json string failed:" << convertStatus.ToString();
    return json;
}
/**
 * @brief Render a hash ring as JSON and cut it into log-sized lines.
 * @param[in] prefix Text placed at the start of every produced line.
 * @param[in] ring The ring to render.
 * @return A single "<prefix> [<json>]" line when the JSON is at most LOG_MAX_SIZE_LIMIT characters long;
 * otherwise consecutive "<prefix> part<N> [<chunk>]" lines, N starting at 0, where every chunk holds
 * LOG_MAX_SIZE_LIMIT characters except possibly the last one.
 */
std::vector<std::string> SplitRingJson(const std::string &prefix, const HashRingPb &ring)
{
    const std::string json = HashRingToJsonString(ring);
    std::vector<std::string> lines;

    if (json.size() <= LOG_MAX_SIZE_LIMIT) {
        lines.emplace_back(FormatString("%s [%s]", prefix, json));
        return lines;
    }

    // Walk the text in LOG_MAX_SIZE_LIMIT strides; substr() truncates the final chunk at the end of the
    // string, and no empty trailing chunk is produced when the length is an exact multiple.
    unsigned int part = 0;
    for (std::string::size_type offset = 0; offset < json.size(); offset += LOG_MAX_SIZE_LIMIT) {
        lines.emplace_back(FormatString("%s part%d [%s]", prefix, part, json.substr(offset, LOG_MAX_SIZE_LIMIT)));
        ++part;
    }
    return lines;
}
/**
 * @brief Extract the worker address -> worker uuid pairs stored in a hash ring.
 * @param[in] hashRing The ring whose workers() map is copied.
 * @return A map keyed by each workers() key, holding that worker's worker_uuid() (which may be empty).
 */
std::unordered_map<std::string, std::string> GetWorkersFromHashRingPb(const HashRingPb &hashRing)
{
    std::unordered_map<std::string, std::string> addr2Uuid;
    for (const auto &entry : hashRing.workers()) {
        addr2Uuid.emplace(entry.first, entry.second.worker_uuid());
    }
    return addr2Uuid;
}
}
}