文件最后提交记录最后更新时间
3 年前
3 年前
README.md

1.项目介绍

本项目是华为云开发者团队基于华为云分布式消息服务Rocketmq搭建集群,模拟电商场景。电商场景中通常会涉及到订单、支付和通知等等场景的业务处理。业务链通常都是多个系统相互协作完成一次作业,上层服务强依赖于下层服务,上层服务的性能会强依赖于下层服务,当业务链过深,则会严重影响外层服务的性能和用户体验。

2.设计思想

系统解耦

分布式消息服务RocketMQ版可以解除多个业务系统之间的耦合度,提升各系统的处理能力和响应速度。

消息定时、延迟

分布式消息服务RocketMQ版提供的定时、延迟等能力,满足需要订阅通知的电商场景。

3.参数介绍

NamesrvAddr: 实例的元数据连接地址

topic: 消息主题,消息发送与接收的基本单元

Message Key: 消息Key

Message Tag: 消息Tag

__STARTDELIVERTIME: 消息定时投递的时间戳

4.核心代码说明

生产者(Order、Pay、Notice)

@Controller
@RequestMapping("/order")
public class OrderSystemProducer {

    private static final Logger logger = LoggerFactory.getLogger(OrderSystemProducer.class);

    @RequestMapping("/produce")
    @ResponseBody
    public String produceOrder() {
        SendResult sendResult = new SendResult();
        try {
            DefaultMQProducer producer = new DefaultMQProducer("OrderSystemProducer");
            // 元数据连接地址
            producer.setNamesrvAddr("100.85.217.111:8200;100.93.3.194:8200");
            producer.start();

            // 订单消息标识
            String[] tags = new String[]{"CommonOrderTag", "ScheduledOrderTag"};
            String orderId = "order0";

            // 生产订单消息
            Message msg = new Message("OrderTopic", tags[0], "KEY0",
                    ("common order message").getBytes(StandardCharsets.UTF_8));
            sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    String orderId = (String) arg;
                    int index = Math.abs(orderId.hashCode() % mqs.size());
                    return mqs.get(index);
                }
            }, orderId);

            // 延时发送生产订单消息
            // 延时消息投递时间戳,该消息60秒后投递
            final long deliverTimestamp = Instant.now().plusSeconds(60).toEpochMilli();
            // 创建消息对象
            Message scheduledMsg = new Message("OrderTopic",
                    tags[1],
                    "KEY1",
                    "scheduled order message".getBytes(StandardCharsets.UTF_8));
            // 设置消息定时投递的时间戳属性
            scheduledMsg.putUserProperty("__STARTDELIVERTIME", String.valueOf(deliverTimestamp));
            producer.send(scheduledMsg);

            producer.shutdown();
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            logger.error("send message exception {}", e);
        }
        return sendResult.toString();
    }
}
@Controller
@RequestMapping("/pay")
public class PaySystemProducer {

    private static final Logger logger = LoggerFactory.getLogger(PaySystemProducer.class);

    @RequestMapping("/produce")
    @ResponseBody
    public void producePay() {
        DefaultMQProducer producer = new DefaultMQProducer("PaySystemProducer");
        // 填入元数据连接地址
        producer.setNamesrvAddr("100.85.217.111:8200;100.93.3.194:8200");
        try {
            producer.start();
            String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 10; i++) {
                String orderId = "order" + (i % 10);
                Message msg = new Message("PayTopic", tags[i % tags.length], "KEY" + i,
                        ("This is pay " + i).getBytes(StandardCharsets.UTF_8));
                SendResult sendResult = RocketmqSampleUtils.sendMessage(producer, msg, orderId);
                logger.info(sendResult.toString());
            }
        } catch (Exception e) {
            logger.info(e.getMessage());
        }
        producer.shutdown();
    }
}
@Controller
@RequestMapping("/notice")
public class NoticeSystemProducer {

    private static final Logger logger = LoggerFactory.getLogger(NoticeSystemProducer.class);

    @RequestMapping("/produce")
    @ResponseBody
    public void produceNotice() {
        DefaultMQProducer producer = new DefaultMQProducer("NoticeSystemProducer");
        // 填入元数据连接地址
        producer.setNamesrvAddr("100.85.217.111:8200;100.93.3.194:8200");
        try {
            producer.start();
            String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 10; i++) {
                String orderId = "order" + (i % 10);
                Message msg = new Message("NoticeTopic", tags[i % tags.length], "KEY" + i, ("This is notice " + i).getBytes(StandardCharsets.UTF_8));
                SendResult sendResult = RocketmqSampleUtils.sendMessage(producer, msg, orderId);
                logger.info(sendResult.toString());
            }
        } catch (Exception e) {
            logger.info(e.getMessage());
        }
        producer.shutdown();
    }
}

消费者(Order、Pay、Notice)

@Controller
@RequestMapping("/order")
public class OrderSystemConsumer {

    @Autowired
    private IOrderService orderService;

    @RequestMapping("/consume")
    @ResponseBody
    public String consumeOrder() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderSystemConsumer");
        // 填入元数据连接地址
        consumer.setNamesrvAddr("100.85.217.111:8200;100.93.3.194:8200");

        consumer.subscribe("OrderTopic", "*");

        consumer.registerMessageListener(new MessageListenerOrderly() {
            AtomicLong consumeTimes = new AtomicLong(0);

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt ext : msgs) {
                    // 获取定时订单消息
                    if ("ScheduledOrderTag".equals(ext.getTags())) {
                        // 订单超时未支付则关闭订单
                        OrderBean orderBean = new OrderBean(false);
                        if (orderBean.isPay()) {
                            orderBean.setStatus("closed");
                            orderService.updateOrder(orderBean);
                        }
                    }
                }

                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                this.consumeTimes.incrementAndGet();
                if ((this.consumeTimes.get() % 3) == 0) {
                    context.setSuspendCurrentQueueTimeMillis(3000);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        return "consuming order...";
    }

}
@Controller
@RequestMapping("/pay")
public class PaySystemConsumer {

    @RequestMapping("/consume")
    @ResponseBody
    public String consumePay() throws MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PaySystemConsumer");
        // 填入元数据连接地址
        consumer.setNamesrvAddr("100.85.217.111:8200;100.93.3.194:8200");

        consumer.subscribe("PayTopic", "*");

        consumer.registerMessageListener(new MessageListenerOrderly() {
            AtomicLong consumeTimes = new AtomicLong(0);

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                this.consumeTimes.incrementAndGet();
                if ((this.consumeTimes.get() % 3) == 0) {
                    context.setSuspendCurrentQueueTimeMillis(3000);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        return "consuming pay...";
    }
}
@Controller
@RequestMapping("/notice")
public class NoticeSystemConsumer {

    @RequestMapping("/consume")
    @ResponseBody
    public String consumeNotice() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("NoticeSystemConsumer");
        // 填入元数据连接地址
        consumer.setNamesrvAddr("100.85.217.111:8200;100.93.3.194:8200");

        consumer.subscribe("NoticeTopic", "*");
        RocketmqSampleUtils.addMessageListener(consumer);
        consumer.start();
        return "consuming notice...";
    }
}

生产延时消息

// 延时发送生产订单消息
// 延时消息投递时间戳,该消息60秒后投递
final long deliverTimestamp = Instant.now().plusSeconds(60).toEpochMilli();
// 创建消息对象
Message scheduledMsg = new Message("OrderTopic",
tags[1],
"KEY1",
"scheduled order message".getBytes(StandardCharsets.UTF_8));
// 设置消息定时投递的时间戳属性
scheduledMsg.putUserProperty("__STARTDELIVERTIME", String.valueOf(deliverTimestamp));
producer.send(scheduledMsg);

生产消息成功样例

huaweicloud-lts-sdk-intergration-sample.PNG

消费消息成功样例

如果开了消息轨迹后可以在消息页面,查看消费轨迹

huaweicloud-lts-sdk-intergration-sample.PNG

程序总入口

本地启动tomcat前,替换index.jsp文件的端口、包名,前端访问地址为:http://localhost:8080/huaweicloud_rocketmq_sample_Java_war_exploded/rocketmq/demo. 其中8080为tomcat端口,huaweicloud_rocketmq_sample_Java_war_exploded为包名。

<%@ page language="java" contentType="text/html; charset=ISO-8859-1"
pageEncoding="ISO-8859-1"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=ISO-8859-1">
<title>index</title>
</head>
<body>

PRODUCE
<form action = "http://127.0.0.1:8818/huaweicloud-sdk-java-dms-rocketmq/order/produce">
    <input type="submit" value="Order">
</form>
<form action = "http://127.0.0.1:8818/huaweicloud-sdk-java-dms-rocketmq/pay/produce">
    <input type="submit" value="Pay">
</form>
<form action = "http://127.0.0.1:8818/huaweicloud-sdk-java-dms-rocketmq/notice/produce">
    <input type="submit" value="Notice">
</form>
<br/><br/>
CONSUMER
<form action = "http://127.0.0.1:8818/huaweicloud-sdk-java-dms-rocketmq/order/consume">
    <input type="submit" value="Order">
</form>
<form action = "http://127.0.0.1:8818/huaweicloud-sdk-java-dms-rocketmq/pay/consume">
    <input type="submit" value="Pay">
</form>
<form action = "http://127.0.0.1:8818/huaweicloud-sdk-java-dms-rocketmq/notice/consume">
    <input type="submit" value="Notice">
</form>
</body>
</html>

5.组织结构

huaweicloud-rocketmq-sample-Java—— 基于DMS for Rocketmq的SDK包

6.技术选型

技术 说明 官网
Spring Web Flow Spring MVC框架 https://spring.io/projects/spring-webflow
rocketmq-client rocketmq-clients https://rocketmq.apache.org/docs/4.x/

7.开发环境

工具 版本号 下载
JDK 1.8 https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

8.参考指南

收发普通消息_分布式消息服务RocketMQ版_开发指南

发送定时消息_分布式消息服务RocketMQ版_开发指南