import inspect
import json

import socketio
from aiohttp import web

from util.aio_kafka_consumer_connect import AIOKafkaConsumerConnect
from util.kafka_producer_connect import KafkaProducerConnect
from util.logger import Logger
# Shared producer connection and logger used by the event handlers in this module.
PRODUCER = KafkaProducerConnect()
logger = Logger().logger

# topic/room id -> AIOKafkaConsumerConnect created for that room
ROOM_ID_CONSUMER_MAPPING = {}
# socket id -> topic/room id the socket joined
SID_ROOM_MAPPING = {}
# topic/room id -> number of sockets currently counted in the room
ROOM_HUMAN_COUNT = {}
# topic/room id -> True once a consume loop has been started for the room
ROOM_READY_CONSUMER = {}
class AsyncSocketIO:
    """Pair an aiohttp application with an asynchronous Socket.IO server.

    Handlers are queued through :meth:`register_event` and are only bound to
    the Socket.IO server when :meth:`run_app` is called.
    """

    def __init__(self):
        self.app = web.Application()
        self.sio = socketio.AsyncServer(cors_allowed_origins="*")
        # Queued registrations; each entry is the keyword dict passed to sio.on.
        self.event_list = []

    def register_event(self, event, namespace="/"):
        """
        Build a decorator that queues a function as the handler of an event.

        :param event: event name
        :param namespace: Socket.IO namespace the handler is registered under
        :return: decorator that records the handler and returns it unchanged
        """

        def inner_register_event(func):
            self.event_list.append({
                "event": event,
                "handler": func,
                "namespace": namespace,
            })
            # Return the function itself so the decorated module-level name
            # stays bound to the handler instead of being replaced by None.
            return func

        return inner_register_event

    def run_app(self, host, port, **kwargs):
        """
        Bind every queued handler, attach the server to the app and serve it.

        :param host: interface to listen on
        :param port: TCP port to listen on
        :param kwargs: extra keyword arguments forwarded to ``web.run_app``
        :return:
        """
        for event in self.event_list:
            self.sio.on(**event)
        self.sio.attach(self.app)
        web.run_app(self.app, host=host, port=port, **kwargs)
# Socket.IO namespace under which every handler in this module is registered.
NAMESPACE = "/chat"
# Module-level server wrapper; handlers are queued on it via app.register_event.
app = AsyncSocketIO()
@app.register_event("disconnect", NAMESPACE)
async def disconnect(sid):
    """
    Handle a socket disconnect and tear the room down once it is empty.

    :param sid: socket_id
    :return:
    """
    # pop() removes the sid entry so the mapping cannot grow without bound, and
    # tolerates sockets that disconnect before ever joining a channel.
    topic_id = SID_ROOM_MAPPING.pop(sid, None)
    logger.info(topic_id)
    if topic_id is not None and topic_id in ROOM_HUMAN_COUNT:
        ROOM_HUMAN_COUNT[topic_id] -= 1
        if ROOM_HUMAN_COUNT[topic_id] <= 0:
            # Drop all bookkeeping for the room before the first await so a
            # handler that interleaves never sees a half-removed room.
            ROOM_HUMAN_COUNT.pop(topic_id)
            ROOM_READY_CONSUMER.pop(topic_id, None)
            consumer_connect = ROOM_ID_CONSUMER_MAPPING.pop(topic_id, None)
            await app.sio.close_room(topic_id, namespace=NAMESPACE)
            logger.info(f"关闭topic:{topic_id}")
            if consumer_connect is not None:
                await consumer_connect.consumer.stop()
    await app.sio.disconnect(sid)
@app.register_event("get_message", NAMESPACE)
async def get_message(sid, data):
    """
    Start consuming the room's queue and forward each message to the room.

    Only the first call per room starts the loop; later calls return at once.
    Every consumed record is JSON-decoded and emitted to the room as
    ``rev_msg``.

    :param sid: socket_id
    :param data: {"topicId": str}
    :return:
    """
    logger.info(data)
    topic_id = data.get('topicId')
    consumer_connect = ROOM_ID_CONSUMER_MAPPING.get(topic_id)
    if consumer_connect is None:
        # Without this guard the room would be flagged as consuming and the
        # handler would then fail on a None consumer.
        logger.info(f"{topic_id} has no consumer yet; call create_or_connect_channel first")
        return
    if ROOM_READY_CONSUMER.get(topic_id):
        logger.info(f"{topic_id}消费已启动")
        return
    ROOM_READY_CONSUMER[topic_id] = True
    logger.info("开始消费")
    await consumer_connect.consumer.start()
    try:
        async for record in consumer_connect.consumer:
            payload = json.loads(record.value.decode("utf-8"))
            logger.info(f"kafka:{str(payload)}")
            await app.sio.emit("rev_msg", payload, to=topic_id, namespace=NAMESPACE)
    except Exception as e:
        # Any failure in the loop ends consumption for the room: log it with
        # the traceback, drop the requesting socket and stop the consumer.
        logger.exception(e)
        await app.sio.disconnect(sid, namespace=NAMESPACE)
        await consumer_connect.consumer.stop()
@app.register_event("create_or_connect_channel", NAMESPACE)
async def create_or_connect_channel(sid, data):
    """
    Join the socket to a room, creating the room's consumer on first use.

    :param sid: socket_id
    :param data: {"topicId": str}
    :return:
    """
    logger.info(data)
    topic_id = data.get('topicId')
    if not topic_id:
        logger.info(f"create_or_connect_channel ignored for {sid}: missing topicId")
        return

    # enter_room is a plain method on older python-socketio releases and a
    # coroutine on newer ones; await only when there is something to await so
    # the socket really joins the room on both.
    joined = app.sio.enter_room(sid, topic_id, NAMESPACE)
    if inspect.isawaitable(joined):
        await joined

    # A socket that repeats the call for the room it is already in must not be
    # counted twice, otherwise the room never empties on disconnect.
    # NOTE(review): a socket moving to a different room is still not removed
    # from the head count of its previous room.
    already_counted = SID_ROOM_MAPPING.get(sid) == topic_id
    SID_ROOM_MAPPING[sid] = topic_id

    # Bookkeeping is completed before the emit so no other handler can run
    # between the membership change and the count update.
    if topic_id in ROOM_ID_CONSUMER_MAPPING:
        if not already_counted:
            ROOM_HUMAN_COUNT[topic_id] = ROOM_HUMAN_COUNT.get(topic_id, 0) + 1
        await app.sio.emit("rev_msg", {"message": "群组已创建", "messageType": 0}, to=sid, namespace=NAMESPACE)
    else:
        consumer_connect = AIOKafkaConsumerConnect(topic_id, "default")
        ROOM_READY_CONSUMER[topic_id] = False
        logger.info("consumer_connect success")
        ROOM_ID_CONSUMER_MAPPING[topic_id] = consumer_connect
        ROOM_HUMAN_COUNT[topic_id] = 1
        await app.sio.emit("rev_msg", {"message": "群组创建成功", "messageType": 0}, to=sid, namespace=NAMESPACE)
@app.register_event("send_msg", NAMESPACE)
async def send_msg(sid, data):
    """
    Hand the message to the shared producer and acknowledge the sender.

    :param sid: socket_id
    :param data: {"topicId": str, "message": str, "messageType": int, "userName": str}
    :return:
    """
    logger.info(data)
    # The whole payload is sent, keyed by the topic id it carries.
    PRODUCER.send_msg(data['topicId'], data)
    ack = {"message": "发送成功", "messageType": 0}
    await app.sio.emit("rev_msg", ack, to=sid, namespace=NAMESPACE)
if __name__ == "__main__":
    # Script entry point: listen on every interface, port 5000.
    app.run_app(port=5000, host="0.0.0.0")