1.项目介绍
本项目是华为云开发者团队基于华为云分布式消息服务Rocketmq搭建集群,模拟电商场景。电商场景中通常会涉及到订单、支付和通知等等场景的业务处理。业务链通常都是多个系统相互协作完成一次作业,上层服务强依赖于下层服务,上层服务的性能会强依赖于下层服务,当业务链过深,则会严重影响外层服务的性能和用户体验。
2.设计思想
系统解耦
分布式消息服务RocketMQ版可以解除多个业务系统之间的耦合度,提升各系统的处理能力和响应速度。
消息定时、延迟
分布式消息服务RocketMQ版提供的定时、延迟等能力,满足需要订阅通知的电商场景。
3.参数介绍
NamesrvAddr: 实例的元数据连接地址
topic: 消息主题,消息发送与接收的基本单元
Message Key: 消息Key
Message Tag: 消息Tag
__STARTDELIVERTIME: 消息定时投递的时间戳
4.核心代码说明
生产者(Order、Pay、Notice)
@Controller
@RequestMapping("/order")
public class OrderSystemProducer {
private static final Logger logger = LoggerFactory.getLogger(OrderSystemProducer.class);
@RequestMapping("/produce")
@ResponseBody
public String produceOrder() {
SendResult sendResult = new SendResult();
try {
DefaultMQProducer producer = new DefaultMQProducer("OrderSystemProducer");
// 元数据连接地址
producer.setNamesrvAddr("100.85.217.111:8200;100.93.3.194:8200");
producer.start();
// 订单消息标识
String[] tags = new String[]{"CommonOrderTag", "ScheduledOrderTag"};
String orderId = "order0";
// 生产订单消息
Message msg = new Message("OrderTopic", tags[0], "KEY0",
("common order message").getBytes(StandardCharsets.UTF_8));
sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
String orderId = (String) arg;
int index = Math.abs(orderId.hashCode() % mqs.size());
return mqs.get(index);
}
}, orderId);
// 延时发送生产订单消息
// 延时消息投递时间戳,该消息60秒后投递
final long deliverTimestamp = Instant.now().plusSeconds(60).toEpochMilli();
// 创建消息对象
Message scheduledMsg = new Message("OrderTopic",
tags[1],
"KEY1",
"scheduled order message".getBytes(StandardCharsets.UTF_8));
// 设置消息定时投递的时间戳属性
scheduledMsg.putUserProperty("__STARTDELIVERTIME", String.valueOf(deliverTimestamp));
producer.send(scheduledMsg);
producer.shutdown();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
logger.error("send message exception {}", e);
}
return sendResult.toString();
}
}
@Controller
@RequestMapping("/pay")
public class PaySystemProducer {
private static final Logger logger = LoggerFactory.getLogger(PaySystemProducer.class);
@RequestMapping("/produce")
@ResponseBody
public void producePay() {
DefaultMQProducer producer = new DefaultMQProducer("PaySystemProducer");
// 填入元数据连接地址
producer.setNamesrvAddr("100.85.217.111:8200;100.93.3.194:8200");
try {
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
String orderId = "order" + (i % 10);
Message msg = new Message("PayTopic", tags[i % tags.length], "KEY" + i,
("This is pay " + i).getBytes(StandardCharsets.UTF_8));
SendResult sendResult = RocketmqSampleUtils.sendMessage(producer, msg, orderId);
logger.info(sendResult.toString());
}
} catch (Exception e) {
logger.info(e.getMessage());
}
producer.shutdown();
}
}
@Controller
@RequestMapping("/notice")
public class NoticeSystemProducer {
private static final Logger logger = LoggerFactory.getLogger(NoticeSystemProducer.class);
@RequestMapping("/produce")
@ResponseBody
public void produceNotice() {
DefaultMQProducer producer = new DefaultMQProducer("NoticeSystemProducer");
// 填入元数据连接地址
producer.setNamesrvAddr("100.85.217.111:8200;100.93.3.194:8200");
try {
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
String orderId = "order" + (i % 10);
Message msg = new Message("NoticeTopic", tags[i % tags.length], "KEY" + i, ("This is notice " + i).getBytes(StandardCharsets.UTF_8));
SendResult sendResult = RocketmqSampleUtils.sendMessage(producer, msg, orderId);
logger.info(sendResult.toString());
}
} catch (Exception e) {
logger.info(e.getMessage());
}
producer.shutdown();
}
}
消费者(Order、Pay、Notice)
@Controller
@RequestMapping("/order")
public class OrderSystemConsumer {
@Autowired
private IOrderService orderService;
@RequestMapping("/consume")
@ResponseBody
public String consumeOrder() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderSystemConsumer");
// 填入元数据连接地址
consumer.setNamesrvAddr("100.85.217.111:8200;100.93.3.194:8200");
consumer.subscribe("OrderTopic", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt ext : msgs) {
// 获取定时订单消息
if ("ScheduledOrderTag".equals(ext.getTags())) {
// 订单超时未支付则关闭订单
OrderBean orderBean = new OrderBean(false);
if (orderBean.isPay()) {
orderBean.setStatus("closed");
orderService.updateOrder(orderBean);
}
}
}
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 3) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
return "consuming order...";
}
}
@Controller
@RequestMapping("/pay")
public class PaySystemConsumer {
@RequestMapping("/consume")
@ResponseBody
public String consumePay() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PaySystemConsumer");
// 填入元数据连接地址
consumer.setNamesrvAddr("100.85.217.111:8200;100.93.3.194:8200");
consumer.subscribe("PayTopic", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 3) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
return "consuming pay...";
}
}
@Controller
@RequestMapping("/notice")
public class NoticeSystemConsumer {
@RequestMapping("/consume")
@ResponseBody
public String consumeNotice() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("NoticeSystemConsumer");
// 填入元数据连接地址
consumer.setNamesrvAddr("100.85.217.111:8200;100.93.3.194:8200");
consumer.subscribe("NoticeTopic", "*");
RocketmqSampleUtils.addMessageListener(consumer);
consumer.start();
return "consuming notice...";
}
}
生产延时消息
// 延时发送生产订单消息
// 延时消息投递时间戳,该消息60秒后投递
final long deliverTimestamp = Instant.now().plusSeconds(60).toEpochMilli();
// 创建消息对象
Message scheduledMsg = new Message("OrderTopic",
tags[1],
"KEY1",
"scheduled order message".getBytes(StandardCharsets.UTF_8));
// 设置消息定时投递的时间戳属性
scheduledMsg.putUserProperty("__STARTDELIVERTIME", String.valueOf(deliverTimestamp));
producer.send(scheduledMsg);
生产消息成功样例

消费消息成功样例
如果开了消息轨迹后可以在消息页面,查看消费轨迹

程序总入口
本地启动tomcat前,替换index.jsp文件的端口、包名,前端访问地址为:http://localhost:8080/huaweicloud_rocketmq_sample_Java_war_exploded/rocketmq/demo. 其中8080为tomcat端口,huaweicloud_rocketmq_sample_Java_war_exploded为包名。
<%@ page language="java" contentType="text/html; charset=ISO-8859-1"
pageEncoding="ISO-8859-1"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=ISO-8859-1">
<title>index</title>
</head>
<body>
PRODUCE
<form action = "http://127.0.0.1:8818/huaweicloud-sdk-java-dms-rocketmq/order/produce">
<input type="submit" value="Order">
</form>
<form action = "http://127.0.0.1:8818/huaweicloud-sdk-java-dms-rocketmq/pay/produce">
<input type="submit" value="Pay">
</form>
<form action = "http://127.0.0.1:8818/huaweicloud-sdk-java-dms-rocketmq/notice/produce">
<input type="submit" value="Notice">
</form>
<br/><br/>
CONSUMER
<form action = "http://127.0.0.1:8818/huaweicloud-sdk-java-dms-rocketmq/order/consume">
<input type="submit" value="Order">
</form>
<form action = "http://127.0.0.1:8818/huaweicloud-sdk-java-dms-rocketmq/pay/consume">
<input type="submit" value="Pay">
</form>
<form action = "http://127.0.0.1:8818/huaweicloud-sdk-java-dms-rocketmq/notice/consume">
<input type="submit" value="Notice">
</form>
</body>
</html>
5.组织结构
huaweicloud-rocketmq-sample-Java—— 基于DMS for Rocketmq的SDK包
6.技术选型
| 技术 | 说明 | 官网 |
|---|---|---|
| Spring Web Flow | Spring MVC框架 | https://spring.io/projects/spring-webflow |
| rocketmq-client | rocketmq-clients | https://rocketmq.apache.org/docs/4.x/ |
7.开发环境
| 工具 | 版本号 | 下载 |
|---|---|---|
| JDK | 1.8 | https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html |