/**
 * Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * Description: Worker program master file.
 */

#include <cerrno>
#include <cstdint>
#include <cstring>
#include <fstream>
#include <string>
#include <libgen.h>

#include "datasystem/common/log/log.h"
#include "datasystem/common/log/logging.h"
#include "datasystem/common/metrics/metrics.h"
#include "datasystem/common/perf/perf_manager.h"
#include "datasystem/common/signal/signal.h"
#include "datasystem/common/util/format.h"
#include "datasystem/common/util/gflag/flags.h"
#include "datasystem/common/util/net_util.h"
#include "datasystem/common/util/status_helper.h"
#include "datasystem/common/util/timer.h"
#include "datasystem/common/util/uri.h"
#include "datasystem/common/util/version.h"
#include "datasystem/common/util/validator.h"
#include "datasystem/worker/worker.h"
#include "datasystem/worker/worker_cli.h"
#include "datasystem/worker/worker_oc_server.h"
#include "datasystem/worker/worker_update_flag_check.h"

DS_DECLARE_string(monitor_config_file);

using namespace datasystem;
using namespace datasystem::worker;
static constexpr int CHECK_EVERY_MS = 1'000;
static constexpr int MAX_TOLERATED_ATTEMPTS = 10;
static constexpr int REPORTING_THRESHOLD_MS = CHECK_EVERY_MS * MAX_TOLERATED_ATTEMPTS;
static constexpr size_t ERR_MSG_BUF_SIZE = 256;

struct SetSchedRuntimeResult {
    bool success;
    int err;
};

SetSchedRuntimeResult SetWorkerSchedRuntime();
uint64_t GetWorkerSchedRuntimeNs();

std::string StrError(int err)
{
    char buf[ERR_MSG_BUF_SIZE] = { 0 };
#if defined(__GLIBC__) && defined(_GNU_SOURCE)
    return std::string(strerror_r(err, buf, sizeof(buf)));
#else
    auto ret = strerror_r(err, buf, sizeof(buf));
    if (ret != 0) {
        return FormatString("Unknown error %d", err);
    }
    return std::string(buf);
#endif
}

void LogSetWorkerSchedRuntimeResult(const SetSchedRuntimeResult &result)
{
    if (!result.success) {
        LOG(WARNING) << FormatString("Failed to set worker sched runtime to %llu ns, errno: %d, error: %s",
                                     static_cast<unsigned long long>(GetWorkerSchedRuntimeNs()), result.err,
                                     StrError(result.err));
        return;
    }
    LOG(INFO) << "Set worker sched runtime to " << GetWorkerSchedRuntimeNs() << " ns.";
}

int main(int argc, char **argv)
{
    auto setSchedRuntimeResult = SetWorkerSchedRuntime();

    Flags flags;
    auto rc = Worker::GetInstance()->Init(flags, argc, argv);
    if (rc.IsError()) {
        LOG(ERROR) << "Worker runtime error:" << rc.ToString();
        LogSetWorkerSchedRuntimeResult(setSchedRuntimeResult);
        LOG_IF_ERROR(Worker::GetInstance()->ShutDown(), "worker shutdown failed");
        return -1;
    }
    if (!IsTermSignalReceived()) {
        LogSetWorkerSchedRuntimeResult(setSchedRuntimeResult);
    }
    PerfManager *perfManager = PerfManager::Instance();

    Timer timer;
    {
        std::unique_lock<std::mutex> termSignalLock(g_termSignalMutex);
        while (!IsTermSignalReceived()) {
            bool signalReceived = g_termSignalCv.wait_for(termSignalLock, std::chrono::milliseconds(CHECK_EVERY_MS),
                                                          [] { return IsTermSignalReceived(); });
            if (signalReceived) {
                break;
            }
            auto elapsedMs = timer.ElapsedMilliSecondAndReset();
            if (elapsedMs > REPORTING_THRESHOLD_MS) {
                LOG(ERROR) << FormatString("Worker was hanged about %.2f ms", elapsedMs);
            }
            if (perfManager != nullptr) {
                perfManager->Tick();
            }
            metrics::Tick();
            // Check whether the configuration file is updated every 10 seconds.
            flags.MonitorConfigFile(FLAGS_monitor_config_file);
        }
    }
    if (perfManager != nullptr) {
        perfManager->PrintPerfLog();
    }
    metrics::PrintSummary();

    rc = Worker::GetInstance()->PreShutDown();
    if (rc.IsError()) {
        LOG(ERROR) << "Worker preShutDown error:" << rc.ToString();
        LOG_IF_ERROR(Worker::GetInstance()->ShutDown(), "worker preshutdown failed");
        return -1;
    }

    rc = Worker::GetInstance()->ShutDown();
    if (rc.IsError()) {
        LOG(ERROR) << "Worker runtime error:" << rc.ToString();
        return -1;
    }
    return 0;
}