/**
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "bench_perf.h"

#include <sstream>
#include <unistd.h>
#include <pwd.h>
#include <fcntl.h>
#include <iostream>
#include <set>

#include "datasystem/common/util/net_util.h"
#include "datasystem/common/util/status_helper.h"
#include "datasystem/common/util/uri.h"

namespace datasystem {
namespace bench {

#ifdef ENABLE_PERF
PerfManager::PerfManager(const ArgsBase &args)
    : supportPerf_(true),
      action_(args.action),
      perfWorkersStr_(args.perfWorkers.empty() ? args.workerAddress : args.perfWorkers),
      perfPath_(args.perfPath),
      accessKey_(args.accessKey),
      secretKey_(args.secretKey)
{
}

Status PerfManager::Init()
{
    auto perfWorkerAddrs = Split(perfWorkersStr_, ",");
    std::set<std::string> uniquePerfWorkerAddrs = { perfWorkerAddrs.begin(), perfWorkerAddrs.end() };

    for (const auto &worker : uniquePerfWorkerAddrs) {
        HostPort hostPort;
        auto rc = hostPort.ParseString(worker);
        if (rc.IsError()) {
            std::cerr << "Worker address parse error: " << rc.GetMsg() << std::endl;
            supportPerf_ = false;
            return rc;
        }

        ConnectOptions connectOptions = { .host = hostPort.Host(), .port = hostPort.Port() };
        connectOptions.accessKey = accessKey_;
        connectOptions.secretKey = secretKey_;
        auto client = std::make_unique<PerfClient>(connectOptions);
        rc = client->Init();
        if (rc.IsError()) {
            std::cerr << "PerfClient init error: " << rc.GetMsg() << std::endl;
            supportPerf_ = false;
            return rc;
        }
        perfClients_.emplace(worker, std::move(client));
    }

    return Status::OK();
}

Status PerfManager::SavePerfLog(const std::string &type, const std::string &workerAddr)
{
    if (!supportPerf_ || perfPath_.empty()) {
        return Status::OK();
    }

    std::unordered_map<std::string, std::unordered_map<std::string, uint64_t>> perfLog;
    auto it = perfClients_.find(workerAddr);
    if (it == perfClients_.end()) {
        return Status::OK();
    }
    auto &perfClient = it->second;
    RETURN_IF_NOT_OK(perfClient->GetPerfLog(type, perfLog));

    std::stringstream ss;
    for (auto &item : perfLog) {
        ss << action_ << ",";
        ss << type;
        if (type == "worker") {
            ss << " " << workerAddr;
        }
        ss << ",";
        ss << item.first << ",";
        ss << item.second["avg_time"] << ",";
        ss << item.second["count"] << ",";
        ss << item.second["min_time"] << ",";
        ss << item.second["max_time"] << ",";
        ss << item.second["total_time"] << ",";
        ss << item.second["max_frequency"];
        ss << "\n";
    }
    auto output = ss.str();

    std::string finalPerfPath = perfPath_;

    const std::string kDefaultPerfPath = "~/.datasystem/perf";
    const std::string kAdditionalPerfPath = "";

    Status status = Uri::NormalizePathWithUserHomeDir(finalPerfPath, kDefaultPerfPath, kAdditionalPerfPath);
    if (!status.OK()) {
        return status;
    }

    const int fileMode = 0640;
    int fd = open(finalPerfPath.c_str(), O_WRONLY | O_CREAT | O_APPEND, fileMode);
    if (fd < 0) {
        std::string errMsg = "Failed to open perf log file: " + std::string(strerror(errno));
        std::cerr << errMsg << std::endl;
        return Status(K_RUNTIME_ERROR, __LINE__, __FILE__, errMsg);
    }

    int sz = write(fd, output.c_str(), output.size());
    if (sz < 0) {
        std::string errMsg = "Failed to write to perf log file: " + std::string(strerror(errno));
        std::cerr << errMsg << std::endl;
        RETRY_ON_EINTR(close(fd));
        return Status(K_RUNTIME_ERROR, __LINE__, __FILE__, errMsg);
    }

    RETRY_ON_EINTR(close(fd));

    return Status::OK();
}

Status PerfManager::SaveAllPerfLog()
{
    if (!supportPerf_ || perfClients_.empty() || perfPath_.empty()) {
        std::cout << "exit ahead" << std::endl;
        return Status::OK();
    }

    bool isFirst = true;
    for (auto &item : perfClients_) {
        auto &workerAddr = item.first;
        if (isFirst) {
            RETURN_IF_NOT_OK(SavePerfLog("client", workerAddr));
            isFirst = false;
        }
        RETURN_IF_NOT_OK(SavePerfLog("worker", workerAddr));
    }

    return Status::OK();
}

Status PerfManager::ResetPerfLog()
{
    if (!supportPerf_ || perfClients_.empty()) {
        std::cout << "exit ahead" << std::endl;
        return Status::OK();
    }

    bool isFirst = true;
    for (auto &item : perfClients_) {
        auto &perfClient = item.second;
        if (isFirst) {
            RETURN_IF_NOT_OK(perfClient->ResetPerfLog("client"));
            isFirst = false;
        }
        RETURN_IF_NOT_OK(perfClient->ResetPerfLog("worker"));
    }

    return Status::OK();
}
#endif
}  // namespace bench
}  // namespace datasystem