/**
 * Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * Description: RPC Server.
 */
#include "datasystem/common/rpc/rpc_server.h"

#include "datasystem/common/rpc/rpc_auth_key_manager.h"
#include "datasystem/common/rpc/zmq/zmq_server_impl.h"
#include "datasystem/common/util/thread_pool.h"

namespace datasystem {
RpcServer::RpcServer(Token key, const RpcCredential &cred)
{
    (void)key;
    LOG(INFO) << "Start up server with ZMQ communication framework.";
    auto passkey = ZmqServerImpl::Token();
    pimpl_ = std::make_unique<ZmqServerImpl>(passkey, cred);
}
RpcServer::~RpcServer() noexcept = default;

Status RpcServer::Init()
{
    return std::visit([](auto &pimpl) { return pimpl->Init(); }, pimpl_);
}

Status RpcServer::Run()
{
    return std::visit([](auto &pimpl) { return pimpl->Run(); }, pimpl_);
}

void RpcServer::Shutdown()
{
    std::visit([](auto &pimpl) { pimpl->Shutdown(); }, pimpl_);
}

Status RpcServer::Bind(const std::string &endpoint)
{
    return std::visit([&endpoint](auto &pimpl) { return pimpl->Bind(endpoint); }, pimpl_);
}

Status RpcServer::RegisterService(ZmqService *svc, const RpcServiceCfg &cfg)
{
    return std::get<std::unique_ptr<ZmqServerImpl>>(pimpl_)->RegisterService(ZmqServerImpl::Token(), svc, cfg);
}

Status RpcServer::InitAuthHandler()
{
    return std::visit([](auto &pimpl) { return pimpl->InitAuthHandler(); }, pimpl_);
}

void RpcServer::Interrupt()
{
    std::visit([](auto &pimpl) { pimpl->Interrupt(); }, pimpl_);
}

bool RpcServer::IsInterrupted() const
{
    return std::visit([](auto &pimpl) { return pimpl->IsInterrupted(); }, pimpl_);
}

std::vector<std::string> RpcServer::GetListeningPorts() const
{
    return std::visit([](auto &pimpl) { return pimpl->GetListeningPorts(); }, pimpl_);
}

ThreadPool::ThreadPoolUsage RpcServer::GetRpcServicesUsage(const std::string &serviceName) const
{
    return std::visit([&serviceName](auto &pimpl) { return pimpl->GetRpcServicesUsage(serviceName); }, pimpl_);
}

ThreadPool::ThreadPoolUsage RpcServer::GetRpcServicesSnapshot(const std::string &serviceName) const
{
    return std::visit([&serviceName](auto &pimpl) { return pimpl->GetRpcServicesSnapshot(serviceName); }, pimpl_);
}

Status RpcServer::Builder::Init(std::unique_ptr<RpcServer> &server) const
{
    auto key = Token();
    server = std::make_unique<RpcServer>(key, cred_);
    RETURN_IF_NOT_OK(server->Init());
    if (RpcAuthKeyManager::Instance().HasAuthHandler()) {
        RETURN_IF_NOT_OK(server->InitAuthHandler());
    }
    for (const auto &v : endPts_) {
        RETURN_IF_NOT_OK(server->Bind(v));
    }
    return Status::OK();
}

Status RpcServer::Builder::BuildAndStart(std::unique_ptr<RpcServer> &server) const
{
    try {
        for (auto &ele : svcList_) {
            auto &svcEle = ele.second;
            auto func = [&server, &svcEle](auto *svc) {
                RETURN_RUNTIME_ERROR_IF_NULL(svc);
                RETURN_IF_NOT_OK(server->RegisterService(svc, svcEle));
                return Status::OK();
            };
            RETURN_IF_NOT_OK(std::visit([&func](auto *svc) { return func(svc); }, ele.first));
        }
        if (preStartCallback_) {
            RETURN_IF_NOT_OK(preStartCallback_());
        }
        RETURN_IF_NOT_OK(server->Run());
    } catch (const std::bad_alloc &e) {
        RETURN_STATUS(StatusCode::K_OUT_OF_MEMORY, e.what());
    }
    return Status::OK();
}
}  // namespace datasystem