* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "working_dir_deployer.h"
#include <fstream>
#include <iostream>
#include "async/uuid_generator.hpp"
#include "common/logs/logging.h"
#include "common/metadata/metadata.h"
#include "common/utils/exec_utils.h"
#include "common/utils/hash_util.h"
#include "common/kv_client/kv_client.h"
#include "function_agent/flags/function_agent_flags.h"
#include "utils/os_utils.hpp"
namespace functionsystem::function_agent {
// URI scheme prefixes recognised by ResourceAccessorFactory::CreateAccessor.
// "file://" is served by FileResourceAccessor (a local .img or .zip file).
const std::string FILE_SCHEME = "file://";
// "path://" is served by PathResourceAccessor (a local path used in place).
const std::string PATH_SCHEME = "path://";
// "ftp://" is recognised but rejected: the factory returns nullptr for it.
const std::string FTP_SCHEME = "ftp://";
// "ds://" is served by DSAccessor, which fetches the payload through KVClient.
const std::string DS_SCHEME = "ds://";
// Folder names joined under the deploy dir: <deployDir>/app/working_dir.
const std::string APP_FOLDER_PREFIX = "app";
const std::string WORKING_DIR_FOLDER_PREFIX = "working_dir";
/**
 * Removes a leading scheme prefix from a URI.
 *
 * @param uri    the URI to inspect.
 * @param scheme the prefix to drop (for example "file://").
 * @return uri without the prefix when uri starts with scheme; otherwise uri unchanged.
 */
std::string StripScheme(const std::string &uri, const std::string &scheme)
{
    // rfind anchored at position 0 can only match a prefix.
    const bool startsWithScheme = uri.rfind(scheme, 0) == 0;
    if (!startsWithScheme) {
        return uri;
    }
    return uri.substr(scheme.length());
}
/**
 * Tells whether a string ends with the given suffix.
 *
 * @param str    the string to test.
 * @param suffix the expected trailing characters; an empty suffix always matches.
 * @return true when the last suffix.size() characters of str equal suffix.
 */
bool EndsWith(const std::string &str, const std::string &suffix)
{
    if (str.size() < suffix.size()) {
        return false;
    }
    const std::string::size_type tailStart = str.size() - suffix.size();
    return str.compare(tailStart, suffix.size(), suffix) == 0;
}
/**
 * Returns the directory portion of a path, i.e. everything before the last '/'.
 *
 * @param path the path to split.
 * @return "." when path is empty or contains no '/', "/" when the only '/' that
 *         matters is the leading one, otherwise the text preceding the last '/'.
 */
std::string GetDirectoryPath(const std::string &path)
{
    const std::string::size_type lastSlash = path.rfind('/');
    // An empty path also lands here because rfind finds nothing in it.
    if (lastSlash == std::string::npos) {
        return ".";
    }
    return lastSlash == 0 ? std::string("/") : path.substr(0, lastSlash);
}
/**
 * Abstract interface for obtaining a working-dir resource identified by a URI.
 * Concrete accessors in this file handle the file://, path:// and ds:// schemes.
 */
class ResourceAccessor {
public:
    /**
     * Makes the resource available, using dst as the target location where the
     * implementation needs one.
     * @return a status plus the path of the resulting resource.
     */
    virtual std::pair<Status, std::string> GetResource(std::string dst) = 0;

    /** @return a hash string identifying the resource. */
    virtual std::string GetHash() = 0;

    /** @return the working directory to use for the resource, given candidate dst. */
    virtual std::string GetWorkingDir(std::string dst) = 0;

    virtual ~ResourceAccessor() = default;
};
class FileResourceAccessor : public ResourceAccessor {
public:
explicit FileResourceAccessor(const std::string &uri) : filePath_(uri)
{
}
std::pair<Status, std::string> GetResource(std::string dst) override
{
std::string realFilePath = StripScheme(filePath_, FILE_SCHEME);
if (EndsWith(realFilePath, ".img")) {
return std::make_pair(Status::OK(), realFilePath);
} else if (EndsWith(realFilePath, ".zip")) {
Status unzipStatus = UnzipFile(dst, realFilePath);
return std::make_pair(unzipStatus, dst);
} else {
return std::make_pair(
Status(StatusCode::FUNC_AGENT_INVALID_WORKING_DIR_FILE, "format not support " + realFilePath), "");
}
}
std::string GetWorkingDir(std::string dst) override
{
return dst;
}
std::string GetHash() override
{
std::string realFilePath = StripScheme(filePath_, FILE_SCHEME);
return CalculateFileMD5(realFilePath);
}
private:
std::string filePath_;
};
/**
 * Accessor for "path://" URIs (and bare local paths): the referenced path is
 * used in place, so nothing is copied or unpacked.
 */
class PathResourceAccessor : public ResourceAccessor {
public:
    explicit PathResourceAccessor(const std::string &uri) : filePath_(uri)
    {
    }

    ~PathResourceAccessor() override = default;

    /** Always succeeds and reports the scheme-less path; dst is ignored. */
    std::pair<Status, std::string> GetResource([[maybe_unused]] std::string dst) override
    {
        return std::make_pair(Status::OK(), LocalPath());
    }

    /** The working directory is the referenced path itself; dst is ignored. */
    std::string GetWorkingDir([[maybe_unused]] std::string dst) override
    {
        return LocalPath();
    }

    /** Hash is whatever CalculateFileMD5 yields for the scheme-less path. */
    std::string GetHash() override
    {
        return CalculateFileMD5(LocalPath());
    }

private:
    // The stored URI with a leading "path://" removed, if present.
    std::string LocalPath() const
    {
        return StripScheme(filePath_, PATH_SCHEME);
    }

    std::string filePath_;  // URI exactly as passed to the constructor
};
class DSAccessor : public ResourceAccessor {
public:
explicit DSAccessor(const std::string &uri) : dsKey_(uri)
{
}
~DSAccessor() override = default;
std::pair<Status, std::string> GetResource(std::string dst) override
{
auto filename = dsKey_.substr(DS_SCHEME.length());
auto splits = litebus::strings::Split(filename, ".");
auto [s, buffer] = KVClient::GetInstance().Get(splits[0]);
if (!s.OK()) {
YRLOG_WARN("failed to get dsKey {}, err: {}", filename, s.ToString());
return std::make_pair(s, "");
}
if (buffer.GetSize() == 0) {
YRLOG_WARN("{} buffer size is 0", filename);
return std::make_pair(
Status(StatusCode::FUNC_AGENT_INVALID_WORKING_DIR_FILE, "invalid package size with " + dsKey_), "");
}
auto destinationPath = dst;
if (EndsWith(dst, ".img")) {
destinationPath = GetDirectoryPath(dst);
}
std::string fullpath = litebus::os::Join(destinationPath, filename);
std::ofstream file(fullpath, std::ios::out | std::ios::binary);
if (!file.is_open()) {
YRLOG_WARN("failed to open {}", filename);
return std::make_pair(
Status(StatusCode::FUNC_AGENT_INVALID_WORKING_DIR_FILE, "Failed to open file: " + fullpath), "");
}
file.write(static_cast<const char *>(buffer.ImmutableData()), buffer.GetSize());
if (file.fail()) {
YRLOG_WARN("failed to write {}", filename);
return std::make_pair(
Status(StatusCode::FUNC_AGENT_INVALID_WORKING_DIR_FILE, "Failed to write file: " + fullpath), "");
}
file.close();
if (EndsWith(dsKey_, ".img")) {
return std::make_pair(Status::OK(), fullpath);
} else if (EndsWith(dsKey_, ".zip")) {
Status unzipStatus = UnzipFile(dst, fullpath);
return std::make_pair(unzipStatus, dst);
} else {
YRLOG_WARN("format not support {}", dsKey_);
return std::make_pair(
Status(StatusCode::FUNC_AGENT_INVALID_WORKING_DIR_FILE, "format not support " + dsKey_), "");
}
return std::make_pair(Status::OK(), fullpath);
}
std::string GetWorkingDir(std::string dst) override
{
if (EndsWith(dsKey_, ".img")) {
return litebus::os::Join(dst, dsKey_.substr(DS_SCHEME.length()));
}
return dst;
}
std::string GetHash() override
{
return GetHashString(dsKey_);
}
private:
std::string dsKey_;
};
/**
 * Chooses the ResourceAccessor implementation that matches a URI's scheme.
 */
class ResourceAccessorFactory {
public:
    /**
     * @param uri resource URI or plain local path.
     * @return an accessor for ds://, file:// and path:// URIs, for existing
     *         directories and for absolute paths without a "://"; nullptr for
     *         ftp:// and anything else.
     */
    static std::shared_ptr<ResourceAccessor> CreateAccessor(const std::string &uri)
    {
        const auto hasScheme = [&uri](const std::string &scheme) { return uri.rfind(scheme, 0) == 0; };

        // ftp is recognised explicitly but not supported.
        if (hasScheme(FTP_SCHEME)) {
            return nullptr;
        }
        if (hasScheme(DS_SCHEME)) {
            return std::make_shared<DSAccessor>(uri);
        }
        if (hasScheme(FILE_SCHEME)) {
            return std::make_shared<FileResourceAccessor>(uri);
        }
        if (hasScheme(PATH_SCHEME)) {
            return std::make_shared<PathResourceAccessor>(uri);
        }

        // Scheme-less input: accept an existing directory, or any absolute path
        // that does not look like a URI.
        if (IsDir(uri)) {
            return std::make_shared<PathResourceAccessor>(uri);
        }
        const bool absolutePlainPath = !uri.empty() && uri[0] == '/' && uri.find("://") == std::string::npos;
        if (absolutePlainPath) {
            return std::make_shared<PathResourceAccessor>(uri);
        }
        return nullptr;
    }
};
/**
 * Sets the default deployment root to <GetDeployDir()>/app/working_dir.
 */
WorkingDirDeployer::WorkingDirDeployer()
{
    const std::string appRoot = litebus::os::Join(GetDeployDir(), APP_FOLDER_PREFIX);
    baseDeployDir_ = litebus::os::Join(appRoot, WORKING_DIR_FOLDER_PREFIX);
}
/**
 * Computes where the working dir for a URI will live.
 *
 * @param deployDir optional deploy root; when empty the default root set up by
 *                  the constructor is used.
 * @param uriFile   URI (or path) of the working-dir resource.
 * @param appID     application id, used for logging and the empty-input check.
 * @return the accessor-specific working dir under <root>/<hash>; under <root>
 *         when no hash is available; "" when both inputs are empty or the URI
 *         scheme is unsupported.
 */
std::string WorkingDirDeployer::GetDestination(const std::string &deployDir, const std::string &uriFile,
                                               const std::string &appID)
{
    if (uriFile.empty() && appID.empty()) {
        return "";
    }

    const auto accessor = ResourceAccessorFactory::CreateAccessor(uriFile);
    if (accessor == nullptr) {
        YRLOG_WARN("Unsupported working_dir schema: {}", uriFile);
        return "";
    }

    // Root is <deployDir>/app/working_dir when a deploy dir is supplied.
    const std::string rootDir =
        deployDir.empty()
            ? baseDeployDir_
            : litebus::os::Join(litebus::os::Join(deployDir, APP_FOLDER_PREFIX), WORKING_DIR_FOLDER_PREFIX);

    const std::string digest = accessor->GetHash();
    YRLOG_DEBUG("md5 of workingDirZipFile({}): {}", uriFile, digest);
    if (digest.empty()) {
        YRLOG_DEBUG("failed to get hash of workingDirZipFile({}). use workingDir({}) instead", uriFile, rootDir);
        return accessor->GetWorkingDir(rootDir);
    }

    const std::string destination = accessor->GetWorkingDir(litebus::os::Join(rootDir, digest));
    YRLOG_DEBUG("{}|working dir deployer destination: {}", appID, destination);
    return destination;
}
/**
 * Reports whether a destination already holds a deployed working dir.
 *
 * @param destination path returned by GetDestination.
 * @param isMonopoly  unused here.
 * @return false when the path does not exist; true for an existing ".img"
 *         path; otherwise true only when listing the path yields entries.
 */
bool WorkingDirDeployer::IsDeployed(const std::string &destination, [[maybe_unused]] bool isMonopoly)
{
    if (!litebus::os::ExistPath(destination)) {
        return false;
    }
    // An image counts as deployed as soon as the file exists.
    if (EndsWith(destination, ".img")) {
        return true;
    }
    auto entries = litebus::os::Ls(destination);
    return entries.IsSome() && !entries.Get().empty();
}
/**
 * Deploys the working dir described by the request's deployment config.
 *
 * The config's bucketid carries the resource URI, objectid the app id and
 * deploydir an optional deploy root. When the computed destination is the
 * source path itself, the path is used in place (created if missing).
 * Otherwise the destination directory is created, made mode 750, and the
 * resource is materialised there through the matching accessor.
 *
 * @param request deploy request; must not be null.
 * @return result whose destination is the working dir and whose status is OK
 *         or describes the failing step.
 */
DeployResult WorkingDirDeployer::Deploy(const std::shared_ptr<messages::DeployRequest> &request)
{
    auto &config = request->deploymentconfig();
    DeployResult result;
    result.destination = GetDestination(config.deploydir(), config.bucketid(), config.objectid());
    YRLOG_DEBUG(
        "WorkingDir deployer received Deploy request to directory({}), workingDirZipFile({}), appID({}), "
        "destination({})",
        config.deploydir(), config.bucketid(), config.objectid(), result.destination);

    // Destination equals the source path: nothing to fetch, just make sure it exists.
    std::string normalizedBucketID = StripScheme(config.bucketid(), PATH_SCHEME);
    if (result.destination == config.bucketid() || result.destination == normalizedBucketID) {
        if (!litebus::os::ExistPath(result.destination)) {
            (void)litebus::os::Mkdir(result.destination);
        }
        result.status = Status::OK();
        return result;
    }

    std::shared_ptr<ResourceAccessor> accessor = ResourceAccessorFactory::CreateAccessor(config.bucketid());
    if (!accessor) {
        YRLOG_WARN("Unsupported working_dir schema: {}", config.bucketid());
        // Report the offending URI (bucketid), matching the log line above.
        result.status = Status(StatusCode::FUNC_AGENT_UNSUPPORTED_WORKING_DIR_SCHEMA,
                               "Unsupported working_dir schema: " + config.bucketid());
        return result;
    }

    // For an image destination the directory to create is the image's parent.
    auto dst = result.destination;
    if (EndsWith(dst, ".img")) {
        dst = GetDirectoryPath(dst);
    }
    if (!CheckIllegalChars(dst) || !litebus::os::Mkdir(dst).IsNone()) {
        // Read errno before logging, since the logging call may overwrite it.
        const int mkdirErrno = errno;
        YRLOG_ERROR("failed to create dir for workingDir({}).", dst);
        result.status =
            Status(StatusCode::FUNC_AGENT_MKDIR_DEST_WORKING_DIR_ERROR,
                   "failed to create dest working dir for " + dst + ", msg: +" + litebus::os::Strerror(mkdirErrno));
        return result;
    }

    // dst has passed CheckIllegalChars above before being spliced into a shell command.
    std::string cmd = "chmod -R 750 " + dst;
    int chmodCode = std::system(cmd.c_str());
    if (chmodCode) {
        // Best effort: a chmod failure is logged but does not fail the deploy.
        YRLOG_WARN("failed to execute chmod cmd({}). code: {}", cmd, chmodCode);
    }

    auto [status, workingDirZipFile] = accessor->GetResource(result.destination);
    if (!status.IsOk()) {
        result.status = status;
        return result;
    }
    YRLOG_DEBUG("working dir: {}", workingDirZipFile);
    result.status = Status::OK();
    return result;
}
/**
 * Removes a deployed working dir via ClearFile.
 *
 * @param filePath  deployed path; for a ".img" path its parent directory is
 *                  the one that gets cleared.
 * @param objectKey key forwarded unchanged to ClearFile.
 * @return whatever ClearFile returns.
 */
bool WorkingDirDeployer::Clear(const std::string &filePath, const std::string &objectKey)
{
    YRLOG_DEBUG("Clear filePath({}), objectKey({})", filePath, objectKey);
    const std::string target = EndsWith(filePath, ".img") ? GetDirectoryPath(filePath) : filePath;
    return ClearFile(target, objectKey);
}
/**
 * Extracts a zip archive into a directory by running the `unzip` command.
 *
 * @param destDir           directory passed to `unzip -d`.
 * @param workingDirZipFile archive to extract; must be an existing file.
 * @return OK on success; FUNC_AGENT_INVALID_WORKING_DIR_FILE when the archive
 *         is not a file or unzip exits non-zero; PARAMETER_ERROR when the
 *         assembled command fails CheckIllegalChars.
 */
Status WorkingDirDeployer::UnzipFile(const std::string &destDir, const std::string &workingDirZipFile)
{
    if (!IsFile(workingDirZipFile)) {
        return Status(StatusCode::FUNC_AGENT_INVALID_WORKING_DIR_FILE, "working_dir file is invalid");
    }

    std::string unzipCmd = "unzip -d ";
    unzipCmd += destDir;
    unzipCmd += " ";
    unzipCmd += workingDirZipFile;
    // The command goes through a shell, so it is vetted before being run.
    if (!CheckIllegalChars(unzipCmd)) {
        return Status(StatusCode::PARAMETER_ERROR, "command has invalid characters");
    }

    const int exitCode = std::system(unzipCmd.c_str());
    if (exitCode == 0) {
        return Status::OK();
    }
    YRLOG_ERROR("failed to execute unzip working_dir cmd({}). code: {}", unzipCmd, exitCode);
    return Status(StatusCode::FUNC_AGENT_INVALID_WORKING_DIR_FILE, "failed to unzip working_dir file");
}
}