import unittest
import requests
from util.influx_connect import InfluxConnect
from test_case.common import SEND_API_URL, TEST_TOPIC, INFLUX_DATABASE
from test_case.kafka_client_connect import KafkaAdminClientConnect
class TestPrepared(unittest.TestCase):
    """Prepare the test environment and smoke-test the send API.

    The class creates the InfluxDB database and the message-queue topic used
    by the test suite, then posts one red-envelope request to SEND_API_URL
    and checks the reported result message.
    """

    # Shared connections, created once in setUpClass and closed in
    # tearDownClass.
    kacc = None
    influx_connect = None

    # Seconds to wait for the send API before failing; without a timeout
    # requests would block forever on an unresponsive endpoint.
    REQUEST_TIMEOUT = 30

    @classmethod
    def setUpClass(cls) -> None:
        """Open the Kafka admin client and the InfluxDB connection."""
        cls.kacc = KafkaAdminClientConnect()
        try:
            cls.influx_connect = InfluxConnect()
        except Exception:
            # unittest does not call tearDownClass when setUpClass raises,
            # so release the already-opened Kafka admin client here.
            cls.kacc.admin.close()
            raise

    @classmethod
    def tearDownClass(cls) -> None:
        """Close both connections, even if closing the first one fails."""
        try:
            cls.kacc.admin.close()
        finally:
            cls.influx_connect.close()

    def test_create_influx_database(self):
        """Create the InfluxDB database."""
        self.influx_connect.create_database(INFLUX_DATABASE)

    def test_create_topic(self):
        """Create the message-queue topic."""
        self.kacc.create_topic(TEST_TOPIC)

    def test_send_red_envelope(self):
        """Post a red-envelope request and expect the success message."""
        total_amount = 100
        total_count = 5
        user_name = "user_2"
        headers = {
            "X-Apig-AppCode": "",
            "X-stage": "RELEASE",
            "host": "",
        }
        data = {
            "totalAmount": total_amount,
            "totalCount": total_count,
            "topicId": TEST_TOPIC,
            "userName": user_name,
            "message": "恭喜发财",
        }
        # NOTE(review): verify=False disables TLS certificate verification;
        # kept as in the original, confirm it is only used against test
        # endpoints.
        res = requests.post(
            SEND_API_URL,
            json=data,
            headers=headers,
            verify=False,
            timeout=self.REQUEST_TIMEOUT,
        )
        # assertEqual reports the actual "msg" value on failure; the raw
        # response body is attached to make a failing run diagnosable.
        self.assertEqual(res.json().get("msg"), '红包发送成功', msg=res.text)
# Run the test cases in this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()