import json
import os
import pika
class RabbitMQConnect:
    """Base class that owns a blocking RabbitMQ connection and one channel.

    Connection settings are read from the ``rabbit_mq`` section of
    ``config/config.json``, looked up one directory above the directory
    that contains this module.

    Instances may be used as context managers; the connection is closed
    when the ``with`` block exits.
    """

    # Standard AMQP port, used when the configuration has no usable ``port``.
    DEFAULT_PORT = 5672

    def __init__(self):
        """Load the configuration, then open the connection and a channel.

        :raises ValueError: if ``ip_address`` is missing or empty
        """
        dir_path = os.path.split(os.path.split(__file__)[0])[0]
        config_path = os.path.join(dir_path, "config", "config.json")
        # Explicit encoding so decoding does not depend on the platform locale.
        with open(config_path, "r", encoding="utf-8") as config_file:
            config = json.load(config_file)
        self._apply_config(config.get("rabbit_mq") or {})
        self.collection = self.init_collection()
        try:
            self.channel = self.collection.channel()
        except Exception:
            # Do not leak the open connection when the channel cannot be created.
            self.close()
            raise

    def _apply_config(self, config):
        """Copy connection settings from the parsed ``rabbit_mq`` section.

        :param config: mapping with the optional keys ``ip_address``,
            ``port``, ``username`` and ``password``
        :raises ValueError: if ``ip_address`` is missing or empty
        """
        self.ip_address = config.get("ip_address", "")
        # A missing or empty port would otherwise reach pika as "", which it
        # rejects; fall back to the default port instead.
        self.port = config.get("port") or self.DEFAULT_PORT
        self.username = config.get("username", "")
        self.password = config.get("password", "")
        if not self.ip_address:
            raise ValueError("配置文件中ip_address为空值")

    def init_collection(self):
        """Open and return a blocking connection built from the loaded settings."""
        credentials = pika.PlainCredentials(self.username, self.password)
        parameters = pika.ConnectionParameters(
            host=self.ip_address, port=self.port, credentials=credentials)
        return pika.BlockingConnection(parameters)

    def state_queue(self, queue_name, durable=True):
        """
        Declare a queue.

        :param queue_name: name of the queue
        :param durable: whether the queue is declared durable
        """
        self.channel.queue_declare(queue=queue_name, durable=durable)

    def state_exchange(self, exchange, durable=True, exchange_type='direct'):
        """
        Declare an exchange.

        :param exchange: name of the exchange
        :param durable: whether the exchange is declared durable
        :param exchange_type: exchange type
        """
        self.channel.exchange_declare(
            exchange=exchange, durable=durable, exchange_type=exchange_type)

    def delete_queue(self, queue_name):
        """
        Delete a queue.

        :param queue_name: name of the queue
        """
        self.channel.queue_delete(queue_name)

    def delete_exchange(self, exchange):
        """
        Delete an exchange.

        :param exchange: name of the exchange
        """
        self.channel.exchange_delete(exchange)

    def close(self):
        """Close the connection; do nothing if it is already closed."""
        if self.collection.is_open:
            self.collection.close()

    def __enter__(self):
        """Return this instance for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection on leaving the ``with`` block."""
        self.close()
class RabbitMQProducer(RabbitMQConnect):
    """Producer side of the RabbitMQ wrapper."""

    def send_message_to_queue(self, exchange, queue_name, message, durable=True):
        """
        Publish a message as UTF-8 encoded JSON.

        :param exchange: exchange to publish to
        :param queue_name: queue name, used as the routing key
        :param message: payload, serialised with ``json.dumps``
        :param durable: when true, publish with ``delivery_mode=2``
        """
        payload = json.dumps(message).encode("utf-8")
        publish_args = {
            "exchange": exchange,
            "routing_key": queue_name,
            "body": payload,
        }
        if durable:
            # Only the durable path passes message properties.
            publish_args["properties"] = pika.BasicProperties(delivery_mode=2)
        self.channel.basic_publish(**publish_args)
class RabbitMQConsumer(RabbitMQConnect):
    """Consumer side of the RabbitMQ wrapper."""

    def queue_bind_exchange(self, queue_name, exchange):
        """
        Bind a queue to an exchange.

        :param queue_name: name of the queue
        :param exchange: name of the exchange
        """
        binding = {"exchange": exchange, "queue": queue_name}
        self.channel.queue_bind(**binding)

    def consume_msg(self, queue_name, call_back):
        """
        Register a consumer on the queue, then start consuming (blocking).

        :param queue_name: name of the queue
        :param call_back: callback; receives channel, method, properties, body
        """
        channel = self.channel
        channel.basic_consume(queue_name, call_back)
        channel.start_consuming()