/**
 * Copyright (c) 2025 Huawei Technologies Co., Ltd.
 * This program is free software, you can redistribute it and/or modify it under the terms and conditions of 
 * CANN Open Software License Agreement Version 2.0 (the "License").
 * Please refer to the License for details. You may not use this file except in compliance with the License.
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, 
 * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
 * See LICENSE in the root of the software repository for the full text of the License.
 */

#include "daemon/daemon_service.h"
#include <cinttypes>
#include <fstream>
#include <regex>
#include "ge/ge_api_error_codes.h"
#include "base/err_msg.h"
#include "common/util.h"
#include "dflow/base/utils/process_utils.h"
#include "common/config/configurations.h"
#include "daemon/daemon_client_manager.h"
#include "common/utils/rts_api_utils.h"
#include "deploy/deployer/deployer_service_impl.h"
#include "deploy/deployer/deployer_proxy.h"
#include "deploy/resource/resource_manager.h"
#include "deploy/resource/deployer_port_distributor.h"
#include "deploy/deployer/deployer_authentication.h"

namespace ge {
namespace {
const std::string kProtocolTypeRdma = "RDMA";
}
Status DaemonService::Process(const std::string &peer_uri,
                              const deployer::DeployerRequest &request,
                              deployer::DeployerResponse &response) {
  auto request_type = request.type();
  if (request_type == deployer::kInitRequest) {
    ProcessInitRequest(peer_uri, request, response);
  } else if (request_type == deployer::kDisconnect) {
    ProcessDisconnectRequest(peer_uri, request, response);
  } else if (request_type == deployer::kHeartbeat) {
    ProcessHeartbeatRequest(request, response);
  } else {
    ProcessDeployRequest(request, response);
  }
  return SUCCESS;
}

Status DaemonService::VerifyIpaddr(const std::string &peer_uri) {
  const auto &remote_configs = Configurations::GetInstance().GetRemoteNodeConfigs();
  if (remote_configs.empty()) {
    GELOGI("Without remote config, no need to verify ipaddr.");
    return SUCCESS;
  }

  for (const auto &config : remote_configs) {
    if (peer_uri.find(config.ipaddr) != std::string::npos) {
      return SUCCESS;
    }
  }
  GELOGE(FAILED, "Failed to match ipaddr with remote configs, remote size = %zu.", remote_configs.size());
  return FAILED;
}

Status DaemonService::VerifySignData(const deployer::DeployerRequest &request) {
  const auto &local_node = Configurations::GetInstance().GetLocalNode();
  if (local_node.auth_lib_path.empty()) {
    GELOGI("No need to verify data for deployer");
    return SUCCESS;
  }
  const auto &init_request = request.init_request();
  auto data = std::to_string(request.type());
  return DeployerAuthentication::GetInstance().AuthVerify(data, init_request.sign_data());
}

Status DaemonService::VerifyInitRequest(const std::string &peer_uri,
                                        const deployer::DeployerRequest &request,
                                        deployer::DeployerResponse &response) {
  auto init_request = const_cast<deployer::DeployerRequest &>(request).mutable_init_request();
  GE_MAKE_GUARD(init_request, [&init_request]() {
    auto sign_data = init_request->mutable_sign_data();
    sign_data->replace(sign_data->begin(), sign_data->end(), "");
  });
  std::string error_msg;
  GE_DISMISSABLE_GUARD(failed_process, ([&error_msg, &response]() {
    response.set_error_code(FAILED);
    response.set_error_message(error_msg);
  }));
  error_msg = "Failed to verify ipaddr.";
  GE_CHK_STATUS_RET(VerifyIpaddr(peer_uri), "%s", error_msg.c_str());

  error_msg = "Failed to verify sign data.";
  GE_CHK_STATUS_RET_NOLOG(VerifySignData(request));
  GE_DISMISS_GUARD(failed_process);
  return SUCCESS;
}

void DaemonService::SetSupportFlowgwMerged(deployer::DeployerResponse &response) {
  const auto &local_node = Configurations::GetInstance().GetLocalNode();
  bool support_flowgw_merged = false;  // default not support merged
  if (local_node.protocol == kProtocolTypeRdma) {
    support_flowgw_merged = false;
  } else {
    int32_t support_merged = 0;
    (void) aclrtGetDeviceCapability(0, ACL_FEATURE_SYSTEM_MEMQ_EVENT_CROSS_DEV,
                                    &support_merged);
    support_flowgw_merged = (support_merged == 1);
  }
  GEEVENT("Support flowgw merged = %d.", static_cast<int32_t>(support_flowgw_merged));
  response.mutable_init_response()->set_support_flowgw_merged(support_flowgw_merged);
}

void DaemonService::ProcessInitRequest(const std::string &peer_uri,
                                       const deployer::DeployerRequest &request,
                                       deployer::DeployerResponse &response) {
  GEEVENT("[Process][Request] init request start.");
  auto res = VerifyInitRequest(peer_uri, request, response);
  if (res != SUCCESS) {
    GELOGW("[Create][Client] Verify init request failed.");
    return;
  }

  std::map<std::string, std::string> deployer_envs;
  for (const auto &env : request.init_request().envs()) {
    deployer_envs.emplace(env.first, env.second);
  }
  int64_t client_id = 0;
  res = client_manager_->CreateAndInitClient(peer_uri, deployer_envs, client_id);
  if (res != SUCCESS) {
    REPORT_INNER_ERR_MSG("E19999", "Create client failed.");
    GELOGE(FAILED, "[Create][Client] Create client failed.");
    response.set_error_code(FAILED);
    response.set_error_message("Create client id failed.");
    return;
  }

  int32_t dev_count = 1;
  int32_t offset = 0;
  client_manager_->GenDgwPortOffset(dev_count, offset);
  GEEVENT("[Process][Request] init request success, client_id = %" PRId64 ", "
          "address = %s, device count = %d, dwg port offset = %d.",
          client_id, peer_uri.c_str(), dev_count, offset);
  response.mutable_init_response()->set_client_id(client_id);
  response.mutable_init_response()->set_dev_count(dev_count);
  response.mutable_init_response()->set_dgw_port_offset(offset);
  SetSupportFlowgwMerged(response);
}

void DaemonService::ProcessDisconnectRequest(const std::string &peer_uri,
                                             const deployer::DeployerRequest &request,
                                             deployer::DeployerResponse &response) {
  int64_t client_id = request.client_id();
  GEEVENT("[Process][Request] disconnect request start, client_id = %" PRId64 ", addr = %s.", client_id, peer_uri.c_str());
  DeployerDaemonClient *client = nullptr;
  if (GetClient(client_id, &client, response)) {
    (void)client_manager_->CloseClient(client_id);
  }
  GEEVENT("[Process][Request] disconnect request succeeded, client_id = %" PRId64 ", addr = %s.", client_id, peer_uri.c_str());
}

void DaemonService::ProcessHeartbeatRequest(const deployer::DeployerRequest &request,
                                            deployer::DeployerResponse &response) {
  int64_t client_id = request.client_id();
  DeployerDaemonClient *client = nullptr;
  if (GetClient(client_id, &client, response)) {
    client->SetIsExecuting(true);
    client->OnHeartbeat();
    client->SetIsExecuting(false);
    (void)client->ProcessHeartbeatRequest(request, response);
  }
}

Status DaemonService::InitClientManager() {
  client_manager_ = MakeUnique<DaemonClientManager>();
  GE_CHECK_NOTNULL(client_manager_);
  GE_CHK_STATUS_RET(client_manager_->Initialize(), "Failed to initialize ClientManager");
  return SUCCESS;
}

bool DaemonService::GetClient(int64_t client_id, DeployerDaemonClient **client, deployer::DeployerResponse &response) {
  *client = client_manager_->GetClient(client_id);
  if (*client != nullptr) {
    return true;
  }

  REPORT_INNER_ERR_MSG("E19999", "Get client[%" PRId64 "] failed.", client_id);
  GELOGE(FAILED, "[Get][Client] Get client[%" PRId64 "] failed.", client_id);
  response.set_error_code(FAILED);
  response.set_error_message("Not exist client id");
  return false;
}

void DaemonService::ProcessDeployRequest(const deployer::DeployerRequest &request,
                                         deployer::DeployerResponse &response) {
  int64_t client_id = request.client_id();
  DeployerDaemonClient *client = nullptr;
  if (GetClient(client_id, &client, response)) {
    GELOGI("[Process][Request] process deploy request begin, client_id = %" PRId64 ", type = %s.", client_id,
           deployer::DeployerRequestType_Name(request.type()).c_str());
    GE_CHK_STATUS(client->ProcessDeployRequest(request, response),
                  "[Process][Request] process deploy request failed, client_id = %" PRId64 ", type = %s", client_id,
                  deployer::DeployerRequestType_Name(request.type()).c_str());
    GE_CHK_STATUS(response.error_code(),
                  "[Process][Request] check response failed, client_id = %" PRId64 ", type = %s, error code = %u", client_id,
                  deployer::DeployerRequestType_Name(request.type()).c_str(), response.error_code());
    GELOGI("[Process][Request] process deploy request end, client_id = %" PRId64 ", type = %s, ret = %u.", client_id,
           deployer::DeployerRequestType_Name(request.type()).c_str(), response.error_code());
  }
}

Status DaemonService::Initialize() {
  const auto &local_node = Configurations::GetInstance().GetLocalNode();
  GE_CHK_STATUS_RET(DeployerAuthentication::GetInstance().Initialize(local_node.auth_lib_path),
                    "Failed to deployer auth.");
  GE_CHK_STATUS_RET(DeployerProxy::GetInstance().Initialize(local_node), "Failed to initialize device proxy.");
  ResourceManager::GetInstance().ClearWorkingDir();
  GE_CHK_STATUS_RET(ResourceManager::GetInstance().Initialize(), "Failed to initialize ResourceManager");
  GE_CHK_STATUS_RET(InitClientManager(), "Failed to initialize ClientManager");
  GELOGI("DaemonService initialized successfully");
  return SUCCESS;
}

void DaemonService::Finalize() {
  DeployerProxy::GetInstance().Finalize();
  ResourceManager::GetInstance().Finalize();
  ResourceManager::GetInstance().ClearWorkingDir();
  client_manager_->Finalize();
  DeployerPortDistributor::GetInstance().Finalize();
  DeployerAuthentication::GetInstance().Finalize();
  GELOGI("DaemonService finalized successfully");
}

Status DeployerDaemonService::Initialize() {
  daemon_service_ = MakeUnique<DaemonService>();
  GE_CHECK_NOTNULL(daemon_service_);
  return daemon_service_->Initialize();
}

void DeployerDaemonService::Finalize() {
  daemon_service_->Finalize();
}

Status DeployerDaemonService::Process(const std::string &peer_uri,
                                      const deployer::DeployerRequest &request,
                                      deployer::DeployerResponse &response) {
  return daemon_service_->Process(peer_uri, request, response);
}
} // namespace ge