import base64
import datetime
import json
import random
import pytz
from util.influx_connect import InfluxConnect
from util.kafka_producer_connect import KafkaProducerConnect
from util.middleware import get_lock_middleware
from util.redis_connect import RedisPoolConnect
from util.response_format import ResponseFormat
PRODUCER = KafkaProducerConnect()
REDIS_POOL_INFO = RedisPoolConnect(0)
INFLUX_CONNECT = InfluxConnect()
INFLUX_DATABASE = "every_topic_red_envelope_record"
@get_lock_middleware
def handler(event, context):
"""
抢红包Api
post请求参数
topicId 消息队列标题名,因influxdb的原因,topic_id不能带有“-”号
redEnvelopeId 红包id
userName 用户名
"""
logger = context.getLogger()
params = json.loads(base64.b64decode(event["body"]).decode().replace("'", '"'))
logger.info(params)
red_envelope_id = params.get("redEnvelopeId")
topic_id = params.get("topicId")
user_name = params.get("userName")
value = json.loads(REDIS_POOL_INFO.find_key_value(red_envelope_id))
total_amount = value.get("totalAmount")
total_count = value.get("totalCount")
query_result = INFLUX_CONNECT.query(
f"SELECT redEnvelopeId,userName,getMoney,localTime,time"
f" FROM {topic_id} WHERE redEnvelopeId='{red_envelope_id}'", database=INFLUX_DATABASE)
result = list()
is_grab = False
for query_value in query_result.get_points():
if user_name == query_value.get("userName"):
is_grab = True
result.append(query_value)
if is_grab:
return ResponseFormat({"code": 200, "msg": "你已抢过红包", "redEnvelopeInfo": result}, 200).to_json()
if total_count <= 0:
return ResponseFormat({"code": 200, "msg": "红包抢完啦", "redEnvelopeInfo": result}, 200).to_json()
rate = 1 if total_count == 1 else float("%.2f" % random.random())
get_money = total_amount * rate
REDIS_POOL_INFO.set_key_value(
red_envelope_id, json.dumps({"totalAmount": total_amount - get_money, "totalCount": total_count - 1}))
now_time = datetime.datetime.now()
local_time = now_time.strftime("%Y/%m/%d %H:%M:%S")
zero_time = now_time.astimezone(pytz.timezone("UTC")).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
INFLUX_CONNECT.write_points([{
"measurement": topic_id,
"tags": {
"redEnvelopeId": red_envelope_id,
},
"fields": {
"userName": user_name,
"getMoney": get_money,
"localTime": local_time,
}}], database=INFLUX_DATABASE)
msg = {"message": "{0} 抢了红包".format(user_name), "messageType": 1, "userName": None, "topicId": topic_id}
try:
PRODUCER.send_msg(topic_id, msg)
except ConnectionError:
logger.error("kafka连接错误,请检查kafka配置信息")
return ResponseFormat({"code": 500, "msg": "服务器错误"}, 500).to_json()
result.append({"redEnvelopeInfo": red_envelope_id, "userName": user_name, "getMoney": get_money,
"localTime": local_time, "time": zero_time})
return ResponseFormat({"code": 200, "msg": "恭喜你抢到了红包", "redEnvelopeInfo": result}, 200).to_json()