* Copyright (c) Huawei Technologies Co., Ltd. 2026. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "traefik_registry.h"
#include <algorithm>
#include <cctype>
#include <regex>
#include <sstream>
#include "common/logs/logging.h"
namespace functionsystem::local_scheduler {
/**
 * @brief Creates a registry that writes Traefik dynamic configuration through a meta storage accessor.
 *
 * The constructor validates serversTransport, logs the effective settings and, when an accessor is
 * available, asynchronously writes the regex of the shared "stripprefix-all" middleware under keyPrefix.
 * The outcome of that write is only logged; it never fails construction.
 *
 * @param accessor          Meta storage accessor used for every write/delete; may be null.
 * @param keyPrefix         Root key under which all Traefik keys are stored.
 * @param httpEntryPoint    Value written to each router's "entryPoints/0" key by RegisterInstance.
 * @param enableTLS         When true, RegisterInstance adds a "tls" key to every router.
 * @param serversTransport  Written as "serverstransport" for 'https' ports; empty disables that key.
 * @throws std::invalid_argument if serversTransport is non-empty and not of the form 'name@provider'.
 */
TraefikRegistry::TraefikRegistry(std::shared_ptr<MetaStorageAccessor> accessor,
                                 const std::string& keyPrefix,
                                 const std::string& httpEntryPoint,
                                 bool enableTLS,
                                 const std::string& serversTransport)
    : accessor_(std::move(accessor)),
      keyPrefix_(keyPrefix),
      httpEntryPoint_(httpEntryPoint),
      enableTLS_(enableTLS),
      serversTransport_(serversTransport)
{
    if (!serversTransport_.empty()) {
        // Exactly one '@' with at least one character on each side.
        static const std::regex nameAtProvider(R"(^[^@]+@[^@]+$)");
        const bool wellFormed = std::regex_match(serversTransport_, nameAtProvider);
        if (!wellFormed) {
            YRLOG_ERROR("TraefikRegistry: invalid serversTransport format '{}', "
                        "expected 'name@provider' (e.g., 'yr-backend-tls@file')",
                        serversTransport_);
            throw std::invalid_argument(
                "serversTransport must be in format 'name@provider' (e.g., 'yr-backend-tls@file')");
        }
        YRLOG_INFO("TraefikRegistry: 'https' protocol ports will use backend TLS with ServersTransport: {}",
                   serversTransport_);
    } else {
        YRLOG_WARN("TraefikRegistry: serversTransport is empty, 'https' protocol ports will not use backend TLS");
    }

    YRLOG_INFO("TraefikRegistry initialized: keyPrefix={}, httpEntryPoint={}, enableTLS={}, serversTransport={}",
               keyPrefix_, httpEntryPoint_, enableTLS_, serversTransport_);

    if (!accessor_) {
        return;
    }

    // Every router created by RegisterInstance references this middleware by name, so it is
    // written once here. The continuation captures nothing and always reports OK.
    accessor_->Put(keyPrefix_ + "/http/middlewares/stripprefix-all/stripPrefixRegex/regex",
                   "^/[^/]+/[0-9]+")
        .Then([](const Status& putStatus) -> Status {
            if (putStatus.IsOk()) {
                YRLOG_INFO("Global StripPrefix middleware created successfully");
            } else {
                YRLOG_WARN("Failed to create global StripPrefix middleware: {}", putStatus.GetMessage());
            }
            return Status::OK();
        });
}
/**
 * @brief Publishes Traefik HTTP router and service keys for every mapped port of an instance.
 *
 * For each port mapping a router named "<sanitizedID>-p<sandboxPort>" is written with a
 * PathPrefix rule of "/<sanitizedID>/<sandboxPort>", the "stripprefix-all" middleware, the
 * configured entry point and (when enableTLS_ is set) an empty "tls" key, plus a service of the
 * same name whose single server URL is "<scheme>://<hostIP>:<hostPort>". All keys for all ports
 * are submitted in one Txn call.
 *
 * @param instanceID    Instance identifier; sanitized via SanitizeID before being used in keys.
 * @param hostIP        Backend host address. An IPv6 literal is wrapped in brackets in the URL.
 * @param portMappings  Ports to expose; a mapping whose protocol equals "https" (case-insensitive)
 *                      uses an https backend URL, anything else uses http.
 * @return A future holding ERR_INNER_SYSTEM_ERROR when no accessor is configured, OK when
 *         portMappings is empty, otherwise the status of the Txn (failure is passed through).
 */
litebus::Future<Status> TraefikRegistry::RegisterInstance(
    const std::string& instanceID,
    const std::string& hostIP,
    const std::vector<PortMapping>& portMappings)
{
    if (!accessor_) {
        YRLOG_ERROR("TraefikRegistry: accessor is null, skip registration for instanceID={}", instanceID);
        return litebus::Future<Status>(Status(StatusCode::ERR_INNER_SYSTEM_ERROR, "meta storage accessor is null"));
    }
    if (portMappings.empty()) {
        YRLOG_INFO("No port mappings for instance {}, skip Traefik registration", instanceID);
        return litebus::Future<Status>(Status::OK());
    }

    const std::string safeID = SanitizeID(instanceID);

    // RFC 3986 requires an IPv6 literal in a URL authority to be enclosed in brackets; without
    // them "host:port" is ambiguous. A ':' can only appear in hostIP for an IPv6 address.
    const bool needsBrackets = hostIP.find(':') != std::string::npos && hostIP.front() != '[';
    const std::string urlHost = needsBrackets ? "[" + hostIP + "]" : hostIP;

    // Loop-invariant key roots.
    const std::string routersRoot = keyPrefix_ + "/http/routers/";
    const std::string servicesRoot = keyPrefix_ + "/http/services/";

    // rule, service, middleware, entryPoint, tls, url, serverstransport.
    constexpr size_t maxKeysPerPort = 7;
    std::vector<std::pair<std::string, std::string>> kvs;
    kvs.reserve(portMappings.size() * maxKeysPerPort);

    for (const auto& mapping : portMappings) {
        const std::string portText = std::to_string(mapping.sandboxPort);
        const std::string routerName = safeID + "-p" + portText;
        const std::string prefixPath = "/" + safeID + "/" + portText;

        // std::tolower has undefined behaviour for negative char values, so go through unsigned char.
        std::string protocolLower = mapping.protocol;
        std::transform(protocolLower.begin(), protocolLower.end(), protocolLower.begin(),
                       [](unsigned char ch) { return static_cast<char>(std::tolower(ch)); });
        const bool useHttpsForBackend = (protocolLower == "https");
        const std::string scheme = useHttpsForBackend ? "https" : "http";
        YRLOG_DEBUG("TraefikRegistry: instance {} port {} protocol '{}' -> backend {}",
                    instanceID, mapping.sandboxPort, mapping.protocol, scheme);

        const std::string routerKey = routersRoot + routerName;
        const std::string serviceKey = servicesRoot + routerName;

        kvs.emplace_back(routerKey + "/rule", "PathPrefix(`" + prefixPath + "`)");
        kvs.emplace_back(routerKey + "/service", routerName);
        kvs.emplace_back(routerKey + "/middlewares/0", "stripprefix-all");
        kvs.emplace_back(routerKey + "/entryPoints/0", httpEntryPoint_);
        if (enableTLS_) {
            // The key is written with an empty value.
            kvs.emplace_back(routerKey + "/tls", "");
        }

        // Streamed rather than std::to_string so hostPort is formatted the same way as before
        // regardless of its declared type.
        std::ostringstream urlStream;
        urlStream << scheme << "://" << urlHost << ":" << mapping.hostPort;
        kvs.emplace_back(serviceKey + "/loadbalancer/servers/0/url", urlStream.str());

        if (useHttpsForBackend) {
            if (!serversTransport_.empty()) {
                kvs.emplace_back(serviceKey + "/loadbalancer/serverstransport", serversTransport_);
                YRLOG_DEBUG("TraefikRegistry: port {} using HTTPS with serversTransport={}",
                            mapping.sandboxPort, serversTransport_);
            } else {
                YRLOG_WARN("TraefikRegistry: port {} has 'https' protocol but serversTransport is empty, skipping TLS",
                           mapping.sandboxPort);
            }
        }
    }

    YRLOG_INFO("Registering instance {} to Traefik HTTP: {} ports, {} keys",
               instanceID, portMappings.size(), kvs.size());

    // The continuation captures only values, so it stays valid if this object is destroyed first.
    return accessor_->Txn(kvs)
        .Then([instanceID, portMappingsCount = portMappings.size()](const Status& status) -> Status {
            if (!status.IsOk()) {
                YRLOG_ERROR("Failed to register instance {} to Traefik: {}", instanceID, status.GetMessage());
                return status;
            }
            YRLOG_INFO("Successfully registered instance {} to Traefik with {} ports",
                       instanceID, portMappingsCount);
            return Status::OK();
        });
}
/**
 * @brief Removes the Traefik HTTP router and service keys previously written for an instance.
 *
 * Routers are deleted first, then services. Failures are logged as warnings only: the returned
 * future always resolves to OK, so unregistration is best-effort.
 *
 * @param instanceID  Instance identifier; sanitized via SanitizeID to rebuild the key prefix.
 * @return A future holding Status::OK(), including when no accessor is configured.
 */
litebus::Future<Status> TraefikRegistry::UnregisterInstance(const std::string& instanceID)
{
    if (!accessor_) {
        YRLOG_WARN("TraefikRegistry: accessor is null, skip unregistration for instanceID={}", instanceID);
        return litebus::Future<Status>(Status::OK());
    }
    YRLOG_INFO("Unregistering instance {} from Traefik HTTP", instanceID);

    // RegisterInstance names every router/service "<safeID>-p<port>". Including the "-p" marker
    // in the prefix keeps this from also matching other instances whose sanitized ID merely
    // starts with this one (e.g. "app" vs "app-v2").
    // NOTE(review): an ID that continues with "-p..." can still collide; confirm ID format.
    // NOTE(review): assumes Delete(key, true) is a prefix delete - confirm against MetaStorageAccessor.
    const std::string namePrefix = SanitizeID(instanceID) + "-p";
    const std::string routerPrefix = keyPrefix_ + "/http/routers/" + namePrefix;
    const std::string servicePrefix = keyPrefix_ + "/http/services/" + namePrefix;

    // Capture a copy of the shared accessor instead of `this`: the continuation runs
    // asynchronously and must not dereference a registry that may already be destroyed.
    auto accessor = accessor_;
    return accessor->Delete(routerPrefix, true)
        .Then([accessor, servicePrefix](const Status& status) {
            if (!status.IsOk()) {
                YRLOG_WARN("Failed to delete Traefik HTTP routers: {}", status.GetMessage());
            }
            // Services are removed even when the router delete failed.
            return accessor->Delete(servicePrefix, true);
        })
        .Then([instanceID](const Status& status) -> Status {
            if (!status.IsOk()) {
                YRLOG_WARN("Failed to unregister instance {} from Traefik HTTP: {}", instanceID, status.GetMessage());
            } else {
                YRLOG_INFO("Successfully unregistered instance {} from Traefik HTTP", instanceID);
            }
            return Status::OK();
        });
}
/**
 * @brief Converts an instance ID into a string used for Traefik router/service names and paths.
 *
 * Each '@' becomes "-at-"; each '/', '.' and '_' becomes '-'; all other characters are copied
 * unchanged. The result is then cut to at most 200 characters.
 *
 * @param id  Raw instance identifier.
 * @return The sanitized identifier (at most 200 characters).
 */
std::string TraefikRegistry::SanitizeID(const std::string& id)
{
    constexpr size_t maxRouterNameLen = 200;

    std::string sanitized;
    sanitized.reserve(id.size());
    for (const char ch : id) {
        switch (ch) {
            case '@':
                sanitized += "-at-";
                break;
            case '/':
            case '.':
            case '_':
                sanitized += '-';
                break;
            default:
                sanitized += ch;
                break;
        }
    }

    // Truncation happens after substitution, so an expanded "-at-" may be cut mid-way.
    if (sanitized.size() > maxRouterNameLen) {
        sanitized.resize(maxRouterNameLen);
    }
    return sanitized;
}
}