* This program is free software, you can redistribute it and/or modify it.
* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* This file is a part of the CANN Open Software.
* Licensed under CANN Open Software License Agreement Version 2.0 (the "License").
* Please refer to the License for details. You may not use this file except in compliance with the License.
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
* See LICENSE in the root of the software repository for the full text of the License.
*/
#include <memory>
#include <algorithm>
#include <vector>
#include <cstdlib>
#include <gtest/gtest.h>
#include <gmock/gmock.h>
#include "llm_datadist/llm_error_codes.h"
#include "llm_datadist_v2.h"
#include "common/llm_inner_types.h"
#include "llm_datadist/llm_engine_types.h"
#include "dlog_pub.h"
#include "depends/mmpa/src/mmpa_stub.h"
#include "depends/llm_datadist/src/data_cache_engine_test_helper.h"
#include "common/llm_mem_pool.h"
#include "rt_error_codes.h"
#include "data_transfer/d2d_data_transfer_job.h"
#include "cache_mgr/data_cache_engine.h"
#include "link_mgr/comm_entity_manager.h"
#include "fsm/send_state.h"
#include "hccl/hccl_adapter.h"
#include "depends/llm_datadist/src/hccl_stub.h"
using namespace std;
using namespace ge;
using namespace ::testing;
using ::testing::Invoke;
using ::testing::Mock;
namespace llm {
namespace {
// Test helper that owns one LLMDataDistV2 instance together with a source and a destination cache, and
// wraps the initialise/link, pull, transfer and release sequences shared by the test cases in this file.
class DataCacheEngineRunner {
 public:
  DataCacheEngineRunner() = default;
  ~DataCacheEngineRunner() = default;

  // Initialises the data-dist instance, creates the source and destination caches, links, writes
  // {1, 2, 3, 4} at the start of the first source tensor and waits until memory registration reports OK.
  //
  // src_cache_desc / dst_cache_desc: a descriptor with placement == 1 is created through AllocateCache();
  //   any other placement is backed by host_buffers_ and goes through RegisterCache(), except that a
  //   destination with use_host_mem_pool set is also created through AllocateCache().
  // pull_cache_param: only inspected for non-empty prompt_blocks / decoder_blocks, which makes the
  //   matching cache use a single cache key with is_allocate_blocks set.
  // use_host_mem_pool: adds LLM_OPTION_HOST_MEM_POOL_CONFIG to the initialise options.
  // enable_remote_cache_accessible: adds kLlmOptionEnableRemoteCacheAccessible = "1" to the options.
  void LlmDatadistInitAndLink(const llm::CacheDesc &src_cache_desc, const llm::CacheDesc &dst_cache_desc,
                              const llm::PullCacheParam &pull_cache_param, bool use_host_mem_pool = false,
                              bool enable_remote_cache_accessible = false) {
    std::map<ge::AscendString, ge::AscendString> default_options{
        {llm::LLM_OPTION_ROLE, llm::kDecoder},
        {OPTION_EXEC_DEVICE_ID, "0"},
        {llm::LLM_OPTION_SYNC_KV_CACHE_WAIT_TIME, "600000"},
        {llm::LLM_OPTION_MEM_POOL_CONFIG, "{\"memory_size\": 1024288000}"}};
    if (use_host_mem_pool) {
      default_options[llm::LLM_OPTION_HOST_MEM_POOL_CONFIG] = "{\"memory_size\": 102428800}";
    }
    if (enable_remote_cache_accessible) {
      default_options[llm::kLlmOptionEnableRemoteCacheAccessible] = "1";
    }
    EXPECT_EQ(llm_data_dist_.LLMDataDistInitialize(default_options), ge::SUCCESS);
    EXPECT_EQ(llm_data_dist_.CheckCapacity(1024), ge::SUCCESS);

    if (!pull_cache_param.decoder_blocks.empty()) {
      dst_cache_keys_.resize(1);
      dst_cache_keys_.front().is_allocate_blocks = true;
      dst_cache_keys_.front().req_id = 1;
    }
    if (!pull_cache_param.prompt_blocks.empty()) {
      src_cache_keys_.resize(1);
      src_cache_keys_.front().is_allocate_blocks = true;
    }

    // One host buffer per tensor of whichever cache has placement 0; the destination wins if both do.
    uint32_t host_buffers_size = 0U;
    if (src_cache_desc.placement == 0) {
      host_buffers_size = src_cache_desc.num_tensors;
    }
    if (dst_cache_desc.placement == 0) {
      host_buffers_size = dst_cache_desc.num_tensors;
    }
    host_buffers_.resize(host_buffers_size, std::vector<int32_t>(8 * 1024 * 1024));

    if (src_cache_desc.placement == 1) {
      ASSERT_EQ(llm_data_dist_.AllocateCache(src_cache_desc, src_cache_, src_cache_keys_), SUCCESS);
    } else {
      src_cache_.per_device_tensor_addrs.resize(1);
      for (auto &host_buffer : host_buffers_) {
        src_cache_.per_device_tensor_addrs[0].emplace_back(reinterpret_cast<uintptr_t>(&host_buffer[0]));
      }
      ASSERT_EQ(llm_data_dist_.RegisterCache(src_cache_desc, src_cache_, src_cache_keys_), SUCCESS);
    }

    if (dst_cache_desc.placement == 1) {
      ASSERT_EQ(llm_data_dist_.AllocateCache(dst_cache_desc, dst_cache_, dst_cache_keys_), SUCCESS);
    } else {
      if (use_host_mem_pool) {
        ASSERT_EQ(llm_data_dist_.AllocateCache(dst_cache_desc, dst_cache_, dst_cache_keys_), SUCCESS);
      } else {
        dst_cache_.per_device_tensor_addrs.resize(1);
        for (auto &host_buffer : host_buffers_) {
          dst_cache_.per_device_tensor_addrs[0].emplace_back(reinterpret_cast<uintptr_t>(&host_buffer[0]));
        }
        ASSERT_EQ(llm_data_dist_.RegisterCache(dst_cache_desc, dst_cache_, dst_cache_keys_), SUCCESS);
      }
    }

    const std::map<uint64_t, uint32_t> cluster_to_rank{{0, 0}, {1, 1}};
    std::string rank_table = "01";
    std::string cluster_name = "link";
    EXPECT_EQ(llm_data_dist_.Link(cluster_name, cluster_to_rank, rank_table, comm_id_), ge::SUCCESS);

    // Seed the first source tensor with a recognisable pattern that the tests look for after the copy.
    std::vector<int32_t> tensor_data{1, 2, 3, 4};
    memcpy(reinterpret_cast<void *>(src_cache_.per_device_tensor_addrs[0][0]), tensor_data.data(),
           tensor_data.size() * sizeof(int32_t));

    llm::RegisterMemoryStatus status = llm::RegisterMemoryStatus::PREPARING;
    while (status != llm::RegisterMemoryStatus::OK) {
      // ASSERT rather than EXPECT: a failing query would otherwise leave `status` unchanged and spin forever.
      // NOTE(review): a terminal non-OK status returned together with SUCCESS would still loop — confirm
      // whether RegisterMemoryStatus has such a value that should abort the wait.
      ASSERT_EQ(llm_data_dist_.QueryRegisterMemStatus(comm_id_, status), ge::SUCCESS);
    }
  }

  // Deallocates both caches, removes every cache key, unlinks and finalises the data-dist instance.
  void ReleaseResource() {
    EXPECT_EQ(llm_data_dist_.DeallocateCache(src_cache_.cache_id), ge::SUCCESS);
    EXPECT_EQ(llm_data_dist_.DeallocateCache(dst_cache_.cache_id), ge::SUCCESS);
    for (const auto &src_cache_key : src_cache_keys_) {
      EXPECT_EQ(llm_data_dist_.RemoveCacheKey(src_cache_key), ge::SUCCESS);
    }
    for (const auto &dst_cache_key : dst_cache_keys_) {
      EXPECT_EQ(llm_data_dist_.RemoveCacheKey(dst_cache_key), ge::SUCCESS);
    }
    EXPECT_EQ(llm_data_dist_.Unlink(comm_id_), ge::SUCCESS);
    llm_data_dist_.LLMDataDistFinalize();
  }

  // Pulls (prompt_cluster_id 0, prompt_cache_id 1) into the destination cache and copies the first
  // pull_result.size() int32 values of the first destination tensor into pull_result.
  void PullDataCache(const llm::PullCacheParam &pull_cache_param, std::vector<int32_t> &pull_result) {
    llm::CacheKey cache_key{};
    cache_key.prompt_cluster_id = 0;
    cache_key.prompt_cache_id = 1;
    EXPECT_EQ(llm_data_dist_.PullCache(dst_cache_.cache_id, cache_key, pull_cache_param), SUCCESS);
    auto pulled_data = reinterpret_cast<int32_t *>(dst_cache_.per_device_tensor_addrs[0][0]);
    memcpy(pull_result.data(), pulled_data, sizeof(int32_t) * pull_result.size());
  }

  // Runs TransferCache(0, ...) and copies the first pull_result.size() int32 values found at
  // transfer_cache_config.dst_addrs[0] into pull_result.
  void TransferDataCache(const llm::TransferCacheConfig &transfer_cache_config,
                         const llm::TransferBlockConfig &transfer_block_config, std::vector<int32_t> &pull_result) {
    EXPECT_EQ(llm_data_dist_.TransferCache(0, transfer_cache_config, transfer_block_config), SUCCESS);
    // %p requires a pointer argument; the address is held as an integer, so convert it explicitly.
    LLMLOGI("dst_addrs 0 addrs:%p", reinterpret_cast<void *>(transfer_cache_config.dst_addrs[0]));
    auto pulled_data = reinterpret_cast<int32_t *>(transfer_cache_config.dst_addrs[0]);
    memcpy(pull_result.data(), pulled_data, sizeof(int32_t) * pull_result.size());
  }

  llm::Cache &GetSrcCache() {
    return src_cache_;
  }
  llm::Cache &GetDstCache() {
    return dst_cache_;
  }
  uint64_t GetCommId() const {
    return comm_id_;
  }
  llm::LLMDataDistV2 &GetLlmDataDist() {
    return llm_data_dist_;
  }

 private:
  llm::LLMDataDistV2 llm_data_dist_{llm::LLMDataDistV2(1)};
  uint64_t comm_id_{0U};  // written by Link() in LlmDatadistInitAndLink()
  llm::Cache src_cache_;
  llm::Cache dst_cache_;
  std::vector<llm::CacheKey> src_cache_keys_;
  std::vector<llm::CacheKey> dst_cache_keys_;
  // Backing storage for caches that are registered (not allocated); must outlive the registered caches.
  std::vector<std::vector<int32_t>> host_buffers_;
};
// Stub for "HcclExchangeMemDesc": echoes every local descriptor back as the remote one and reports the
// local descriptor count. comm, remoteRank and timeout are ignored.
// NOTE(review): strcpy presumes each desc is NUL-terminated and that remote->array has room for
// local->arrayLength entries — confirm against the HcclMemDescs definition.
HcclResult HcclExchangeMemDesc1(HcclComm comm, uint32_t remoteRank, HcclMemDescs *local, int timeout,
                                HcclMemDescs *remote, uint32_t *actualNum) {
  uint32_t idx = 0U;
  while (idx < local->arrayLength) {
    strcpy(remote->array[idx].desc, local->array[idx].desc);
    ++idx;
  }
  remote->arrayLength = local->arrayLength;
  *actualNum = local->arrayLength;
  return HcclResult::HCCL_SUCCESS;
}
// Mmpa stub whose DlSym() serves a fixed table of HCCL entry points. "HcclExchangeMemDesc" resolves to
// HcclExchangeMemDesc1 from this file; every other name maps to the function of the same name.
class MockMmpaLongTimeRegister : public MmpaStubApiGe {
 public:
  // Always succeeds: returns the address of a function-local static as an opaque handle.
  void *DlOpen(const char *file_name, int32_t mode) override {
    static int64_t fake_handle = 0;
    return &fake_handle;
  }

  // Looks func_name up in the table; returns nullptr for names that are not listed.
  void *DlSym(void *handle, const char *func_name) override {
    static const std::map<std::string, void *> symbol_table = {
        {"HcclCommInitClusterInfoMemConfig", reinterpret_cast<void *>(&HcclCommInitClusterInfoMemConfig)},
        {"HcclExchangeMemDesc", reinterpret_cast<void *>(&HcclExchangeMemDesc1)},
        {"HcclCommDestroy", reinterpret_cast<void *>(&HcclCommDestroy)},
        {"HcclBatchPut", reinterpret_cast<void *>(&HcclBatchPut)},
        {"HcclBatchGet", reinterpret_cast<void *>(&HcclBatchGet)},
        {"HcclRemapRegistedMemory", reinterpret_cast<void *>(&HcclRemapRegistedMemory)},
        {"HcclRegisterGlobalMem", reinterpret_cast<void *>(&HcclRegisterGlobalMem)},
        {"HcclDeregisterGlobalMem", reinterpret_cast<void *>(&HcclDeregisterGlobalMem)},
        {"HcclCommBindMem", reinterpret_cast<void *>(&HcclCommBindMem)},
        {"HcclCommUnbindMem", reinterpret_cast<void *>(&HcclCommUnbindMem)},
        {"HcclCommPrepare", reinterpret_cast<void *>(&HcclCommPrepare)},
    };
    const auto entry = symbol_table.find(func_name);
    if (entry == symbol_table.end()) {
      return nullptr;
    }
    LLMLOGI("%s addr:%lu", func_name, static_cast<uint64_t>(reinterpret_cast<uintptr_t>(entry->second)));
    return entry->second;
  }

  // Nothing to release; always reports success.
  int32_t DlClose(void *handle) override {
    return 0;
  }
};
// Runtime stub whose second call (count_ == 1 before the increment) to rtStreamSynchronizeWithTimeout
// reports a stream-sync timeout; every other call reports RT_ERROR_NONE.
class MockRuntime : public llm::RuntimeStub {
 public:
  rtError_t rtStreamSynchronizeWithTimeout(rtStream_t stm, int32_t timeout) override {
    const int32_t call_index = count_++;
    if (call_index == 1) {
      return ACL_ERROR_RT_STREAM_SYNC_TIMEOUT;
    }
    return RT_ERROR_NONE;
  }

  int32_t count_ = 0;  // number of calls made so far
};
}
// Fixture shared by every test below: installs the HCCL Mmpa mock before each test, and afterwards
// finalises the HcclAdapter singleton and resets the mock.
class DataCacheEngineSTest : public ::testing::Test {
 protected:
  // Runs before each test case.
  void SetUp() override {
    llm::MockMmpaForHcclApi::Install();
  }

  // Runs after each test case; the adapter is finalised before the mock is reset.
  void TearDown() override {
    llm::HcclAdapter::GetInstance().Finalize();
    llm::MockMmpaForHcclApi::Reset();
  }
};
// Links with the MockMmpaLongTimeRegister symbol table installed and unlinks 10 ms later, expecting both
// calls to succeed. Judging by the test name, the unlink is meant to land before link preparation
// completes — TODO confirm.
TEST_F(DataCacheEngineSTest, UnlinkWhenPrepareNotFinished) {
  MmpaStub::GetInstance().SetImpl(std::make_shared<MockMmpaLongTimeRegister>());
  llm::LLMDataDistV2 llm_datadist(1U);
  std::map<ge::AscendString, ge::AscendString> options{};
  options["llm.Role"] = "Decoder";
  EXPECT_EQ(llm_datadist.LLMDataDistInitialize(options), ge::SUCCESS);
  std::map<uint64_t, uint32_t> cluster2rank{{1, 0}, {2, 1}};
  std::string rank_table;
  // Initialised so Unlink() below never reads an indeterminate value if Link() fails (EXPECT_EQ does not
  // abort the test) without writing its out-parameter.
  uint64_t comm_id = 0U;
  std::string cluster_name = "link";
  EXPECT_EQ(llm_datadist.Link(cluster_name, cluster2rank, rank_table, comm_id), ge::SUCCESS);
  std::this_thread::sleep_for(std::chrono::milliseconds(10));
  EXPECT_EQ(llm_datadist.Unlink(comm_id), ge::SUCCESS);
  llm_datadist.LLMDataDistFinalize();
}
// Exhausts an LlmMemPool with ten 32 KiB allocations, checks that a further Alloc(…, 1000) fails, then
// checks that ten Alloc(…, 3000) calls succeed once a helper thread has freed the first batch.
// The second argument of Alloc is presumably a wait timeout in milliseconds — TODO confirm.
TEST_F(DataCacheEngineSTest, TestMemPool) {
  int64_t addr = 0;
  void *base_address_ = &addr;
  size_t pool_size = 10UL * 64 * 1024;
  llm::ScalableConfig config{};
  config.page_idem_num = 16;
  config.page_mem_size_total_threshold = pool_size;
  llm::LlmMemPool mem_pool(config);
  ASSERT_EQ(mem_pool.Initialize(base_address_, pool_size), ge::SUCCESS);

  std::vector<void *> addrs;
  for (int i = 0; i < 10; ++i) {
    auto block = mem_pool.Alloc(32 * 1024);
    EXPECT_TRUE(block != nullptr);
    addrs.emplace_back(block);
  }
  EXPECT_TRUE(mem_pool.Alloc(32 * 1024, 1000) == nullptr);

  // Frees the first batch after 100 ms so the allocations below have something to wait for.
  std::thread free_th([&mem_pool, &addrs]() {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    for (auto block : addrs) {
      mem_pool.Free(block);
    }
  });

  for (int i = 0; i < 10; ++i) {
    LLMLOGI("TEST--- allocate index = %d, start", i);
    auto block = mem_pool.Alloc(32 * 1024, 3000);
    LLMLOGI("TEST--- allocate index = %d, ret = %p", i, block);
    if (block == nullptr) {
      // Record the failure and stop instead of returning via ASSERT_*: leaving this function while
      // free_th is still joinable would invoke std::terminate from the std::thread destructor.
      ADD_FAILURE() << "Alloc returned nullptr at index " << i;
      break;
    }
  }
  free_th.join();
}
// Performs ten AllocShared(32 KiB) calls on a 640 KiB pool, expects each to return a non-null
// shared_ptr, and then drops all of them by clearing the vector.
TEST_F(DataCacheEngineSTest, TestMemPool_Shared) {
  int64_t backing = 0;
  void *pool_base = &backing;
  const size_t pool_bytes = 10UL * 64 * 1024;

  llm::ScalableConfig pool_config{};
  pool_config.page_idem_num = 16;
  pool_config.page_mem_size_total_threshold = pool_bytes;

  llm::LlmMemPool pool(pool_config);
  ASSERT_EQ(pool.Initialize(pool_base, pool_bytes), ge::SUCCESS);

  std::vector<std::shared_ptr<void>> held;
  int remaining = 10;
  while (remaining > 0) {
    auto block = pool.AllocShared(32 * 1024);
    EXPECT_TRUE(block != nullptr);
    held.emplace_back(block);
    --remaining;
  }
  held.clear();
}
// Pulls from a placement-1 source cache of shape {2, 128} into a placement-0 destination cache of shape
// {4, 128} with batch_index 1, and expects the seeded {1, 2, 3, 4} at element 128 of the first
// destination tensor.
TEST_F(DataCacheEngineSTest, PullDataCache_D2H_C2C) {
  llm::CacheDesc src_desc{};
  src_desc.num_tensors = 8;
  src_desc.shape = {2, 128};
  src_desc.data_type = DT_INT32;
  src_desc.placement = 1;

  llm::CacheDesc dst_desc = src_desc;
  dst_desc.shape = {4, 128};
  dst_desc.placement = 0;

  llm::PullCacheParam pull_param{};
  pull_param.size = 128 * 4;
  pull_param.batch_index = 1;

  std::vector<int32_t> pulled(2 * 128);
  DataCacheEngineRunner runner;
  runner.LlmDatadistInitAndLink(src_desc, dst_desc, pull_param);
  runner.PullDataCache(pull_param, pulled);
  runner.ReleaseResource();

  const std::vector<int32_t> expected{1, 2, 3, 4};
  const std::vector<int32_t> observed(pulled.begin() + 128, pulled.begin() + 128 + 4);
  EXPECT_EQ(observed, expected);
}
// Block-to-block pull from a placement-1 source into a placement-0 destination (both {128, 128}, BLOCKS).
// Source block 0 carries the seeded data and is mapped to destination block 1, i.e. element offset 128.
TEST_F(DataCacheEngineSTest, PullDataCache_D2H_B2B) {
  llm::CacheDesc src_desc{};
  src_desc.num_tensors = 8;
  src_desc.shape = {128, 128};
  src_desc.data_type = DT_INT32;
  src_desc.placement = 1;
  src_desc.cache_mem_type = llm::CacheMemType::BLOCKS;

  llm::CacheDesc dst_desc = src_desc;
  dst_desc.placement = 0;

  llm::PullCacheParam pull_param{};
  pull_param.size = -1;
  pull_param.prompt_blocks = {0, 1, 4, 5, 6};
  pull_param.decoder_blocks = {1, 2, 4, 6, 9};

  std::vector<int32_t> pulled(128 * 128);
  DataCacheEngineRunner runner;
  runner.LlmDatadistInitAndLink(src_desc, dst_desc, pull_param);
  runner.PullDataCache(pull_param, pulled);
  runner.ReleaseResource();

  const std::vector<int32_t> expected{1, 2, 3, 4};
  const std::vector<int32_t> observed(pulled.begin() + 128, pulled.begin() + 128 + 4);
  EXPECT_EQ(observed, expected);
}
// Block-to-block pull of 32 blocks of 10 Mi int32 elements each, driven through DataCacheEngineTestRunner
// with the DataCacheEngineRuntimeMock runtime installed; only the return code of Run() is checked.
// NOTE(review): the runtime stub installed here is not reset inside this test — confirm that is intended.
TEST_F(DataCacheEngineSTest, PullCache_D2H_B2B_BigBlock) {
  llm::CacheDesc src_desc{};
  src_desc.num_tensors = 1;
  src_desc.shape = {1, 10L * 1024 * 1024};
  src_desc.data_type = ge::DT_INT32;
  src_desc.placement = 1;
  src_desc.cache_mem_type = llm::CacheMemType::BLOCKS;

  llm::CacheDesc dst_desc = src_desc;
  dst_desc.placement = 0;

  llm::PullCacheParam pull_param{};
  pull_param.size = -1;
  pull_param.prompt_blocks.resize(32);
  pull_param.decoder_blocks.resize(32);

  llm::MockMmpaForHcclApi::Install();
  llm::RuntimeStub::SetInstance(std::make_shared<llm::DataCacheEngineRuntimeMock>());
  llm::HcclAdapter::GetInstance().Initialize();

  llm::DataCacheEngineTestRunner engine_runner(100 * 1024 * 1024);
  engine_runner.Initialize(src_desc, dst_desc, pull_param);
  const auto run_status = engine_runner.Run(pull_param);
  EXPECT_EQ(run_status, ge::SUCCESS);
}
// Pulls a contiguous placement-1 source of shape {1, 128} into a placement-0 BLOCKS destination of shape
// {64, 2} using destination blocks {1, 3, 5, 7}; blocks 1 and 3 are expected to hold {1, 2} and {3, 4}.
TEST_F(DataCacheEngineSTest, PullDataCache_D2H_C2B) {
  llm::CacheDesc src_desc{};
  src_desc.num_tensors = 8;
  src_desc.shape = {1, 128};
  src_desc.data_type = DT_INT32;
  src_desc.placement = 1;

  auto dst_desc = src_desc;
  dst_desc.shape = {64, 2};
  dst_desc.placement = 0;
  dst_desc.cache_mem_type = llm::CacheMemType::BLOCKS;

  llm::PullCacheParam pull_param{};
  pull_param.size = -1;
  pull_param.decoder_blocks = {1, 3, 5, 7};

  std::vector<int32_t> pulled(4 * 32);
  DataCacheEngineRunner runner;
  runner.LlmDatadistInitAndLink(src_desc, dst_desc, pull_param);
  runner.PullDataCache(pull_param, pulled);
  runner.ReleaseResource();

  const std::vector<int32_t> expected{0, 0, 1, 2, 0, 0, 3, 4};
  const std::vector<int32_t> observed(pulled.begin(), pulled.begin() + 8);
  EXPECT_EQ(observed, expected);
}
// Pulls from a placement-0 source of shape {2, 768 Ki} into a placement-1 destination of shape {2, 1 Mi}
// with batch_index 1, and expects the seeded data at the start of the second destination row.
TEST_F(DataCacheEngineSTest, PullDataCache_H2D_C2C) {
  llm::CacheDesc src_desc{};
  src_desc.num_tensors = 8;
  src_desc.shape = {2, 256 * 1024 * 3};
  src_desc.data_type = DT_INT32;
  src_desc.placement = 0;

  llm::CacheDesc dst_desc = src_desc;
  dst_desc.shape = {2, 256 * 1024 * 4};
  dst_desc.placement = 1;

  llm::PullCacheParam pull_param{};
  pull_param.size = 1024 * 1024 * 3;
  pull_param.batch_index = 1;

  std::vector<int32_t> pulled(2 * 256 * 1024 * 4);
  DataCacheEngineRunner runner;
  runner.LlmDatadistInitAndLink(src_desc, dst_desc, pull_param);
  runner.PullDataCache(pull_param, pulled);
  runner.ReleaseResource();

  const auto row_start = pulled.begin() + 256 * 1024 * 4;
  const std::vector<int32_t> expected{1, 2, 3, 4};
  const std::vector<int32_t> observed(row_start, row_start + 4);
  EXPECT_EQ(observed, expected);
}
// Block-to-block pull of 64 blocks from a placement-0 source into a placement-1 destination with the
// destination block order reversed, so source block 0 is expected in destination block 63.
TEST_F(DataCacheEngineSTest, PullDataCache_H2D_B2B) {
  llm::CacheDesc src_desc{};
  src_desc.num_tensors = 8;
  src_desc.shape = {128, 128};
  src_desc.data_type = DT_INT32;
  src_desc.placement = 0;
  src_desc.cache_mem_type = llm::CacheMemType::BLOCKS;

  llm::CacheDesc dst_desc = src_desc;
  dst_desc.placement = 1;

  llm::PullCacheParam pull_param{};
  pull_param.size = -1;
  // prompt_blocks = 0..63, decoder_blocks = 63..0.
  for (int block = 0; block < 64; ++block) {
    pull_param.prompt_blocks.push_back(block);
    pull_param.decoder_blocks.push_back(63 - block);
  }

  std::vector<int32_t> pulled(128 * 128);
  DataCacheEngineRunner runner;
  runner.LlmDatadistInitAndLink(src_desc, dst_desc, pull_param);
  runner.PullDataCache(pull_param, pulled);
  runner.ReleaseResource();

  const auto block_start = pulled.begin() + 128 * 63;
  const std::vector<int32_t> expected{1, 2, 3, 4};
  const std::vector<int32_t> observed(block_start, block_start + 4);
  EXPECT_EQ(observed, expected);
}
// Pulls from a placement-1 source of shape {1, 128} into a placement-1 destination of shape {4, 128}
// with batch_index 1, and expects the seeded data at element 128 of the first destination tensor.
TEST_F(DataCacheEngineSTest, PullDataCache_D2D_C2C) {
  llm::CacheDesc src_desc{};
  src_desc.num_tensors = 8;
  src_desc.shape = {1, 128};
  src_desc.data_type = DT_INT32;
  src_desc.placement = 1;

  llm::CacheDesc dst_desc = src_desc;
  dst_desc.shape = {4, 128};

  llm::PullCacheParam pull_param{};
  pull_param.size = 128 * 4;
  pull_param.batch_index = 1;

  std::vector<int32_t> pulled(4 * 128);
  DataCacheEngineRunner runner;
  runner.LlmDatadistInitAndLink(src_desc, dst_desc, pull_param);
  runner.PullDataCache(pull_param, pulled);
  runner.ReleaseResource();

  const std::vector<int32_t> expected{1, 2, 3, 4};
  const std::vector<int32_t> observed(pulled.begin() + 128, pulled.begin() + 128 + 4);
  EXPECT_EQ(observed, expected);
}
// Thread body: issues one PullCache into dst_cache and accepts either SUCCESS or LLM_NOT_YET_LINK as
// the outcome.
void PullCacheThread(llm::LLMDataDistV2 &llm_data_dist, llm::CacheKey &cache_key, llm::Cache &dst_cache,
                     llm::PullCacheParam &pull_cache_param) {
  const auto status = llm_data_dist.PullCache(dst_cache.cache_id, cache_key, pull_cache_param);
  const bool acceptable = (status == ge::SUCCESS) || (status == ge::LLM_NOT_YET_LINK);
  EXPECT_TRUE(acceptable);
}
// Thread body: waits 100 microseconds, then unlinks comm_id and expects SUCCESS.
void UnlinkThread(llm::LLMDataDistV2 &llm_data_dist, uint64_t comm_id) {
  const auto start_delay = std::chrono::microseconds(100);
  std::this_thread::sleep_for(start_delay);
  const auto status = llm_data_dist.Unlink(comm_id);
  EXPECT_EQ(status, ge::SUCCESS);
}
// Runs PullCache and Unlink concurrently on the same LLMDataDistV2 instance. UnlinkThread is started
// first and must succeed; PullCacheThread may report either SUCCESS or LLM_NOT_YET_LINK. The instance is
// finalised directly afterwards; ReleaseResource() is not called in this test.
TEST_F(DataCacheEngineSTest, PullDataCache_D2D_C2C_sync_with_unlink) {
  llm::CacheDesc src_cache_desc{};
  src_cache_desc.num_tensors = 8;
  src_cache_desc.shape = {1, 128};
  src_cache_desc.data_type = DT_INT32;
  src_cache_desc.placement = 1;
  llm::CacheDesc dst_cache_desc = src_cache_desc;
  dst_cache_desc.shape = {4, 128};
  llm::PullCacheParam pull_cache_param{};
  pull_cache_param.size = 128 * 4;
  pull_cache_param.batch_index = 1;

  DataCacheEngineRunner data_cache_engine_runner;
  data_cache_engine_runner.LlmDatadistInitAndLink(src_cache_desc, dst_cache_desc, pull_cache_param);
  llm::LLMDataDistV2 &llm_data_dist = data_cache_engine_runner.GetLlmDataDist();
  llm::CacheKey cache_key{};
  cache_key.prompt_cluster_id = 0;
  cache_key.prompt_cache_id = 1;

  std::thread thread2(UnlinkThread, std::ref(llm_data_dist), data_cache_engine_runner.GetCommId());
  std::thread thread1(PullCacheThread, std::ref(llm_data_dist), std::ref(cache_key),
                      std::ref(data_cache_engine_runner.GetDstCache()), std::ref(pull_cache_param));
  // Both threads were just started and never detached, so they are joinable here.
  thread1.join();
  thread2.join();
  llm_data_dist.LLMDataDistFinalize();
}
// Block-to-block pull between two placement-1 caches of shape {128, 128}. Source block 0 carries the
// seeded data and is mapped to destination block 1, i.e. element offset 128.
TEST_F(DataCacheEngineSTest, PullDataCache_D2D_B2B) {
  llm::CacheDesc src_desc{};
  src_desc.num_tensors = 8;
  src_desc.shape = {128, 128};
  src_desc.data_type = DT_INT32;
  src_desc.placement = 1;
  src_desc.cache_mem_type = llm::CacheMemType::BLOCKS;

  llm::CacheDesc dst_desc = src_desc;
  dst_desc.placement = 1;

  llm::PullCacheParam pull_param{};
  pull_param.size = -1;
  pull_param.prompt_blocks = {0, 1, 4, 5, 6};
  pull_param.decoder_blocks = {1, 2, 4, 6, 9};

  std::vector<int32_t> pulled(128 * 128);
  DataCacheEngineRunner runner;
  runner.LlmDatadistInitAndLink(src_desc, dst_desc, pull_param);
  runner.PullDataCache(pull_param, pulled);
  runner.ReleaseResource();

  const std::vector<int32_t> expected{1, 2, 3, 4};
  const std::vector<int32_t> observed(pulled.begin() + 128, pulled.begin() + 128 + 4);
  EXPECT_EQ(observed, expected);
}
// Block-to-block pull with remote-cache access enabled. The first pull runs with MockRuntime installed
// and is expected to return LLM_TIMEOUT; after the runtime stub is reset, a second pull is expected to
// deliver the seeded data into destination block 1.
TEST_F(DataCacheEngineSTest, PullCache_D2D_B2B_BatchGet) {
  llm::CacheDesc src_desc{};
  src_desc.num_tensors = 8;
  src_desc.shape = {128, 128};
  src_desc.data_type = DT_INT32;
  src_desc.placement = 1;
  src_desc.cache_mem_type = llm::CacheMemType::BLOCKS;

  llm::CacheDesc dst_desc = src_desc;
  dst_desc.placement = 1;

  llm::PullCacheParam pull_param{};
  pull_param.size = -1;
  pull_param.prompt_blocks = {0, 1, 4, 5, 6};
  pull_param.decoder_blocks = {1, 2, 4, 6, 9};

  std::vector<int32_t> pulled(128 * 128);
  DataCacheEngineRunner runner;
  runner.LlmDatadistInitAndLink(src_desc, dst_desc, pull_param, false, true);

  llm::CacheKey prompt_key{};
  prompt_key.prompt_cluster_id = 0;
  prompt_key.prompt_cache_id = 1;

  // First attempt: stream synchronisation is mocked.
  llm::RuntimeStub::SetInstance(std::make_shared<MockRuntime>());
  const auto timed_out_status =
      runner.GetLlmDataDist().PullCache(runner.GetDstCache().cache_id, prompt_key, pull_param);
  EXPECT_EQ(timed_out_status, ge::LLM_TIMEOUT);
  llm::RuntimeStub::Reset();

  // Second attempt with the runtime stub reset.
  runner.PullDataCache(pull_param, pulled);
  runner.ReleaseResource();

  const std::vector<int32_t> expected{1, 2, 3, 4};
  const std::vector<int32_t> observed(pulled.begin() + 128, pulled.begin() + 128 + 4);
  EXPECT_EQ(observed, expected);
}
// Pulls a contiguous placement-1 source of shape {1, 128} into a placement-1 BLOCKS destination of shape
// {64, 2} using destination blocks {1, 3, 5, 7}; blocks 1 and 3 are expected to hold {1, 2} and {3, 4}.
TEST_F(DataCacheEngineSTest, PullDataCache_D2D_C2B) {
  llm::CacheDesc src_desc{};
  src_desc.num_tensors = 8;
  src_desc.shape = {1, 128};
  src_desc.data_type = DT_INT32;
  src_desc.placement = 1;

  auto dst_desc = src_desc;
  dst_desc.shape = {64, 2};
  dst_desc.placement = 1;
  dst_desc.cache_mem_type = llm::CacheMemType::BLOCKS;

  llm::PullCacheParam pull_param{};
  pull_param.size = -1;
  pull_param.decoder_blocks = {1, 3, 5, 7};

  std::vector<int32_t> pulled(64 * 2);
  DataCacheEngineRunner runner;
  runner.LlmDatadistInitAndLink(src_desc, dst_desc, pull_param);
  runner.PullDataCache(pull_param, pulled);
  runner.ReleaseResource();

  const std::vector<int32_t> expected{0, 0, 1, 2, 0, 0, 3, 4};
  const std::vector<int32_t> observed(pulled.begin(), pulled.begin() + 8);
  EXPECT_EQ(observed, expected);
}
// Same as the contiguous-to-blocks pull, but the source holds 7 elements, which is not a multiple of the
// 2-element destination block; the first two destination blocks used are still expected to hold
// {1, 2} and {3, 4}.
TEST_F(DataCacheEngineSTest, PullDataCache_D2D_C2B_with_remaider) {
  llm::CacheDesc src_desc{};
  src_desc.num_tensors = 8;
  src_desc.shape = {1, 7};
  src_desc.data_type = DT_INT32;
  src_desc.placement = 1;

  auto dst_desc = src_desc;
  dst_desc.shape = {64, 2};
  dst_desc.placement = 1;
  dst_desc.cache_mem_type = llm::CacheMemType::BLOCKS;

  llm::PullCacheParam pull_param{};
  pull_param.size = -1;
  pull_param.decoder_blocks = {1, 3, 5, 7};

  std::vector<int32_t> pulled(64 * 2);
  DataCacheEngineRunner runner;
  runner.LlmDatadistInitAndLink(src_desc, dst_desc, pull_param);
  runner.PullDataCache(pull_param, pulled);
  runner.ReleaseResource();

  const std::vector<int32_t> expected{0, 0, 1, 2, 0, 0, 3, 4};
  const std::vector<int32_t> observed(pulled.begin(), pulled.begin() + 8);
  EXPECT_EQ(observed, expected);
}
// Copies one single-tensor cache of (1 Mi + 128) int32 elements to another through
// DataCacheEngine::CopyCache and checks that the destination bytes equal the source bytes.
TEST_F(DataCacheEngineSTest, CopyCache_C2C_Big) {
  llm::CommEntityManager entity_manager;
  llm::CommMemManager mem_manager;
  llm::DataCacheEngine engine;
  engine.SetCommEntityManager(&entity_manager);
  engine.SetCommMemManager(&mem_manager);
  llm::CacheManager manager;
  engine.SetCacheManager(&manager);

  std::map<ge::AscendString, ge::AscendString> engine_options;
  engine_options[llm::LLM_OPTION_MEM_POOL_CONFIG] = "{\"memory_size\": 10000000}";
  EXPECT_EQ(engine.Initialize(engine_options), ge::SUCCESS);

  const int64_t element_count = static_cast<int64_t>(1024 * 1024 + 128);
  llm::CacheDesc src_desc{};
  src_desc.num_tensors = 1;
  src_desc.placement = 1;
  src_desc.shape = {1, element_count};
  src_desc.data_type = ge::DT_INT32;
  llm::CacheDesc dst_desc = src_desc;

  llm::Cache src;
  llm::Cache dst;
  EXPECT_EQ(engine.Allocate(src_desc, {}, src), ge::SUCCESS);
  EXPECT_EQ(engine.Allocate(dst_desc, {}, dst), ge::SUCCESS);

  llm::CopyCacheParam copy_param{};
  copy_param.src_cache_id = src.cache_id;
  copy_param.dst_cache_id = dst.cache_id;

  auto src_data = reinterpret_cast<int32_t *>(src.per_device_tensor_addrs[0][0]);
  auto dst_data = reinterpret_cast<int32_t *>(dst.per_device_tensor_addrs[0][0]);
  // Fill the source with 0, 1, 2, ... so every element is distinguishable.
  std::iota(src_data, src_data + element_count, 0);

  EXPECT_EQ(engine.CopyCache(copy_param), ge::SUCCESS);
  const size_t byte_count = sizeof(int32_t) * static_cast<size_t>(element_count);
  EXPECT_EQ(memcmp(src_data, dst_data, byte_count), 0);

  manager.Finalize();
  engine.Finalize();
}
// Copies four 4-element blocks between two BLOCKS caches via copy_block_infos (first value: source block,
// second value: destination block, as the expected data below demonstrates) and checks the first four
// destination blocks.
TEST_F(DataCacheEngineSTest, CopyCache_B2B) {
  llm::CommEntityManager entity_manager;
  llm::CommMemManager mem_manager;
  llm::DataCacheEngine engine;
  engine.SetCommEntityManager(&entity_manager);
  engine.SetCommMemManager(&mem_manager);
  llm::CacheManager manager;
  engine.SetCacheManager(&manager);

  std::map<ge::AscendString, ge::AscendString> engine_options;
  engine_options[llm::LLM_OPTION_MEM_POOL_CONFIG] = "{\"memory_size\": 10000000}";
  EXPECT_EQ(engine.Initialize(engine_options), ge::SUCCESS);

  llm::CacheDesc src_desc{};
  src_desc.num_tensors = 4;
  src_desc.placement = 1;
  src_desc.shape = {128, 4};
  src_desc.data_type = ge::DT_INT32;
  src_desc.cache_mem_type = llm::CacheMemType::BLOCKS;
  llm::CacheDesc dst_desc = src_desc;
  dst_desc.shape = {64, 4};

  llm::Cache src;
  llm::Cache dst;
  llm::CacheKey src_key{};
  src_key.is_allocate_blocks = true;
  llm::CacheKey dst_key = src_key;
  dst_key.req_id = 1;
  EXPECT_EQ(engine.Allocate(src_desc, {src_key}, src), ge::SUCCESS);
  EXPECT_EQ(engine.Allocate(dst_desc, {dst_key}, dst), ge::SUCCESS);

  llm::CopyCacheParam copy_param{};
  copy_param.src_cache_id = src.cache_id;
  copy_param.dst_cache_id = dst.cache_id;
  copy_param.copy_block_infos.emplace_back(2, 1);
  copy_param.copy_block_infos.emplace_back(3, 2);
  copy_param.copy_block_infos.emplace_back(4, 0);
  copy_param.copy_block_infos.emplace_back(0, 3);

  auto src_data = reinterpret_cast<int32_t *>(src.per_device_tensor_addrs[0][0]);
  auto dst_data = reinterpret_cast<int32_t *>(dst.per_device_tensor_addrs[0][0]);
  // Source element k holds the value k, so source block b starts with the value 4 * b.
  std::iota(src_data, src_data + 4 * 128, 0);
  EXPECT_EQ(engine.CopyCache(copy_param), ge::SUCCESS);

  const std::vector<int32_t> expected{
      16, 17, 18, 19,  // destination block 0 <- source block 4
      8,  9,  10, 11,  // destination block 1 <- source block 2
      12, 13, 14, 15,  // destination block 2 <- source block 3
      0,  1,  2,  3    // destination block 3 <- source block 0
  };
  const std::vector<int32_t> observed(dst_data, dst_data + 4 * 4);
  EXPECT_EQ(observed, expected);

  manager.Finalize();
  engine.Finalize();
}
// With the memory pool configured at 65536 bytes, allocating a cache of two {4, 128} int32 tensors is
// expected to fail with LLM_OUT_OF_MEMORY.
TEST_F(DataCacheEngineSTest, AllocateOutOfMemory) {
  llm::HcclAdapter::GetInstance().Finalize();

  llm::LLMDataDistV2 data_dist(1);
  std::map<ge::AscendString, ge::AscendString> init_options{
      {llm::LLM_OPTION_ROLE, llm::kDecoder},
      {ge::OPTION_EXEC_DEVICE_ID, "0"},
      {llm::LLM_OPTION_SYNC_KV_CACHE_WAIT_TIME, "600000"},
      {llm::LLM_OPTION_MEM_POOL_CONFIG, "{\"memory_size\": 65536}"}};
  EXPECT_EQ(data_dist.LLMDataDistInitialize(init_options), ge::SUCCESS);

  llm::CacheDesc desc{};
  desc.num_tensors = 2;
  desc.shape = {4, 128};
  desc.data_type = ge::DT_INT32;
  desc.placement = 1;

  llm::Cache allocated;
  std::vector<llm::CacheKey> keys;
  const auto status = data_dist.AllocateCache(desc, allocated, keys);
  EXPECT_EQ(status, ge::LLM_OUT_OF_MEMORY);

  data_dist.LLMDataDistFinalize();
}
// Transfers source layer 0 into destination tensor addresses [4, 6) and expects the seeded data at the
// start of the first of those two destination tensors.
TEST_F(DataCacheEngineSTest, TransferDataCache_D2D_C2C) {
  llm::CacheDesc src_desc{};
  src_desc.num_tensors = 8;
  src_desc.shape = {1, 128};
  src_desc.data_type = DT_INT32;
  src_desc.placement = 1;

  llm::CacheDesc dst_desc = src_desc;
  dst_desc.shape = {4, 128};

  llm::PullCacheParam pull_param{};
  pull_param.size = 128 * 4;
  pull_param.batch_index = 1;

  std::vector<int32_t> transferred(4 * 128);
  DataCacheEngineRunner runner;
  runner.LlmDatadistInitAndLink(src_desc, dst_desc, pull_param);
  const auto &src = runner.GetSrcCache();
  const auto &dst = runner.GetDstCache();

  llm::TransferCacheConfig cache_config{};
  cache_config.src_cache_id = src.cache_id;
  cache_config.layer_index = 0;
  // Two consecutive tensor addresses starting at index dst_layer * 2.
  const uint64_t dst_layer = 2U;
  const auto first_addr = dst.per_device_tensor_addrs[0].begin() + dst_layer * 2;
  cache_config.dst_addrs = std::vector<uintptr_t>(first_addr, first_addr + 2);

  llm::TransferBlockConfig block_config{};
  runner.TransferDataCache(cache_config, block_config, transferred);
  runner.ReleaseResource();

  const std::vector<int32_t> expected{1, 2, 3, 4};
  const std::vector<int32_t> observed(transferred.begin(), transferred.begin() + 4);
  EXPECT_EQ(observed, expected);
}
// Block-to-block transfer of source layer 0 into destination tensor addresses [4, 6). Source block 0
// carries the seeded data and is mapped to destination block 1, i.e. element offset 128.
TEST_F(DataCacheEngineSTest, TransferDataCache_D2D_B2B) {
  llm::CacheDesc src_desc{};
  src_desc.num_tensors = 8;
  src_desc.shape = {128, 128};
  src_desc.data_type = DT_INT32;
  src_desc.placement = 1;
  src_desc.cache_mem_type = llm::CacheMemType::BLOCKS;

  llm::CacheDesc dst_desc = src_desc;
  dst_desc.placement = 1;

  llm::PullCacheParam pull_param{};
  pull_param.size = -1;
  pull_param.prompt_blocks = {0, 1, 4, 5, 6};
  pull_param.decoder_blocks = {1, 2, 4, 6, 9};

  std::vector<int32_t> transferred(128 * 128);
  DataCacheEngineRunner runner;
  runner.LlmDatadistInitAndLink(src_desc, dst_desc, pull_param);
  const auto &src = runner.GetSrcCache();
  const auto &dst = runner.GetDstCache();

  llm::TransferCacheConfig cache_config{};
  cache_config.src_cache_id = src.cache_id;
  cache_config.layer_index = 0;
  // Two consecutive tensor addresses starting at index dst_layer * 2.
  const uint64_t dst_layer = 2U;
  const auto first_addr = dst.per_device_tensor_addrs[0].begin() + dst_layer * 2;
  cache_config.dst_addrs = std::vector<uintptr_t>(first_addr, first_addr + 2);

  llm::TransferBlockConfig block_config{};
  block_config.src_blocks = {0, 1, 4, 5, 6};
  block_config.dst_blocks = {1, 2, 4, 6, 9};
  runner.TransferDataCache(cache_config, block_config, transferred);
  runner.ReleaseResource();

  const std::vector<int32_t> expected{1, 2, 3, 4};
  const std::vector<int32_t> observed(transferred.begin() + 128, transferred.begin() + 128 + 4);
  EXPECT_EQ(observed, expected);
}
// Transfers a contiguous source (shape {1, 128}) into a BLOCKS destination (shape {64, 2}) at tensor
// addresses [2, 4) with block_mem_size 8 and destination blocks {1, 3, 5, 7}; blocks 1 and 3 are
// expected to hold {1, 2} and {3, 4}.
TEST_F(DataCacheEngineSTest, TransferDataCache_D2D_C2B) {
  llm::CacheDesc src_desc{};
  src_desc.num_tensors = 8;
  src_desc.shape = {1, 128};
  src_desc.data_type = DT_INT32;
  src_desc.placement = 1;

  auto dst_desc = src_desc;
  dst_desc.shape = {64, 2};
  dst_desc.placement = 1;
  dst_desc.cache_mem_type = llm::CacheMemType::BLOCKS;

  llm::PullCacheParam pull_param{};
  pull_param.size = -1;
  pull_param.decoder_blocks = {1, 3, 5, 7};

  std::vector<int32_t> transferred(64 * 2);
  DataCacheEngineRunner runner;
  runner.LlmDatadistInitAndLink(src_desc, dst_desc, pull_param);
  const auto &src = runner.GetSrcCache();
  const auto &dst = runner.GetDstCache();

  llm::TransferCacheConfig cache_config{};
  cache_config.src_cache_id = src.cache_id;
  cache_config.layer_index = 0;
  // Two consecutive tensor addresses starting at index dst_layer * 2.
  const uint64_t dst_layer = 1U;
  const auto first_addr = dst.per_device_tensor_addrs[0].begin() + dst_layer * 2;
  cache_config.dst_addrs = std::vector<uintptr_t>(first_addr, first_addr + 2);

  llm::TransferBlockConfig block_config{};
  block_config.block_mem_size = 8;
  block_config.dst_blocks = {1, 3, 5, 7};
  runner.TransferDataCache(cache_config, block_config, transferred);
  runner.ReleaseResource();

  const std::vector<int32_t> expected{0, 0, 1, 2, 0, 0, 3, 4};
  const std::vector<int32_t> observed(transferred.begin(), transferred.begin() + 8);
  EXPECT_EQ(observed, expected);
}
// Same flow as the C2B transfer test, but the source shape is {1, 7} while block_mem_size is 8.
// NOTE(review): "remainder" presumably refers to the source not filling whole blocks -- confirm.
TEST_F(DataCacheEngineSTest, TransferDataCache_D2D_C2B_remainder) {
  // Source descriptor.
  llm::CacheDesc src_cache_desc{};
  src_cache_desc.placement = 1;
  src_cache_desc.data_type = DT_INT32;
  src_cache_desc.shape = {1, 7};
  src_cache_desc.num_tensors = 8;

  // Destination descriptor: copy of the source, switched to block layout with shape {64, 2}.
  auto dst_cache_desc = src_cache_desc;
  dst_cache_desc.cache_mem_type = llm::CacheMemType::BLOCKS;
  dst_cache_desc.placement = 1;
  dst_cache_desc.shape = {64, 2};

  llm::PullCacheParam pull_param{};
  pull_param.decoder_blocks = {1, 3, 5, 7};
  pull_param.size = -1;

  DataCacheEngineRunner runner;
  runner.LlmDatadistInitAndLink(src_cache_desc, dst_cache_desc, pull_param);
  const auto &src_cache = runner.GetSrcCache();
  const auto &dst_cache = runner.GetDstCache();

  // Source side: layer 0 of the source cache.
  llm::TransferCacheConfig cache_cfg{};
  cache_cfg.layer_index = 0;
  cache_cfg.src_cache_id = src_cache.cache_id;
  // Destination side: the two tensor addresses at [dst_layer * 2, dst_layer * 2 + 2).
  const uint64_t dst_layer = 1U;
  const auto first_addr = dst_cache.per_device_tensor_addrs[0].begin() + dst_layer * 2;
  cache_cfg.dst_addrs = std::vector<uintptr_t>(first_addr, first_addr + 2);

  llm::TransferBlockConfig block_cfg{};
  block_cfg.dst_blocks = {1, 3, 5, 7};
  block_cfg.block_mem_size = 8;

  std::vector<int32_t> readback(64 * 2);
  runner.TransferDataCache(cache_cfg, block_cfg, readback);
  runner.ReleaseResource();

  // Verify the leading eight elements of the read-back buffer.
  const std::vector<int32_t> observed(readback.begin(), readback.begin() + 8);
  const std::vector<int32_t> expected{0, 0, 1, 2, 0, 0, 3, 4};
  EXPECT_EQ(observed, expected);
}
// TransferDataCache from a placement-1 source ({1, 128}) to a placement-0 destination ({4, 128}),
// neither using block layout. The runner is linked with use_host_mem_pool = true.
// The test name suggests device-to-host -- confirm the placement encoding.
TEST_F(DataCacheEngineSTest, TransferDataCache_D2H_C2C) {
  // Source descriptor.
  llm::CacheDesc src_cache_desc{};
  src_cache_desc.placement = 1;
  src_cache_desc.data_type = DT_INT32;
  src_cache_desc.shape = {1, 128};
  src_cache_desc.num_tensors = 8;

  // Destination descriptor: same tensor count and dtype, four rows, placement 0.
  llm::CacheDesc dst_cache_desc = src_cache_desc;
  dst_cache_desc.placement = 0;
  dst_cache_desc.shape = {4, 128};

  llm::PullCacheParam pull_param{};
  pull_param.batch_index = 1;
  pull_param.size = 128 * 4;

  DataCacheEngineRunner runner;
  runner.LlmDatadistInitAndLink(src_cache_desc, dst_cache_desc, pull_param, true);
  const auto &src_cache = runner.GetSrcCache();
  const auto &dst_cache = runner.GetDstCache();

  // Source side: layer 0 of the source cache.
  llm::TransferCacheConfig cache_cfg{};
  cache_cfg.layer_index = 0;
  cache_cfg.src_cache_id = src_cache.cache_id;
  // Destination side: the two tensor addresses at [dst_layer * 2, dst_layer * 2 + 2).
  const uint64_t dst_layer = 2U;
  const auto first_addr = dst_cache.per_device_tensor_addrs[0].begin() + dst_layer * 2;
  cache_cfg.dst_addrs = std::vector<uintptr_t>(first_addr, first_addr + 2);

  // No block lists are set for this transfer.
  llm::TransferBlockConfig block_cfg{};

  std::vector<int32_t> readback(4 * 128);
  runner.TransferDataCache(cache_cfg, block_cfg, readback);
  runner.ReleaseResource();

  // Verify the first four elements of the read-back buffer.
  const std::vector<int32_t> observed(readback.begin(), readback.begin() + 4);
  const std::vector<int32_t> expected{1, 2, 3, 4};
  EXPECT_EQ(observed, expected);
}
// TransferDataCache between two BLOCKS caches of shape {128, 128}: source placement 1,
// destination placement 0, linked with use_host_mem_pool = true.
// Source blocks {0, 1, 4, 5, 6} are sent to destination blocks {1, 2, 4, 6, 9}.
TEST_F(DataCacheEngineSTest, TransferDataCache_D2H_B2B) {
  // Source descriptor (block layout).
  llm::CacheDesc src_cache_desc{};
  src_cache_desc.cache_mem_type = llm::CacheMemType::BLOCKS;
  src_cache_desc.placement = 1;
  src_cache_desc.data_type = DT_INT32;
  src_cache_desc.shape = {128, 128};
  src_cache_desc.num_tensors = 8;

  // Destination descriptor differs from the source only in placement.
  llm::CacheDesc dst_cache_desc = src_cache_desc;
  dst_cache_desc.placement = 0;

  llm::PullCacheParam pull_param{};
  pull_param.decoder_blocks = {1, 2, 4, 6, 9};
  pull_param.prompt_blocks = {0, 1, 4, 5, 6};
  pull_param.size = -1;

  DataCacheEngineRunner runner;
  runner.LlmDatadistInitAndLink(src_cache_desc, dst_cache_desc, pull_param, true);
  const auto &src_cache = runner.GetSrcCache();
  const auto &dst_cache = runner.GetDstCache();

  // Source side: layer 0 of the source cache.
  llm::TransferCacheConfig cache_cfg{};
  cache_cfg.layer_index = 0;
  cache_cfg.src_cache_id = src_cache.cache_id;
  // Destination side: the two tensor addresses at [dst_layer * 2, dst_layer * 2 + 2).
  const uint64_t dst_layer = 2U;
  const auto first_addr = dst_cache.per_device_tensor_addrs[0].begin() + dst_layer * 2;
  cache_cfg.dst_addrs = std::vector<uintptr_t>(first_addr, first_addr + 2);

  llm::TransferBlockConfig block_cfg{};
  block_cfg.dst_blocks = {1, 2, 4, 6, 9};
  block_cfg.src_blocks = {0, 1, 4, 5, 6};

  std::vector<int32_t> readback(128 * 128);
  runner.TransferDataCache(cache_cfg, block_cfg, readback);
  runner.ReleaseResource();

  // Inspect four elements starting at offset 128 (one row of 128 elements into the buffer).
  constexpr int kOffset = 128;
  const std::vector<int32_t> observed(readback.begin() + kOffset, readback.begin() + kOffset + 4);
  const std::vector<int32_t> expected{1, 2, 3, 4};
  EXPECT_EQ(observed, expected);
}
// TransferDataCache from a placement-1 {1, 128} source (default cache_mem_type) into a
// placement-0 BLOCKS destination of shape {64, 2}. The runner is linked with its default flags.
TEST_F(DataCacheEngineSTest, TransferDataCache_D2H_C2B) {
  // Source descriptor.
  llm::CacheDesc src_cache_desc{};
  src_cache_desc.placement = 1;
  src_cache_desc.data_type = DT_INT32;
  src_cache_desc.shape = {1, 128};
  src_cache_desc.num_tensors = 8;

  // Destination descriptor: copy of the source, switched to block layout and placement 0.
  auto dst_cache_desc = src_cache_desc;
  dst_cache_desc.cache_mem_type = llm::CacheMemType::BLOCKS;
  dst_cache_desc.placement = 0;
  dst_cache_desc.shape = {64, 2};

  llm::PullCacheParam pull_param{};
  pull_param.decoder_blocks = {1, 3, 5, 7};
  pull_param.size = -1;

  DataCacheEngineRunner runner;
  runner.LlmDatadistInitAndLink(src_cache_desc, dst_cache_desc, pull_param);
  const auto &src_cache = runner.GetSrcCache();
  const auto &dst_cache = runner.GetDstCache();

  // Source side: layer 0 of the source cache.
  llm::TransferCacheConfig cache_cfg{};
  cache_cfg.layer_index = 0;
  cache_cfg.src_cache_id = src_cache.cache_id;
  // Destination side: the two tensor addresses at [dst_layer * 2, dst_layer * 2 + 2).
  const uint64_t dst_layer = 1U;
  const auto first_addr = dst_cache.per_device_tensor_addrs[0].begin() + dst_layer * 2;
  cache_cfg.dst_addrs = std::vector<uintptr_t>(first_addr, first_addr + 2);

  llm::TransferBlockConfig block_cfg{};
  block_cfg.dst_blocks = {1, 3, 5, 7};
  block_cfg.block_mem_size = 8;

  std::vector<int32_t> readback(4 * 32);
  runner.TransferDataCache(cache_cfg, block_cfg, readback);
  runner.ReleaseResource();

  // Verify the leading eight elements of the read-back buffer.
  const std::vector<int32_t> observed(readback.begin(), readback.begin() + 8);
  const std::vector<int32_t> expected{0, 0, 1, 2, 0, 0, 3, 4};
  EXPECT_EQ(observed, expected);
}
// PullDataCache between two placement-1 caches without block layout, restricted to
// tensor indices {0, 1, 2, 3} on both the source and destination side.
TEST_F(DataCacheEngineSTest, PullDataCache_D2D_C2C_with_layer_range) {
  // Source descriptor: 8 tensors of shape {1, 128}.
  llm::CacheDesc src_cache_desc{};
  src_cache_desc.placement = 1;
  src_cache_desc.data_type = DT_INT32;
  src_cache_desc.shape = {1, 128};
  src_cache_desc.num_tensors = 8;

  // Destination descriptor: identical except for the {4, 128} shape.
  llm::CacheDesc dst_cache_desc = src_cache_desc;
  dst_cache_desc.shape = {4, 128};

  llm::PullCacheParam pull_param{};
  pull_param.dst_tensor_indices = {0, 1, 2, 3};
  pull_param.src_tensor_indices = {0, 1, 2, 3};
  pull_param.batch_index = 1;
  pull_param.size = 128 * 4;

  DataCacheEngineRunner runner;
  runner.LlmDatadistInitAndLink(src_cache_desc, dst_cache_desc, pull_param);

  std::vector<int32_t> readback(4 * 128);
  runner.PullDataCache(pull_param, readback);
  runner.ReleaseResource();

  // With batch_index = 1 the checked data sits one 128-element row into the buffer.
  constexpr int kOffset = 128;
  const std::vector<int32_t> observed(readback.begin() + kOffset, readback.begin() + kOffset + 4);
  const std::vector<int32_t> expected{1, 2, 3, 4};
  EXPECT_EQ(observed, expected);
}
// PullDataCache between two placement-1 BLOCKS caches where the source has 8 tensors and the
// destination only 4; src_tensor_indices {0, 1, 2, 3} selects which source tensors are pulled.
TEST_F(DataCacheEngineSTest, PullDataCache_D2D_B2B_with_layer_range) {
  // Source descriptor (block layout, 8 tensors).
  llm::CacheDesc src_cache_desc{};
  src_cache_desc.cache_mem_type = llm::CacheMemType::BLOCKS;
  src_cache_desc.placement = 1;
  src_cache_desc.data_type = DT_INT32;
  src_cache_desc.shape = {128, 128};
  src_cache_desc.num_tensors = 8;

  // Destination descriptor: same layout, half as many tensors.
  llm::CacheDesc dst_cache_desc = src_cache_desc;
  dst_cache_desc.num_tensors = 4;
  dst_cache_desc.placement = 1;

  llm::PullCacheParam pull_param{};
  pull_param.src_tensor_indices = {0, 1, 2, 3};
  pull_param.decoder_blocks = {1, 2, 4, 6, 9};
  pull_param.prompt_blocks = {0, 1, 4, 5, 6};
  pull_param.size = -1;

  DataCacheEngineRunner runner;
  runner.LlmDatadistInitAndLink(src_cache_desc, dst_cache_desc, pull_param);

  std::vector<int32_t> readback(128 * 128);
  runner.PullDataCache(pull_param, readback);
  runner.ReleaseResource();

  // Inspect four elements starting at offset 128 of the read-back buffer.
  constexpr int kOffset = 128;
  const std::vector<int32_t> observed(readback.begin() + kOffset, readback.begin() + kOffset + 4);
  const std::vector<int32_t> expected{1, 2, 3, 4};
  EXPECT_EQ(observed, expected);
}
// PullDataCache from a 4-tensor {1, 128} source into an 8-tensor BLOCKS destination ({64, 2}),
// both placement 1; dst_tensor_indices {0, 1, 2, 3} selects the destination tensors written.
TEST_F(DataCacheEngineSTest, PullDataCache_D2D_C2B_with_layer_range) {
  // Source descriptor: default cache_mem_type, 4 tensors.
  llm::CacheDesc src_cache_desc{};
  src_cache_desc.placement = 1;
  src_cache_desc.data_type = DT_INT32;
  src_cache_desc.shape = {1, 128};
  src_cache_desc.num_tensors = 4;

  // Destination descriptor: block layout with twice as many tensors.
  auto dst_cache_desc = src_cache_desc;
  dst_cache_desc.num_tensors = 8;
  dst_cache_desc.cache_mem_type = llm::CacheMemType::BLOCKS;
  dst_cache_desc.placement = 1;
  dst_cache_desc.shape = {64, 2};

  llm::PullCacheParam pull_param{};
  pull_param.dst_tensor_indices = {0, 1, 2, 3};
  pull_param.decoder_blocks = {1, 3, 5, 7};
  pull_param.size = -1;

  DataCacheEngineRunner runner;
  runner.LlmDatadistInitAndLink(src_cache_desc, dst_cache_desc, pull_param);

  std::vector<int32_t> readback(64 * 2);
  runner.PullDataCache(pull_param, readback);
  runner.ReleaseResource();

  // Verify the leading eight elements of the read-back buffer.
  const std::vector<int32_t> observed(readback.begin(), readback.begin() + 8);
  const std::vector<int32_t> expected{0, 0, 1, 2, 0, 0, 3, 4};
  EXPECT_EQ(observed, expected);
}
// PullDataCache between BLOCKS caches from placement 0 (4 tensors) to placement 1 (8 tensors).
// Prompt blocks 0..63 are mapped onto decoder blocks 63..0, i.e. the block order is reversed.
TEST_F(DataCacheEngineSTest, PullDataCache_H2D_B2B_with_layer_range) {
  // Source descriptor (block layout, 4 tensors, placement 0).
  llm::CacheDesc src_cache_desc{};
  src_cache_desc.cache_mem_type = llm::CacheMemType::BLOCKS;
  src_cache_desc.placement = 0;
  src_cache_desc.data_type = DT_INT32;
  src_cache_desc.shape = {128, 128};
  src_cache_desc.num_tensors = 4;

  // Destination descriptor: same layout, 8 tensors, placement 1.
  llm::CacheDesc dst_cache_desc = src_cache_desc;
  dst_cache_desc.num_tensors = 8;
  dst_cache_desc.placement = 1;

  llm::PullCacheParam pull_param{};
  pull_param.size = -1;
  pull_param.dst_tensor_indices = {0, 1, 2, 3};
  // Build the ascending prompt list and the descending decoder list in a single pass.
  constexpr int kNumBlocks = 64;
  for (int block = 0; block < kNumBlocks; ++block) {
    pull_param.prompt_blocks.push_back(block);
    pull_param.decoder_blocks.push_back(kNumBlocks - 1 - block);
  }

  DataCacheEngineRunner runner;
  runner.LlmDatadistInitAndLink(src_cache_desc, dst_cache_desc, pull_param);

  std::vector<int32_t> readback(128 * 128);
  runner.PullDataCache(pull_param, readback);
  runner.ReleaseResource();

  // Prompt block 0 maps to decoder block 63, so the checked values sit at row 63.
  constexpr int kOffset = 128 * 63;
  const std::vector<int32_t> observed(readback.begin() + kOffset, readback.begin() + kOffset + 4);
  const std::vector<int32_t> expected{1, 2, 3, 4};
  EXPECT_EQ(observed, expected);
}
// PullDataCache of a large tensor from placement 0 to placement 1 without block layout:
// source rows hold 256 * 1024 * 3 elements, destination rows 256 * 1024 * 4 elements.
// Tensor indices {0, 1, 2, 3} are selected on both sides.
TEST_F(DataCacheEngineSTest, PullDataCache_H2D_C2C_with_layer_range) {
  constexpr int kDstRowElems = 256 * 1024 * 4;

  // Source descriptor.
  llm::CacheDesc src_cache_desc{};
  src_cache_desc.placement = 0;
  src_cache_desc.data_type = DT_INT32;
  src_cache_desc.shape = {2, 256 * 1024 * 3};
  src_cache_desc.num_tensors = 8;

  // Destination descriptor: wider rows, placement 1.
  llm::CacheDesc dst_cache_desc = src_cache_desc;
  dst_cache_desc.placement = 1;
  dst_cache_desc.shape = {2, kDstRowElems};

  llm::PullCacheParam pull_param{};
  pull_param.dst_tensor_indices = {0, 1, 2, 3};
  pull_param.src_tensor_indices = {0, 1, 2, 3};
  pull_param.batch_index = 1;
  pull_param.size = 1024 * 1024 * 3;

  DataCacheEngineRunner runner;
  runner.LlmDatadistInitAndLink(src_cache_desc, dst_cache_desc, pull_param);

  std::vector<int32_t> readback(2 * kDstRowElems);
  runner.PullDataCache(pull_param, readback);
  runner.ReleaseResource();

  // With batch_index = 1 the checked data begins at the second destination row.
  const std::vector<int32_t> observed(readback.begin() + kDstRowElems, readback.begin() + kDstRowElems + 4);
  const std::vector<int32_t> expected{1, 2, 3, 4};
  EXPECT_EQ(observed, expected);
}
// PullDataCache from an 8-tensor placement-1 source ({1, 128}) into a 4-tensor placement-0
// BLOCKS destination ({64, 2}); src_tensor_indices {0, 1, 2, 3} selects the source tensors.
TEST_F(DataCacheEngineSTest, PullDataCache_D2H_C2B_with_layer_range) {
  // Source descriptor: default cache_mem_type, 8 tensors.
  llm::CacheDesc src_cache_desc{};
  src_cache_desc.placement = 1;
  src_cache_desc.data_type = DT_INT32;
  src_cache_desc.shape = {1, 128};
  src_cache_desc.num_tensors = 8;

  // Destination descriptor: block layout, placement 0, half as many tensors.
  auto dst_cache_desc = src_cache_desc;
  dst_cache_desc.num_tensors = 4;
  dst_cache_desc.cache_mem_type = llm::CacheMemType::BLOCKS;
  dst_cache_desc.placement = 0;
  dst_cache_desc.shape = {64, 2};

  llm::PullCacheParam pull_param{};
  pull_param.src_tensor_indices = {0, 1, 2, 3};
  pull_param.decoder_blocks = {1, 3, 5, 7};
  pull_param.size = -1;

  DataCacheEngineRunner runner;
  runner.LlmDatadistInitAndLink(src_cache_desc, dst_cache_desc, pull_param);

  std::vector<int32_t> readback(4 * 32);
  runner.PullDataCache(pull_param, readback);
  runner.ReleaseResource();

  // Verify the leading eight elements of the read-back buffer.
  const std::vector<int32_t> observed(readback.begin(), readback.begin() + 8);
  const std::vector<int32_t> expected{0, 0, 1, 2, 0, 0, 3, 4};
  EXPECT_EQ(observed, expected);
}
// PullDataCache from a placement-1 source of shape {2, 128} into a placement-0 destination of
// shape {4, 128}, both with 8 tensors and no block layout, using tensor indices {0, 1, 2, 3}.
TEST_F(DataCacheEngineSTest, PullDataCache_D2H_C2C_with_layer_range) {
  // Source descriptor.
  llm::CacheDesc src_cache_desc{};
  src_cache_desc.placement = 1;
  src_cache_desc.data_type = DT_INT32;
  src_cache_desc.shape = {2, 128};
  src_cache_desc.num_tensors = 8;

  // Destination descriptor: taller shape and placement 0; tensor count stays at 8.
  llm::CacheDesc dst_cache_desc = src_cache_desc;
  dst_cache_desc.shape = {4, 128};
  dst_cache_desc.placement = 0;
  dst_cache_desc.num_tensors = 8;

  llm::PullCacheParam pull_param{};
  pull_param.dst_tensor_indices = {0, 1, 2, 3};
  pull_param.src_tensor_indices = {0, 1, 2, 3};
  pull_param.batch_index = 1;
  pull_param.size = 128 * 4;

  DataCacheEngineRunner runner;
  runner.LlmDatadistInitAndLink(src_cache_desc, dst_cache_desc, pull_param);

  // NOTE(review): the buffer holds 2 * 128 elements although the destination shape is {4, 128};
  // only offsets [128, 132) are inspected below -- confirm the smaller size is intentional.
  std::vector<int32_t> readback(2 * 128);
  runner.PullDataCache(pull_param, readback);
  runner.ReleaseResource();

  // With batch_index = 1 the checked data sits one 128-element row into the buffer.
  constexpr int kOffset = 128;
  const std::vector<int32_t> observed(readback.begin() + kOffset, readback.begin() + kOffset + 4);
  const std::vector<int32_t> expected{1, 2, 3, 4};
  EXPECT_EQ(observed, expected);
}
// PullDataCache between BLOCKS caches from placement 1 (4 tensors) to placement 0 (8 tensors);
// dst_tensor_indices {0, 1, 2, 3} selects the destination tensors written.
TEST_F(DataCacheEngineSTest, PullDataCache_D2H_B2B_with_layer_range) {
  // Source descriptor (block layout, 4 tensors, placement 1).
  llm::CacheDesc src_cache_desc{};
  src_cache_desc.cache_mem_type = llm::CacheMemType::BLOCKS;
  src_cache_desc.placement = 1;
  src_cache_desc.data_type = DT_INT32;
  src_cache_desc.shape = {128, 128};
  src_cache_desc.num_tensors = 4;

  // Destination descriptor: same layout, 8 tensors, placement 0.
  llm::CacheDesc dst_cache_desc = src_cache_desc;
  dst_cache_desc.num_tensors = 8;
  dst_cache_desc.placement = 0;

  llm::PullCacheParam pull_param{};
  pull_param.dst_tensor_indices = {0, 1, 2, 3};
  pull_param.decoder_blocks = {1, 2, 4, 6, 9};
  pull_param.prompt_blocks = {0, 1, 4, 5, 6};
  pull_param.size = -1;

  DataCacheEngineRunner runner;
  runner.LlmDatadistInitAndLink(src_cache_desc, dst_cache_desc, pull_param);

  std::vector<int32_t> readback(128 * 128);
  runner.PullDataCache(pull_param, readback);
  runner.ReleaseResource();

  // Inspect four elements starting at offset 128 of the read-back buffer.
  constexpr int kOffset = 128;
  const std::vector<int32_t> observed(readback.begin() + kOffset, readback.begin() + kOffset + 4);
  const std::vector<int32_t> expected{1, 2, 3, 4};
  EXPECT_EQ(observed, expected);
}
// Allocates two caches from one LLMDataDistV2 instance and swaps blocks between them in both
// argument orders, then deallocates both caches.
//
// Engine initialization and the first allocation are hard preconditions: they use ASSERT_EQ so
// a failure stops the test instead of driving an uninitialized engine through the later calls.
TEST_F(DataCacheEngineSTest, SwapBlocks) {
  std::map<ge::AscendString, ge::AscendString> default_options{
      {llm::LLM_OPTION_ROLE, llm::kDecoder},
      {OPTION_EXEC_DEVICE_ID, "0"},
      {llm::LLM_OPTION_MEM_POOL_CONFIG, "{\"memory_size\": 102428800}"}};
  llm::LLMDataDistV2 llm_data_dist(1);
  // Nothing has been allocated yet, so aborting here leaves nothing to clean up.
  ASSERT_EQ(llm_data_dist.LLMDataDistInitialize(default_options), ge::SUCCESS);

  // Descriptor shared by both caches: 10 FP16 tensors of shape {10, 128}.
  llm::CacheDesc kv_desc{};
  kv_desc.num_tensors = 10;
  kv_desc.data_type = ge::DT_FLOAT16;
  kv_desc.shape = {10, 128};

  llm::CacheKey cache_key{};
  cache_key.prompt_cluster_id = 0;
  cache_key.req_id = 1;
  cache_key.model_id = 0;

  // First cache is allocated with an associated cache key.
  llm::Cache cached_tensors;
  ASSERT_EQ(llm_data_dist.AllocateCache(kv_desc, cached_tensors, {cache_key}), ge::SUCCESS);
  EXPECT_EQ(cached_tensors.cache_id, 1);
  EXPECT_EQ(cached_tensors.per_device_tensor_addrs.size(), 1U);

  // Second cache is allocated without a key. This stays a soft check on purpose: aborting here
  // would skip the deallocation of the first cache below.
  llm::Cache cached_tensors_2;
  EXPECT_EQ(llm_data_dist.AllocateCache(kv_desc, cached_tensors_2), ge::SUCCESS);
  EXPECT_EQ(cached_tensors_2.cache_id, 2);

  // (first, second) block index pairs passed to SwapBlocks.
  std::vector<std::pair<int64_t, int64_t>> block_mapping{{3, 4}, {0, 0}, {1, 1}, {2, 2}, {5, 6}, {6, 7}, {9, 9}};
  // NOTE(review): 128 is presumably the block size and the 0 / 1 argument the swap type or
  // direction -- confirm against the SwapBlocks declaration.
  EXPECT_EQ(llm_data_dist.SwapBlocks(cached_tensors, cached_tensors_2, 128, 0, block_mapping), ge::SUCCESS);
  EXPECT_EQ(llm_data_dist.SwapBlocks(cached_tensors_2, cached_tensors, 128, 1, block_mapping), ge::SUCCESS);

  EXPECT_EQ(llm_data_dist.DeallocateCache(cached_tensors.cache_id), ge::SUCCESS);
  EXPECT_EQ(llm_data_dist.DeallocateCache(cached_tensors_2.cache_id), ge::SUCCESS);
}
}