| 文件 | 最后提交记录 | 最后更新时间 |
|---|---|---|
| 3 年前 | ||
| 3 年前 | ||
| 3 年前 | ||
| 3 年前 | ||
| 3 年前 |
华为云 Python Cache For Redis 演示
1.说明
本项目将演示通过redis对DCS进行操作。
1.1 环境准备
- python3.9.2
- Pycharm 2022+ / Visual Studio 2022+
- Redis 4.0/5.0
1.2必备Pypi包
- redis == 4.3.4
1.3 数据库选型
- 分布式缓存服务(Distributed Cache Service,简称DCS)
更多详情,请参考DCS 成长地图
1.4 核心代码介绍
- src\redis_connect.py
class RedisUtil:
def __new__(cls, *args, **kwargs):
# 获取配置文件路径
dir_path = os.path.split(os.path.split(__file__)[0])[0]
config_path = os.path.join(dir_path, r"config\config.json")
with open(config_path, "r") as config_file:
# 反序列化配置文件
config = json.load(config_file)
# redis读取的值默认为二进制形式, decode_responses对查询的值进行解码
return redis.Redis(**config, decode_responses=True)
- test\redis_connect_test.py
对redis进行增删测试
class RedisTest(unittest.TestCase):
redis_connect = None
@classmethod
def setUpClass(cls) -> None:
cls.redis_connect = RedisUtil()
@classmethod
def tearDownClass(cls) -> None:
cls.redis_connect.close()
def test_0_insert_value(self):
"""测试添加值"""
print("开始添加值")
self.redis_connect.set("test", "test")
result = self.redis_connect.get("test")
self.assertTrue(result == "test")
def test_1_delete_value(self):
"""测试删除值"""
print("开始删除值")
self.redis_connect.delete("test")
result = self.redis_connect.get("test")
self.assertTrue(result is None)
- config\config.json
{
"host": "ip_address",
"port": "port",
"password": "password",
"db": "db_num"
}
password:用户密码
ip_address:数据库IP地址
port:数据库端口
db_num:数据库编号,为int类型,范围1到12