import time
import unittest
import os
from typing import List, Optional
import numpy as np
from llm_datadist_v1 import *
from llm_datadist_v1.llm_datadist import _shutdown_handler
from llm_datadist_v1.llm_utils import TransferCacheJob, TransferCacheParameters
import json
from llm_datadist_v1.llm_types import KvCache, BlocksCacheKey, Placement
from llm_datadist_v1.config import EngineConfig
_INVALID_ID = 2 ** 64 - 1
_TEST_BASE_DIR = '../tests/dflow/llm_datadist/st/testcase/llm_datadist'
class LayerSynchronizerImpl(LayerSynchronizer):
def __init__(self, ret=True):
self._ret = ret
def synchronize_layer(self, layer_index: int, timeout_in_millis: Optional[int]) -> bool:
if layer_index == 0:
time.sleep(0.3)
return self._ret
class MockTransferCacheJob(TransferCacheJob):
def __init__(self, params: TransferCacheParameters) -> None:
super().__init__(params, LayerSynchronizerImpl(True), None)
def transfer_layer(self, src_layer_index: int, dst_layer_idx, transfer_config: TransferConfig) -> LLMStatusCode:
return LLMStatusCode.LLM_WAIT_PROCESS_TIMEOUT
class LlmEngineV2St(unittest.TestCase):
def setUp(self) -> None:
os.environ['ASCEND_GLOBAL_LOG_LEVEL'] = '1'
print("Begin ", self._testMethodName)
def tearDown(self) -> None:
os.environ.pop('RESOURCE_CONFIG_PATH', None)
_shutdown_handler()
print("End ", self._testMethodName)
@staticmethod
def _engine_options(is_prompt: bool, cluster_id: int = 0, rank_id: int = -1, resource_path: str = ''):
cluster_info = {
'cluster_id': cluster_id, 'logic_device_id': ['0:0:0:0', '0:0:1:0', '0:0:2:0', '0:0:3:0'],
}
if is_prompt:
cluster_info['listen_ip_info'] = [{'ip': 0, 'port': 26000}, {'ip': 1, 'port': 26000},
{'ip': 2, 'port': 26000},
{'ip': 3, 'port': 26000}]
engine_options = {'llm.ClusterInfo': json.dumps(cluster_info)}
if rank_id != -1:
engine_options['ge.exec.rankId'] = str(rank_id)
if resource_path != '':
engine_options['ge.resourceConfigPath'] = resource_path
return engine_options
def test_prompt_cache_ops(self):
cluster_id = 0
engine = LLMDataDist(LLMRole.PROMPT, cluster_id)
engine.init(LlmEngineV2St._engine_options(True, cluster_id))
kv_cache_manager = engine.kv_cache_manager
cache_desc = CacheDesc(80, [2, 8], DataType.DT_FLOAT16)
cache_key = CacheKey(0, 0, 1)
kv_cache = kv_cache_manager.allocate_cache(cache_desc, [cache_key])
with self.assertRaises(LLMException):
tensors = kv_cache_manager.get_cache_tensors(kv_cache, -1)
tensors = kv_cache_manager.get_cache_tensors(kv_cache, 0)
print(f'kv_cache: {kv_cache}')
print(f'tensors: {Tensor(tensors[0]).numpy()}')
kv_cache_manager.deallocate_cache(kv_cache)
kv_cache_manager.remove_cache_key(cache_key)
engine.finalize()
def test_decoder_cache_ops(self):
cluster_id = 0
llm_engine = LLMDataDist(LLMRole.DECODER, cluster_id)
llm_engine.init(LlmEngineV2St._engine_options(False, cluster_id))
kv_cache_manager = llm_engine.kv_cache_manager
cache_desc = CacheDesc(80, [2, 8], DataType.DT_FLOAT16)
cache_key = CacheKey(0, 1, 1)
kv_cache_manager._role = LLMRole.PROMPT
kv_cache = kv_cache_manager.allocate_cache(cache_desc, [cache_key])
kv_cache_manager._role = LLMRole.DECODER
dst_kv_cache = kv_cache_manager.allocate_cache(cache_desc)
print(f'kv_cache: {kv_cache}')
with self.assertRaises(LLMException):
kv_cache_manager.pull_cache(cache_key, dst_kv_cache, 0, -2)
kv_cache_manager.pull_cache(cache_key, dst_kv_cache, src_cache_offset=0, dst_cache_offset=0)
cache_key_id = CacheKeyByIdAndIndex(cluster_id, kv_cache.cache_id, 0)
kv_cache_manager.pull_cache(cache_key_id, dst_kv_cache, 0)
kv_cache_manager.copy_cache(dst_kv_cache, kv_cache)
with self.assertRaises(LLMException):
kv_cache_manager.copy_cache(dst_kv_cache, kv_cache, 0, 0, 0, -2)
with self.assertRaises(LLMException):
kv_cache_manager.copy_cache(dst_kv_cache, kv_cache, 0, 0, 0, 0)
kv_cache_manager.deallocate_cache(kv_cache)
with self.assertRaises(LLMException) as ex:
kv_cache_manager.pull_cache(cache_key, kv_cache)
self.assertEqual(ex.exception.status_code, LLMStatusCode.LLM_KV_CACHE_NOT_EXIST)
kv_cache_manager.deallocate_cache(dst_kv_cache)
llm_engine.finalize()
def test_cluster_ops(self):
cluster_id = 0
llm_engine = LLMDataDist(LLMRole.DECODER, cluster_id)
cluster_info = {
"cluster_id": 1, "logic_device_id": ["0:0:0:0"]
}
engine_options = {'llm.ClusterInfo': json.dumps(cluster_info)}
llm_engine.init(engine_options)
cluster = LLMClusterInfo()
cluster.remote_cluster_id = 1
cluster.append_local_ip_info(1, 26000)
cluster.append_remote_ip_info(1, 26000)
ret, rets = llm_engine.link_clusters([cluster], 5000)
self.assertEqual(ret, LLMStatusCode.LLM_SUCCESS)
ret, rets = llm_engine.unlink_clusters([cluster], 5000)
self.assertEqual(ret, LLMStatusCode.LLM_SUCCESS)
llm_engine.check_link_status(1)
def test_init_and_finalize(self):
cluster_id = 0
decoder_engine = LLMDataDist(LLMRole.DECODER, cluster_id)
decoder_engine.finalize()
decoder_engine.init(self._engine_options(False, cluster_id, 1))
decoder_engine.init(self._engine_options(False, cluster_id, 1))
decoder_engine.finalize()
decoder_engine.finalize()
decoder_engine_2 = LLMDataDist(LLMRole.DECODER, cluster_id)
decoder_engine_2.init(self._engine_options(False, cluster_id, 1))
def test_init_witch_device_id_and_rank_id(self):
llm_config = LLMConfig()
llm_config.device_id = 1
llm_config.listen_ip_info = "127.0.0.1:26000"
llm_config.ge_options = {
"ge.exec.rankId": "1"
}
engine_options = llm_config.generate_options()
print("engine_options:", engine_options)
prompt_engine = LLMDataDist(LLMRole.PROMPT, 0)
prompt_engine.init(engine_options)
EngineConfig.from_engine_options(True, engine_options)
def test_simple_option(self):
cluster_id = 0
prompt_engine = LLMDataDist(LLMRole.PROMPT, cluster_id)
llm_config = LLMConfig()
llm_config.device_id = 1
llm_config.listen_ip_info = "127.0.0.1:26000"
llm_config.deploy_res_path = "./"
llm_config.ge_options = {
"ge.flowGraphMemMaxSize": "10000000"
}
engine_options = llm_config.generate_options()
print("engine_options:", engine_options)
prompt_engine.init(engine_options)
def test_shutdown_failed(self):
cluster_id = 0
decoder_engine = LLMDataDist(LLMRole.DECODER, cluster_id)
decoder_engine.init(self._engine_options(False, cluster_id, 1))
decoder_engine._llm_datadist.finalize()
_shutdown_handler()
decoder_engine._is_initialized = False
LLMDataDist.llm_engine_instance = None
def test_switch_role_failed_option_not_set(self):
engine = LLMDataDist(LLMRole.MIX, 0)
options = {'ge.exec.deviceId': '0'}
engine.init(options)
try:
engine.switch_role(LLMRole.DECODER)
except LLMException as e:
self.assertEqual(e.status_code, LLMStatusCode.LLM_FEATURE_NOT_ENABLED)
def test_switch_role_failed_identical_role(self):
engine = LLMDataDist(LLMRole.MIX, 0)
options = {'llm.EnableSwitchRole': '1', 'ge.exec.deviceId': '0'}
engine.init(options)
with self.assertRaises(LLMException):
engine.switch_role(LLMRole.MIX)
def test_switch_role(self):
engine = LLMDataDist(LLMRole.MIX, 0)
llm_config = LLMConfig()
llm_config.device_id = 0
llm_config.enable_switch_role = True
options = llm_config.generate_options()
os.environ['RESOURCE_CONFIG_PATH'] = _TEST_BASE_DIR + '/json_file/numa_config.json'
engine.init(options)
engine.switch_role(LLMRole.DECODER)
switch_options = {
'llm.listenIpInfo': '127.0.0.1:1111',
}
engine.switch_role(LLMRole.PROMPT, switch_options)
@staticmethod
def _allocate_npu_cache(kv_cache_manager, block_size, num_block, num_tensors):
npu_cache_desc = CacheDesc(num_tensors=num_tensors, shape=[num_block, block_size],
data_type=DataType.DT_FLOAT16)
npu_cache_key = BlocksCacheKey(0, 0)
cache = kv_cache_manager.allocate_blocks_cache(npu_cache_desc, npu_cache_key)
return cache, npu_cache_key
@staticmethod
def _allocate_cpu_cache(kv_cache_manager, block_size, num_block, num_tensors):
cpu_cache_desc = CacheDesc(num_tensors=num_tensors, shape=[num_block, block_size],
data_type=DataType.DT_FLOAT16)
cpu_cache_key = BlocksCacheKey(1, 1)
cache = kv_cache_manager.allocate_blocks_cache(cpu_cache_desc, cpu_cache_key)
cpu_cache_desc = CacheDesc(num_tensors=num_tensors, shape=[num_block, block_size],
data_type=DataType.DT_FLOAT16, placement=Placement.HOST)
return KvCache.create_cpu_cache(cpu_cache_desc, cache.per_device_tensor_addrs[0]), cache
def test_swap_blocks(self):
cluster_id = 0
llm_engine = LLMDataDist(LLMRole.PROMPT, cluster_id)
llm_config = LLMConfig()
llm_config.device_id = 0
llm_config.listen_ip_info = "0.0.0.0:26000"
llm_config.ge_options = {"ge.flowGraphMemMaxSize": "10240"}
init_options = llm_config.generate_options()
llm_engine.init(init_options)
kv_cache_manager = llm_engine.kv_cache_manager
npu_cache, npu_cache_key = self._allocate_npu_cache(kv_cache_manager, 64*1024, 10, 10)
cpu_cache, tmp_cache = self._allocate_cpu_cache(kv_cache_manager, 64*1024, 20, 10)
src_to_dst = {3:4, 0:0, 1:1, 2:2, 5:6, 6:7, 7:8, 9:9}
kv_cache_manager.swap_blocks(npu_cache, cpu_cache, src_to_dst)
kv_cache_manager.swap_blocks(cpu_cache, npu_cache, src_to_dst)
def test_create_cpu_cache_failed(self):
cluster_id = 0
llm_engine = LLMDataDist(LLMRole.PROMPT, cluster_id)
llm_config = LLMConfig()
llm_config.device_id = 0
llm_config.listen_ip_info = "0.0.0.0:26000"
llm_config.ge_options = {"ge.flowGraphMemMaxSize": "10240"}
init_options = llm_config.generate_options()
llm_engine.init(init_options)
has_err = False
try:
cpu_cache_desc = CacheDesc(num_tensors=1, shape=[1, 1],
data_type=DataType.DT_FLOAT16, placement=Placement.HOST)
KvCache.create_cpu_cache(cpu_cache_desc, [[1], 1])
except LLMException as ex:
self.assertEqual("should be consistent" in ex.__str__(), True)
has_err = True
self.assertEqual(has_err, True)
def test_copy_blocks_validate(self):
cluster_id = 0
llm_datadist = LLMDataDist(LLMRole.DECODER, cluster_id)
llm_datadist.init(self._engine_options(False, cluster_id))
kv_cache_manager = llm_datadist.kv_cache_manager
cache_desc = CacheDesc(1, [2, 8], DataType.DT_FLOAT16)
kv_cache = kv_cache_manager.allocate_blocks_cache(cache_desc)
try:
kv_cache_manager.copy_blocks(kv_cache, {0: "1"})
except Exception as ex:
self.assertEqual("only support" in ex.__str__(), True)
print("copy_blocks validate2")
try:
kv_cache_manager.copy_blocks(kv_cache, {0: ["1"]})
except Exception as ex:
self.assertEqual("inner type only support" in ex.__str__(), True)
llm_datadist.finalize()
def test_transfer_cache(self):
cluster_id = 0
engine = LLMDataDist(LLMRole.PROMPT, cluster_id)
engine.init(LlmEngineV2St._engine_options(True, cluster_id))
kv_cache_manager = engine.kv_cache_manager
cache_desc = CacheDesc(10, [2, 8], DataType.DT_FLOAT16)
cache_key = CacheKey(0, 0, 1)
kv_cache = kv_cache_manager.allocate_cache(cache_desc, [cache_key])
dst_addrs_1 = [10000000, 20000000, 30000000, 40000000, 50000000, 60000000]
dst_addrs_2 = [10000000, 20000000, 30000000, 40000000]
transfer_config_1 = TransferConfig(1, dst_addrs_1, range(0, 3))
print(transfer_config_1)
transfer_config_2 = TransferConfig(2, dst_addrs_2, range(2, 4))
transfer_configs = (transfer_config_1, transfer_config_2)
cache_task = kv_cache_manager.transfer_cache_async(kv_cache, LayerSynchronizerImpl(True), transfer_configs)
ret = cache_task.synchronize(0)
self.assertEqual(ret, LLMStatusCode.LLM_WAIT_PROCESS_TIMEOUT)
rets = cache_task.get_results(0)
self.assertEqual(rets, [LLMStatusCode.LLM_WAIT_PROCESS_TIMEOUT] * 2)
ret = cache_task.synchronize(1000)
rets = cache_task.get_results(1000)
self.assertEqual(ret, LLMStatusCode.LLM_SUCCESS)
self.assertEqual(rets[0], LLMStatusCode.LLM_SUCCESS)
self.assertEqual(rets[1], LLMStatusCode.LLM_SUCCESS)
transfer_config_3 = TransferConfig(2, dst_addrs_1 + dst_addrs_2)
cache_task = kv_cache_manager.transfer_cache_async(kv_cache, LayerSynchronizerImpl(True), [transfer_config_3])
ret = cache_task.synchronize()
self.assertEqual(ret, LLMStatusCode.LLM_SUCCESS)
kv_cache = kv_cache_manager.allocate_blocks_cache(cache_desc)
dst_addrs_1 = [10000000, 20000000, 30000000, 40000000, 50000000, 60000000]
transfer_config_1 = TransferConfig(1, dst_addrs_1, range(0, 3))
transfer_configs = [transfer_config_1]
block_indices = [0, 1]
cache_task = kv_cache_manager.transfer_cache_async(kv_cache, LayerSynchronizerImpl(True), transfer_configs,
block_indices, block_indices, dst_block_memory_size=32)
ret = cache_task.synchronize()
rets = cache_task.get_results()
self.assertEqual(ret, LLMStatusCode.LLM_SUCCESS)
self.assertEqual(rets[0], LLMStatusCode.LLM_SUCCESS)
transfer_config_1 = TransferConfig(1, dst_addrs_1, range(0, 3))
transfer_config_2 = TransferConfig(2, dst_addrs_2, range(2, 4))
transfer_configs = (transfer_config_1, transfer_config_2)
cache_task = kv_cache_manager.transfer_cache_async(kv_cache, LayerSynchronizerImpl(False), transfer_configs)
ret = cache_task.synchronize()
rets = cache_task.get_results()
self.assertNotEqual(ret, LLMStatusCode.LLM_SUCCESS)
self.assertNotEqual(rets[0], LLMStatusCode.LLM_SUCCESS)
self.assertEqual(rets[1], None)
params = TransferCacheParameters(kv_cache, transfer_configs, None, None, None)
job = MockTransferCacheJob(params)
job.init()
job.transfer_layers()
self.assertEqual(job.get_results()[0], LLMStatusCode.LLM_WAIT_PROCESS_TIMEOUT)
self.assertEqual(job.get_results()[1], None)
src_block_indices = [1, 2]
with self.assertRaisesRegex(LLMException, "transfer from blocks to cache is not supported"):
_ = kv_cache_manager.transfer_cache_async(kv_cache, LayerSynchronizerImpl(False), transfer_configs,
src_block_indices)
with self.assertRaises(TypeError):
_ = kv_cache_manager.transfer_cache_async(kv_cache, LayerSynchronizerImpl(False), transfer_configs,
[0, 1], [0, "1"])
with self.assertRaises(ValueError):
_ = kv_cache_manager.transfer_cache_async(kv_cache, LayerSynchronizerImpl(False), transfer_configs,
[-1, 1], [0, 1])
with self.assertRaises(ValueError):
_ = kv_cache_manager.transfer_cache_async(kv_cache, LayerSynchronizerImpl(False), transfer_configs,
[0, 1], [-1, 1])
with self.assertRaises(LLMException):
_ = kv_cache_manager.transfer_cache_async(kv_cache, LayerSynchronizerImpl(False), transfer_configs,
[0, 1], [0])
with self.assertRaises(LLMException):
_ = kv_cache_manager.transfer_cache_async(kv_cache, LayerSynchronizerImpl(False), transfer_configs,
[], [], 1)
with self.assertRaises(ValueError):
_ = kv_cache_manager.transfer_cache_async(kv_cache, LayerSynchronizerImpl(False), transfer_configs,
[0, 1], [0, 1], -1)
with self.assertRaises(TypeError):
_ = kv_cache_manager.transfer_cache_async('cache', LayerSynchronizerImpl(False), transfer_configs)
with self.assertRaises(TypeError):
_ = kv_cache_manager.transfer_cache_async(None, LayerSynchronizerImpl(False), transfer_configs)
kv_cache_manager.deallocate_cache(kv_cache)
kv_cache_manager.remove_cache_key(cache_key)
engine.finalize()
def test_transfer_config(self):
config = TransferConfig(2, [1000, 2000])
self.assertEqual(config.dst_cluster_id, 2)
self.assertEqual(config.dst_addrs, [1000, 2000])
self.assertIsNone(config.src_layer_range)
self.assertEqual(config.src_batch_index, 0)
with self.assertRaises(TypeError):
config.dst_cluster_id = None
with self.assertRaises(TypeError):
config.dst_cluster_id = '123'
with self.assertRaises(TypeError):
config.dst_addrs = None
with self.assertRaises(TypeError):
config.dst_addrs = '123'
with self.assertRaises(TypeError):
config.dst_addrs = ['123']
with self.assertRaises(ValueError):
config.dst_addrs = [-1]
with self.assertRaises(TypeError):
config.src_layer_range = [0, 1]
with self.assertRaises(LLMException):
config.src_layer_range = range(0, 5, 2)
with self.assertRaises(LLMException):
config.src_layer_range = range(0, 1, -1)
with self.assertRaises(LLMException):
config.src_layer_range = range(4, 1)
with self.assertRaises(LLMException):
config.src_layer_range = range(0, 0)
with self.assertRaises(TypeError):
config.src_batch_index = None
with self.assertRaises(TypeError):
config.src_batch_index = '1'
with self.assertRaises(ValueError):
config.src_batch_index = -1
with self.assertRaises(ValueError):
config.src_batch_index = 2 ** 32
with self.assertRaises(TypeError):
_ = TransferConfig(2, [1000, 2000], None, '1')
with self.assertRaises(TypeError):
_ = TransferConfig(2, [1000, 2000], [0, 1])
with self.assertRaises(TypeError):
_ = TransferConfig(2, [1000, '2000'])
with self.assertRaises(TypeError):
_ = TransferConfig(2, None)
with self.assertRaises(TypeError):
_ = TransferConfig('1', [1000, 2000])
def test_multiple_devices(self):
llm_config = LLMConfig()
llm_config.device_id = [1, 2]
llm_config.listen_ip_info = "127.0.0.1:26000;127.0.0.1:26000"
engine_options = llm_config.generate_options()
print("engine_options:", engine_options)
prompt_engine = LLMDataDist(LLMRole.PROMPT, 0)
prompt_engine.init(engine_options)
print(engine_options['llm.ClusterInfo'])
def test_tensor_bf16(self):
arr1 = np.array([1.875], np.float16)
tensor_desc = TensorDesc(DataType.DT_BF16, [1])
tensor = Tensor(arr1, tensor_desc)
print("generated numpy:", tensor.numpy())
self.assertEqual(tensor.numpy().dtype, np.float32)
self.assertEqual(int(tensor.numpy()[0]), 1)
def test_tensor_foat32_copy_true(self):
arr1 = np.array([[1.0, 2.0], [3.0, 4.0]], np.float32)
tensor_desc = TensorDesc(DataType.DT_FLOAT, (2, 2))
tensor = Tensor(arr1, tensor_desc)
res = tensor.numpy(copy=True)
print("generated numpy:", res)
np.testing.assert_array_equal(res, arr1)
self.assertEqual(res.dtype, np.float32)
self.assertTrue(res.flags.c_contiguous)
self.assertTrue(res.flags.writeable)
arr1[0, 0] = 999.0
self.assertNotEqual(res[0, 0], 999.0)
def test_str_tensor(self):
arr1 = np.array([["aaaa", "bbbb"], ["ccc", "ddd"]])
src_type = arr1.dtype
tensor = Tensor(arr1)
self.assertEqual(tensor.numpy(True).dtype, src_type)
print(tensor.numpy(True))
def test_check_flow_graph_mem_max_size(self):
cluster_id = 0
llm_engine = LLMDataDist(LLMRole.PROMPT, cluster_id)
llm_config = LLMConfig()
llm_config.device_id = 0
llm_config.listen_ip_info = "0.0.0.0:26000"
llm_config.ge_options = {"ge.flowGraphMemMaxSize": "-1"}
init_options = llm_config.generate_options()
has_err = False
try:
llm_engine.init(init_options)
except LLMException:
has_err = True
self.assertEqual(has_err, True)
def test_check_flow_graph_mem_max_size2(self):
cluster_id = 0
llm_engine = LLMDataDist(LLMRole.PROMPT, cluster_id)
llm_config = LLMConfig()
llm_config.device_id = 0
llm_config.listen_ip_info = "0.0.0.0:26000"
llm_config.ge_options = {"llm.EnableCacheManager": "0"}
init_options = llm_config.generate_options()
has_err = False
try:
llm_engine.init(init_options)
except LLMException:
has_err = True
self.assertEqual(has_err, False)