import json

import socketio
from aiohttp import web

from util.aio_kafka_consumer_connect import AIOKafkaConsumerConnect
from util.kafka_producer_connect import KafkaProducerConnect
from util.logger import Logger

PRODUCER = KafkaProducerConnect()

logger = Logger().logger
# 房间ID与异步消费者映射
ROOM_ID_CONSUMER_MAPPING = dict()
# socket_id与房间ID映射
SID_ROOM_MAPPING = dict()
# 房间ID与房间的人数映射
ROOM_HUMAN_COUNT = dict()
# 房间ID与房间的状态映射
ROOM_READY_CONSUMER = dict()


class AsyncSocketIO:
    """异步SocketIO"""
    def __init__(self):
        self.app = web.Application()
        self.sio = socketio.AsyncServer(cors_allowed_origins="*")
        self.event_list = list()

    def register_event(self, event, namespace="/"):
        """
        app注册事件装饰器
        :param event: 事件名
        :param namespace: 命名空间
        :return:
        """
        def inner_register_event(func):
            def wrapper():
                self.event_list.append({
                    "event": event,
                    "handler": func,
                    "namespace": namespace
                })
            return wrapper()
        return inner_register_event

    def run_app(self, host, port, **kwargs):
        """运行app"""
        for event in self.event_list:
            self.sio.on(**event)
        self.sio.attach(self.app)
        web.run_app(self.app, host=host, port=port, **kwargs)


NAMESPACE = "/chat"
app = AsyncSocketIO()


@app.register_event("disconnect", NAMESPACE)
async def disconnect(sid):
    """
    socket断开连接
    :param sid: socket_id
    :return:
    """
    topic_id = SID_ROOM_MAPPING[sid]
    logger.info(topic_id)
    ROOM_HUMAN_COUNT[topic_id] -= 1
    if ROOM_HUMAN_COUNT.get(topic_id) == 0:
        ROOM_HUMAN_COUNT.pop(topic_id)
        await app.sio.close_room(topic_id, namespace=NAMESPACE)
        consumer_connect = ROOM_ID_CONSUMER_MAPPING.get(topic_id)
        logger.info(f"关闭topic:{topic_id}")
        await consumer_connect.consumer.stop()
        ROOM_ID_CONSUMER_MAPPING.pop(topic_id)
    # await app.sio.emit("rev_msg", {"message": "连接结束", "messageType": 0}, to=sid, namespace=NAMESPACE)
    await app.sio.disconnect(sid)


@app.register_event("get_message", NAMESPACE)
async def get_message(sid, data):
    """
    获取队列消息
    :param sid: socket_id
    :param data: data: {“topicId”: str}
    :return:
    """
    logger.info(data)
    topic_id = data.get('topicId')
    if ROOM_READY_CONSUMER.get(topic_id):
        logger.info(f"{topic_id}消费已启动")
        return
    ROOM_READY_CONSUMER[topic_id] = True
    consumer_connect = ROOM_ID_CONSUMER_MAPPING.get(topic_id)
    logger.info("开始消费")
    await consumer_connect.consumer.start()
    try:
        async for msg in consumer_connect.consumer:
            msg = json.loads(msg.value.decode("utf-8"))
            logger.info(f"kafka:{str(msg)}")
            await app.sio.emit("rev_msg", msg, to=topic_id, namespace=NAMESPACE)
    except Exception as e:
        logger.info(e)
        await app.sio.disconnect(sid, namespace=NAMESPACE)
        await consumer_connect.consumer.stop()


@app.register_event("create_or_connect_channel", NAMESPACE)
async def create_or_connect_channel(sid, data):
    """
    创建或连接通道
    :param sid: socket_id
    :param data: {“topicId”: str}
    :return:
    """
    logger.info(data)
    topic_id = data.get('topicId')
    app.sio.enter_room(sid, topic_id, NAMESPACE)
    SID_ROOM_MAPPING[sid] = topic_id
    if ROOM_ID_CONSUMER_MAPPING.get(topic_id):
        await app.sio.emit("rev_msg", {"message": "群组已创建", "messageType": 0}, to=sid, namespace=NAMESPACE)
        ROOM_HUMAN_COUNT[topic_id] += 1
    else:
        consumer_connect = AIOKafkaConsumerConnect(topic_id, "default")
        ROOM_READY_CONSUMER[topic_id] = False
        logger.info("consumer_connect success")
        ROOM_ID_CONSUMER_MAPPING[topic_id] = consumer_connect
        ROOM_HUMAN_COUNT[topic_id] = 1
        await app.sio.emit("rev_msg", {"message": "群组创建成功", "messageType": 0}, to=sid, namespace=NAMESPACE)


@app.register_event("send_msg", NAMESPACE)
async def send_msg(sid, data):
    """
    发送消息
    :param sid:  socket_id
    :param data: {“topicId”: str, "message": str, "messageType": int, "userName": str}
    :return:
    """
    logger.info(data)
    topic_id = data['topicId']
    PRODUCER.send_msg(topic_id, data)
    await app.sio.emit("rev_msg", {"message": "发送成功", "messageType": 0}, to=sid, namespace=NAMESPACE)


if __name__ == '__main__':
    app.run_app(host="0.0.0.0", port=5000)