import multiprocessing
import sys
import time
import unittest
import acl
from memcache_hybrid import DistributedObjectStore, LocalConfig, MetaConfig, MetaService
def start_meta_service():
config = MetaConfig()
config.meta_service_url = "tcp://127.0.0.1:5000"
config.config_store_url = "tcp://127.0.0.1:6000"
config.metrics_url = "http://127.0.0.1:8000"
try:
assert MetaService.setup(config) == 0, "setup meta config failed"
MetaService.main()
except Exception as e:
print(f"MetaService 出错: {e}")
proc = multiprocessing.Process(target=start_meta_service)
proc.start()
print(f"子进程已启动,PID: {proc.pid}")
time.sleep(3)
class TestExample(unittest.TestCase):
key_1 = "key_1"
original_data = b"some data!"
def setUp(self):
print("开始执行测试...")
acl.init()
count, ret = acl.rt.get_device_count()
print("设备数量:", acl.rt.get_device_count())
ret = acl.rt.set_device(count - 1)
print("set_device returned: {}".format(ret))
config = LocalConfig()
config.protocol = "device_rdma"
config.dram_size = "10GB"
config.max_dram_size = "64GB"
print(config)
self._distributed_object_store = DistributedObjectStore()
res = self._distributed_object_store.setup(config)
self.assertEqual(res, 0)
res = self._distributed_object_store.init(count - 1)
self.assertEqual(res, 0)
def test_1(self):
exist_res = self._distributed_object_store.is_exist(self.key_1)
self.assertEqual(exist_res, 0)
put_res = self._distributed_object_store.put(self.key_1, self.original_data)
self.assertEqual(put_res, 0)
exist_res = self._distributed_object_store.is_exist(self.key_1)
self.assertEqual(exist_res, 1)
key_info = self._distributed_object_store.get_key_info(self.key_1)
print(key_info)
retrieved_data = self._distributed_object_store.get(self.key_1)
print(retrieved_data)
self.assertEqual(retrieved_data, self.original_data)
time.sleep(3)
rm_res = self._distributed_object_store.remove(self.key_1)
self.assertEqual(rm_res, 0)
exist_res = self._distributed_object_store.is_exist(self.key_1)
self.assertEqual(exist_res, 0)
def tearDown(self):
self._distributed_object_store.close()
print("object store destroyed")
print(f"测试完成,PID: {proc.pid}")
if proc.is_alive():
print("正在终止子进程...")
proc.terminate()
proc.join(timeout=2)
if proc.is_alive():
print("子进程未响应,强制杀死...")
proc.kill()
proc.join()
if __name__ == "__main__":
unittest.main()