amqplib
简介
本软件是参照开源软件 amqplib 源码并用 TypeScript 语言实现了相关功能,在OpenHarmony上支持AMQP(Advanced Message Queuing Protocol)网络通信协议的library,可以在一个进程间传递异步消息。
下载安装
-
参考安装教程 如何安装OpenHarmony ohpm包
-
安装命令如下:
ohpm install @ohos/amqplib
使用说明
使用前言
-
需要搭建RabbitMQ环境,请根据该参考链接安装环境:https://zhuanlan.zhihu.com/p/381644577
-
RabbitMQ与Erlang是存在版本对应关系,两者版本如果不对应会出现许多问题,详见:https://www.rabbitmq.com/which-erlang.html
-
可参考版本对应关系:
- RabbitMQ版本为"rabbitmq-server-3.10.7.exe",下载链接:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.10.7
- Erlang版本为"otp_win64_25.0.exe",下载链接:https://www.erlang.org/patches/otp-25.0
-
由于RabbitMQ默认的guest只能从localhost连接不能使用远程连接,官方给出的解决方案是通过配置文件修改RabbitMQ的配置,以下是修改步骤:
-
找到RabbitMQ的安装目录,以rabbitmq-server-3.10.7为例;
-
在\rabbitmq_server-3.10.7\etc下有个README.txt文件,通过文件指引找到"真正"的RabbitMQ;
-
在C:\Users%USERNAME%\AppData\Roaming\RabbitMQ目录下新建一个配置文件:rabbitmq.config,并写入:
[{rabbit, [{loopback_users, []}]}]. -
保存并退出;
-
重启RabbitMQ服务:
-
win键+R输入cmd,打开命令行窗口;
-
输入以下命令打开电脑服务;
SERVICES.MSC -
找到RabbitMQ单击并点击重启动此服务;
-
-
-
测试机需联网并且与服务器的ip地址需要相同;
-
编译并安装hap包到测试机即可进行用例测试;
-
具体使用demo请参考开源库sample页面的实现;
接口说明
| 方法名 | 入参 | 接口描述 |
|---|---|---|
assertQueue |
(queue: string, options?: { durable?: boolean, exclusive?: boolean, autoDelete?: boolean, arguments?: Object }) |
声明队列,若不存在则创建。返回 { queue: string, messageCount: number, consumerCount: number }。 |
checkQueue |
(queue: string) |
检查队列是否存在,返回队列信息(messageCount 和 consumerCount)。 |
deleteQueue |
(queue: string, options?: { ifUnused?: boolean, ifEmpty?: boolean }) |
删除队列。ifUnused(仅当无消费者时删除)、ifEmpty(仅当队列为空时删除)。 |
purgeQueue |
(queue: string) |
清空队列中的消息,返回删除的消息数量({ messageCount: number })。 |
bindQueue |
(queue: string, exchange: string, pattern: string, args?: Object) |
将队列绑定到交换器,pattern 为路由键(或匹配规则)。 |
unbindQueue |
(queue: string, exchange: string, pattern: string, args?: Object) |
解绑队列与交换器的绑定关系。 |
assertExchange |
(exchange: string, type: 'direct' | 'topic' | 'headers' | 'fanout' | string, options?: { durable?: boolean, autoDelete?: boolean, internal?: boolean, arguments?: Object }) |
声明交换器,若不存在则创建。type 支持标准类型或自定义。 |
checkExchange |
(exchange: string) |
检查交换器是否存在。 |
deleteExchange |
(exchange: string, options?: { ifUnused?: boolean }) |
删除交换器。ifUnused 表示仅当无绑定时删除。 |
bindExchange |
(destination: string, source: string, pattern: string, args?: Object) |
绑定两个交换器(目标交换器绑定到源交换器)。 |
unbindExchange |
(destination: string, source: string, pattern: string, args?: Object) |
解绑两个交换器。 |
publish |
(exchange: string, routingKey: string, content: Buffer, options?: { persistent?: boolean, expiration?: string, headers?: Object, ... }) |
向交换器发布消息。options 支持消息属性(如持久化、TTL、优先级等)。 |
sendToQueue |
(queue: string, content: Buffer, options?: { persistent?: boolean, ... }) |
直接发送消息到队列(绕过交换器)。 |
consume |
(queue: string, onMessage: (msg: ConsumeMessage | null) => void, options?: { noAck?: boolean, exclusive?: boolean, ... }) |
注册消费者,处理队列中的消息。返回消费者标签(consumerTag)。 |
cancel |
(consumerTag: string) |
取消指定标签的消费者。 |
get |
(queue: string, options?: { noAck?: boolean }) |
同步获取单条消息(非推送模式)。返回 ConsumeMessage 或 null(空队列)。 |
ack |
(message: Message, allUpTo?: boolean) |
确认单条消息。allUpTo 为 true 时确认当前消费者之前所有未确认的消息。 |
ackAll |
() |
确认当前 Channel 中所有未确认的消息。 |
nack |
(message: Message, allUpTo?: boolean, requeue?: boolean) |
否定确认单条消息,可选择重新入队(requeue)。 |
nackAll |
(requeue?: boolean) |
否定确认所有未确认的消息,可选择重新入队。 |
reject |
(message: Message, requeue?: boolean) |
拒绝单条消息(类似 nack 但仅针对一条)。 |
prefetch |
(count: number, global?: boolean) |
设置 QoS 预取限制(未确认消息数上限)。global 为 true 时作用于整个连接。 |
recover |
() |
重新投递未确认的消息(需服务器支持,非所有场景可用)。 |
事件监听
Connection 和 Channel 对象继承自 EventEmitter,支持使用 on 和 off 等方法监听事件:
Connection 事件: 事件与源库保持一致,可查看源库接口文档
error:连接发生错误时触发,回调参数为错误对象close:连接关闭时触发blocked:连接被服务器阻塞时触发unblocked:连接解除阻塞时触发update-secret-ok: 在收到代理确认连接秘密成功更新后触发
Channel 事件: 事件与源库保持一致,可查看源库接口文档
error:通道被服务器关闭时触发(如参数冲突、权限错误等),回调参数为错误对象。注意:Promise rejection不会触发此事件close:通道关闭时触发return:消息无法路由时触发(需设置mandatory标志)drain:写缓冲区排空时触发(可恢复发送消息)
使用示例:
// 监听连接错误
connection.on('error', (err) => {
console.error('Connection error:', err);
});
// 监听通道关闭
channel.on('close', () => {
console.log('Channel closed');
});
// 移除事件监听
channel.off('close', closeHandler);
关于混淆
- 代码混淆,请查看代码混淆简介
- 如果希望amqplib库在代码混淆过程中不会被混淆,需要在混淆规则配置文件obfuscation-rules.txt中添加相应的排除规则:
-keep
./oh_modules/@ohos/amqplib
约束与限制
在下述版本验证通过:
-
DevEco Studio: NEXT Beta1-5.0.3.806, SDK: API12 Release(5.0.0.66)
-
DevEco Studio 版本: 4.1 Canary(4.1.3.317) OpenHarmony SDK:API11 (4.1.0.36)
目录结构
|---- amqplib
| |---- entry # 示例代码文件夹
| |---- library # amqplib库文件夹
| |---- ets
| |---- lib # 主要依赖
| |---- types # 对外接口文件夹
| |---- callback_api.d.ts # 交换机连接交互相关接口文件
| |---- index.d.ts # 主对外接口文件
| |---- properties.d.ts # 参数类型等对外声明文件
| |---- callback_api.js # callback回调脚本
| |---- channel_api.js # promise回调脚本
| |---- README.md # 安装使用方法
| |---- README_zh.md # 安装使用方法
贡献代码
使用过程中发现任何问题都可以提 Issue ,当然,也非常欢迎提 PR 。
开源协议
本项目基于 MIT LICENSE ,请自由地享受和参与开源。