* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* MindIE is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef LLM_ENGINE_H
#define LLM_ENGINE_H
#include <atomic>
#include <cstdint>
#include <thread>
#include "data_type.h"
#include "dummy_quota_manager.h"
#include "engine/illm_engine.h"
#include "iload_balancer.h"
#include "ischeduler.h"
#include "latency_predictor/latency_predictor.h"
#include "layerwise_mixin/layerwise_mixin.h"
#include "lora_manager.h"
#include "model_exec_output_handler.h"
#include "process_group.h"
#include "request_response/request.h"
#include "seq_group_builder_from_infer_req.h"
#include "transfer_output_handler.h"
namespace mindie_llm {
// Engine tuning constants.
// Pause between two scheduling iterations (unit not visible here — presumably milliseconds; confirm at call site).
constexpr int DEFAULT_SLEEP_TIME_BETWEEN_TWO_ITER{1};
// Heartbeat logging period, in seconds (used as std::chrono::seconds by LlmEngine::CheckAndPrintHeartbeat).
constexpr int HEARTBEAT_INTERVAL_SECONDS{60};
// Metrics refresh interval (unit/meaning not visible here — presumably iterations or ms; TODO confirm).
constexpr int METRICS_UPDATE_INTERVAL{50};
// Threshold, in milliseconds per its name, for how long all schedulers may stay empty — presumably consumed by
// LlmEngine::MaybeMarkEngineNotReadyIfAllSchedulersEmptyTooLong; confirm in the implementation.
constexpr uint64_t ENGINE_ALL_THREADS_EMPTY_SCHEDULE_MS{5000};
/// Throughput figures kept per DP rank. Stored as atomics so they can be written and read
/// without extra locking (presumably written by LlmEngine::CalculateThroughput — confirm).
struct EngineMetricStatics {
    std::atomic<float> prefillThroughput_{0.0F};  // prefill throughput; unit not visible here
    std::atomic<float> decodeThroughput_{0.0F};   // decode throughput; unit not visible here
};
/// State owned for one local data-parallel (DP) rank: its scheduler thread, scheduler, model
/// executor and output handlers, plus bookkeeping carried between scheduling iterations.
/// Every scalar member has an initializer so no field is indeterminate after construction.
struct EnginePerDP {
    std::thread schedulerThread;
    std::shared_ptr<LatencyPredictor> latencypredictor;
    SchedulerPtr scheduler;
    IExecutorSPtr modelExecutor;
    std::unique_ptr<ModelExecOutputHandler> modelExecOutputHandler;
    std::unique_ptr<TransferOutputHandler> transferOutputHandler;
    // Request ids queued for abort (ConcurrentDeque — presumably safe for cross-thread push/pop; confirm).
    ConcurrentDeque<RequestId> abortedRequestIds;
    bool lastScheduleEmpty{false};
    ForwardRespToManagerCall abortRespToManagerCall;
    size_t addedRequestNum{0};
    size_t abortedRequestNum{0};
    // Value-initialized: previously had no initializer and was indeterminate until first written.
    ForwardMode lastForwardMode_{};
    std::chrono::time_point<std::chrono::high_resolution_clock> lastExecuteStartTime_{INVALID_TIME};
    // Zero-initialized: previously had no initializer and was indeterminate until first written.
    size_t lastBatchTokenNum_{0};
    EngineMetricStatics engineMetricStatics_;
    std::unordered_set<SequenceId> TGCleanupSeqIds_;
    DummyQuotaManagerSPtr dummyQuotaManagerSPtr_;
    // Steady-clock ms of the last non-empty schedule — presumably written via
    // LlmEngine::NotifySchedulerDidNonEmptySchedule and read from another thread (hence atomic); confirm.
    std::atomic<uint64_t> lastNonEmptyScheduleSteadyMs_{0};
};
/// Shared handle to the state of one local DP rank.
using EnginePerDPSPtr = std::shared_ptr<EnginePerDP>;
/// (scheduled sequence-group metadata, scheduler outputs), each as a nested vector — presumably
/// indexed [DP rank][batch]; this is the return type of LlmEngine::PostScheduleSyncUp. TODO confirm indexing.
using SchOutDataPair = std::pair<std::vector<std::vector<SequenceGroupMetaDatas>>,
                                 std::vector<std::vector<SchedulerOutputs>>>;
/// Concrete ILlmEngine implementation. Keeps one EnginePerDP entry per local data-parallel rank
/// (enginePerDPs_) and implements request admission, abort, KV-cache release, metric collection
/// and scheduling pause/resume declared by ILlmEngine.
class LlmEngine final : public ILlmEngine {
public:
    /// @param schedulerConfig  scheduler configuration (taken by value).
    /// @param executors        model executors — presumably one per local DP rank; TODO confirm.
    /// @param cb               response-forwarding callback (ForwardRespToManagerCall).
    /// @param pdRole           role of this engine (role_ defaults to Role::PnD).
    /// @param engineReadyFlag  optional, non-owning readiness flag (kept in llmEngineReady_); may be nullptr.
    LlmEngine(SchedulerConfig schedulerConfig, std::vector<IExecutorSPtr> executors, ForwardRespToManagerCall cb,
              Role pdRole, std::atomic<bool> *engineReadyFlag = nullptr);
    ~LlmEngine() override;

    // The std::atomic members already made the engine non-copyable and non-movable; state it explicitly.
    LlmEngine(const LlmEngine &) = delete;
    LlmEngine &operator=(const LlmEngine &) = delete;
    LlmEngine(LlmEngine &&) = delete;
    LlmEngine &operator=(LlmEngine &&) = delete;

    void InitProcessGroup(const std::vector<NodeInfo> &nodeInfos, std::string &processGroupMasterIP,
                          uint32_t processGroupMasterPort) override;
    void StartEngineThread() override;
    bool DistDecodeAcquireDummyQuota(bool isDummy, EnginePerDPSPtr enginePerDP) const;
    bool AddRequest(RequestSPtr request) override;
    void SendAbortResponse(SequenceGroupSPtr seqGroup, size_t localDPRank, InferStatusType flag) const;
    void AbortRequests(std::unordered_set<RequestId> &requestIds) override;
    void ReleaseKvCache(std::unordered_set<RequestId> &requestIds) override;
    void Stop() override;
    EngineMetric CollectEngineMetric(size_t localDPRank = 0) override;
    EngineMetric CollectAllDpEngineMetric() override;
    void SetPrefillPercentage(uint32_t prefillPercentage) override;
    void PauseScheduling() override;
    void ResumeScheduling() override;
    void ExecuteRecoverCommand(RecoverCommandInfo &commandInfo) override;

protected:
    // Body of a per-DP-rank scheduler thread (presumably EnginePerDP::schedulerThread runs it; confirm).
    void SchedulerThreadEntry(size_t localDPRank);
    void ScheduleExecTransfer(std::shared_ptr<EnginePerDP> &engine) const;
    void SyncBatchInfoAcrossNodes(SequenceGroupMetaDatas &metadata) const;
    static ExecuteModelRequestPtr BuildExecuteModelRequest(std::vector<std::vector<SequenceGroupMetaDatas>> &metadatas,
                                                           std::vector<std::vector<SchedulerOutputs>> &schedulerOutputs,
                                                           bool distributedEnable, int dpRankId);
    static TGCleanupRequestPtr BuildTGCleanupRequest(std::unordered_set<SequenceId> &TGCleanupSeqIds);
    SequenceId NextSeqId();
    void RecordEngineMetrics(SchedulerOutputs &scOut, std::shared_ptr<EnginePerDP> enginePerDP);
    void CalculateThroughput(std::shared_ptr<EnginePerDP> enginePerDP) const;

private:
    void AccumulateDpMetricInto(size_t dpIndex, EngineMetric &aggregatedMetric);
    void AbortParallelSeqGroups(size_t localDPRank) const;
    void SendRecomputeResponse(std::vector<SequenceId> &recomputeSeqIds, size_t localDPRank);
    template <typename T>
    SequenceGroupSPtr GetSequenceGroupWithoutRank(T id);
    SchOutDataPair PostScheduleSyncUp(bool needSync, SequenceGroupMetaDatas &metas, SchedulerOutputs &schOut,
                                      size_t localDPRank);
    std::pair<SequenceGroupMetaDatas, SchedulerOutputs> MakeDummySchedulerOutput(
        SequenceGroupMetaDatas seqGroupMetadata) const;
    void ExecuteDummy(EnginePerDPSPtr enginePerDP, SequenceGroupMetaDatas &seqGroupMetadata, size_t localDPRank,
                      std::function<void(ModelBatchResultSPtr)> responseHandler) const;

    SchedulerConfigSPtr schedulerConfig_ = {nullptr};
    std::vector<EnginePerDPSPtr> enginePerDPs_;  // one entry per local DP rank
    std::atomic<bool> stop_ = {false};
    Role role_{Role::PnD};
    LoadBalancerPtr loadBalancer_;
    bool isProcessGroupInit{false};
    bool isCentralizedThreadCCReady_{false};
    bool isDistributedPNodeProcessCCReady_{false};
    void SetupLatencyPredictor(const std::chrono::high_resolution_clock::time_point &batchExecuteStartTime,
                               int dpRankId);
    std::atomic<bool> isPauseScheduling_{false};
    std::atomic<bool> *llmEngineReady_{nullptr};  // non-owning; may be nullptr
    static uint64_t SteadyClockMsSinceEpoch();
    void NotifySchedulerDidNonEmptySchedule(size_t localDPRank);
    void MaybeMarkEngineNotReadyIfAllSchedulersEmptyTooLong();
    int dpRankId_{0};
    void switchRole(size_t localDPRank);

    /// Emits an INFO heartbeat line once more than HEARTBEAT_INTERVAL_SECONDS have elapsed since
    /// `heartbeatBegin`, then restarts the window at the time of that measurement.
    /// @param heartbeatBegin in/out: start of the current heartbeat window.
    /// @param enginePerDP    DP-rank state whose counters are logged; must be non-null, with non-null
    ///                       modelExecOutputHandler and scheduler (dereferenced unconditionally).
    void CheckAndPrintHeartbeat(std::chrono::time_point<std::chrono::high_resolution_clock> &heartbeatBegin,
                                const EnginePerDPSPtr &enginePerDP) const {
        const auto currentTime = std::chrono::high_resolution_clock::now();
        const auto diffTime = currentTime - heartbeatBegin;
        if (diffTime <= std::chrono::seconds(HEARTBEAT_INTERVAL_SECONDS)) {
            return;
        }
        // Reuse the same clock reading so the next window starts exactly where this one was measured.
        heartbeatBegin = currentTime;
        const auto passedSeconds = std::chrono::duration_cast<std::chrono::seconds>(diffTime).count();
        // One metric snapshot: avoids collecting twice and keeps the NPU/CPU counts mutually consistent.
        const auto &schedulerMetric = enginePerDP->scheduler->CollectSchedulerMetric();
        MINDIE_LLM_LOG_INFO_REQUEST(
            "Since last schedule, pass "
            << passedSeconds
            << " seconds, AsyncBatchNum=" << enginePerDP->modelExecOutputHandler->GetAsyncBatchNum()
            << ", freeNpuBlockNum=" << schedulerMetric.blockInfo.freeNpuBlockNum_
            << ", freeCpuBlockNum=" << schedulerMetric.blockInfo.freeCpuBlockNum_);
    }

    void LayerwiseEosClean(bool layerwiseDisaggregated, std::unordered_set<SequenceId> &eosCleanupSeqIds,
                           EnginePerDPSPtr enginePerDP) const;
    LayerwiseMixin layerwiseMixin_;
};
}
#endif