RabbitMQ 初体验(安装,概念及应用)

之前工作中用过 JMS 的 IBM MQSeries, 自己试玩过 ActiveMQ, 再就是 Kafka, 再到 AWS 上的 SQS 等消息队列。打算调教一下 Python 的 Celery,它首推用 RabbitMQ 作为它的消息,当然也可选择 Redis 或 AWS 的 SQS,首先感觉有必要体验一下 RabbitMQ。

RabbitMQ 是一个 AMQP(Advanced Message Queuing Protocol) 的开源实现, 相关的实现产品还有 OpenAMQ, StormMQ, Apache Qpid, Red Hat Enterprise MRG, Microsoft Azure Service Bus 等,AMQP 与 JMS 还存在一些交集。

本文不打算介绍太多的 RabbitMQ 的一些概念,主要是体验一下如何安装,怎么发送和接受消息,初次体验就不直接上 Docker 了,用 Docker 根本不知道 RabbitMQ 是个什么东西,所以用一个 Ubuntu 20.04 虚拟机来一步步安装。

先用 Vagrant 准备一个虚拟机,Vagrantfile 文件内容如下

注:Vagrant 如何管理虚拟机请参考我之前的一篇博客 Vagrant 简介与常用操作及配置

先启动并 SSH 登陆该虚拟机

$ vagrant up
$ vagrant ssh

现在进到了 Ubuntu 20.04 系统中了,下面是我自己能顺序安装的过程,跳过了别的安装向导中一些非必要的步骤

vagrant@rabbitmq:~$ sudo apt install rabbitmq-server

安装后服务 rabbitmq-server 会自动启动。

有些向导中可能说要求安装 Erlang(因为 rabbitmp-server 是用 Erlang 语言写的), 加上 RabbitMQ Repository 和 APT, 然后 sudo apt update 再安装 rabbitmq-server 本身,这些都非必要的。

启用 rabbitmq_management 插件

vagrant@rabbitmq:~$ sudo rabbitmq-plugins enable rabbitmq_management

这时候用浏览器打开 http://localhost:15672 就能用 guest/guest 登陆,但 guest 仅限于本地登陆。由于是 Vagrant 虚拟机中安装的,无法用 localhost 来登陆,先得创建一个 admin 的管理员帐户

vagrant@rabbitmq:~$ sudo rabbitmqctl add_user admin password
vagrant@rabbitmq:~$ sudo rabbitmqctl set_user_tags admin administraor

现在找到虚拟机的 IP 地址,比如 192.168.86.50, 所以打开 http://192.168.86.50:15672, 用前面添加的 admin/password 登陆

之后就能在这个界面里查看管理 Connection, Channel, Exchange 和 Queue 了。

RabbitMQ 还提供了完美的管理 API, 本例中通过 http://192.168.86.50:15672/api 查看所有 API 的使用说明,也就是用 curl 带上用户名和密码就能做和 rabbitmqadmin 一样的事情。

RabbitMQ 启动后,要试下如何发送和接收消息,消息服务通常的模型是

Producer   ---------------> Queue(Topic)  ------------------> Consumer

我们这儿用的是 AMQP 的实现,所以有必要了解下 AMQP 的消息模型,参考 AMPQ 0-9-1 Model Explained(对于 AMQP 应用的话,这篇文章值得细读)

简单说就是 Publisher(Producer) 是与 Exchange 交互的,消息中携带一个 routing key, Exchange 根据 routing key 把消息放到相应的 Queue 中,Consumer 从有兴趣的 Queue 中获取消息。

在正式用编程的方式应用 RabbitMQ 之前,我们先使用 rabbitmqadmin 来查看管理 RabbitMQ 的资源,发布和接收消息。此处参考的官方的文档 Management Command Line Toolrabbitmqadmin 是一个 Python 脚本。

本地执行 rabbitmqadmin 命令不用指定 host, username 和 password, 如果连接某个 RabbitMQ broker, 就需加上连接,用户验证的参数

$ rabbimqadmin --host 192.168.86.50 --username=admin --password=password list queues

来看对于一个刚初始的 RabbitMQ 下 rabbitmqadmin  能看到的内容

发送一条消息

vagrant@rabbitmq:~$ rabbitmqadmin publish  routing_key=test payload="hello, world"
Message published but NOT routed

没有用 exchange 参数指定 exchange 的话会使用默认的 "". 比如可以使用前面用 list exchanges 列出的某一个,exchange=amq.direct, 前提是该 exchange 与 queue 进行了绑定。

消息发送成功但没有路由,原因是没有声明一个 queue 与该 routing_key 关联,或选择的 exchange 没有绑定任何 queue. 先创建一个 queue

vagrant@rabbitmq:~$ rabbitmqadmin declare queue name=my-new-queue durable=false
queue declared

参数 durable 是否持久化该 queue,如果为 false, 在 rabbitmq 服务重启扣该 queue 就消失了, queue 中的消息在 rabbitmqctl stop_app/start_app 之后都会消失。auto_delete 是否自动删除(有过消费,并在所有消费者解除订阅后自动删除 queue),默认的 vhost 是 /, 可用 --vhost 指定不同的 vhost

然后再发送消息,仍然选择默认的 exchange,因为默认 exchange 与任何 queue 都有绑定, routing key 就是 queue 名称

vagrant@rabbitmq:~$ rabbitmqadmin publish routing_key=my-new-queue payload="hello, world"
Message published

注意 routing_key 就是前面创建的 queue  my-new-queue,就是告诉 exchange 要把消息路由到 routing_key 指定的 queue 中去。我们还能创建自己的 exchange

vagrant@rabbitmq:~$ rabbitmqadmin declare exchange name=my-new-exchange type=direct
exchange declared

在 publish 消息的时候可指定自己新创建的 exchange。如果直接使用新建的 exchange 发送消息

vagrant@rabbitmq:~$ rabbitmqadmin publish exchange=my-new-exchange routing_key=my-new-queue payload="hello, world"
Message published but NOT routed

原因就是上面提到的 exchange 与 queue 未建立绑定,要能成功发送消息到 queue 中去,所以建立绑定再发送消息

vagrant@rabbitmq:~$ rabbitmqadmin declare binding source=my-new-exchange destination=my-new-queue routing_key=my-new-queue
binding declared
vagrant@rabbitmq:~$ rabbitmqadmin publish exchange=my-new-exchange routing_key=my-new-queue payload="hello, world"
Message published

如果需要 RabbitMQ 的消息持久化, 比如 RabbitMQ 崩溃后,或用 rabbitmqctl stop_app|start_app 后消息还在 queue 中,除了 queue 的 durable=true 外,发送消息时还带上额外属性

vagrant@rabbitmq:~$ rabbitmqadmin publish routing_key=task_queue payload="hello 3" properties='{"delivery_mode":2}'

delivery_mode=1 不持久化消息,2 持久化

接收消息

再执行相同的 rabbitmqadmin get queue=my-new-queue ackmode=ack_requeue_false 命令,消息就不见了。如果用 ackmode=ack_requeue_true 就能多次 get 该消息。RabbitMQ 的 ackmode 和 Kafka 的 auto commit, manual commit 是一样的,可以自动 ack, 或处理完消息后手动的 ack.

到这里一个完整的消息传送过程和 RabbitMQ 的工作方式差不多清楚了,至于 vhost, vhost 如何与 exchange, queue 绑定, Node, Cluster 等如何配置,那是更高级的内容,不应该在本文中展开。

编程实现消息的发送与接收

前面命令方式发送和接收消息就是为编程作准备的,以前演示一般都是选择 Java, 现在要感谢 Python, 它比 Java 代码有更直观自然的表现力,而且代码更精炼,加之 JupyterLab 是个演练 Python 的好战场。Python 要用到的包是 pika

$ pip install pika

等会,还有一件事要做,不过等到程序报错来修改 admin 的权限也不迟。我们将要在代码中使用 admin 用户,先要给它访问 vhost / 的权限,需要 RabbitMQ 服务器的控制台下执行

vagrant@rabbitmq:~$ sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
Setting permissions for user "admin" in vhost "/" ...

没有 vhost / 的访问权限会报错:ConnectionClosedByBroker: (530) "NOT_ALLOWED - access to vhost '/' refused for user 'admin'

Python 发送消息

代码看起来非常简单,程序连接 Brocker, 再打开一个通道 channel 就能发送消息了。写代码的时候需要知道 exchange, binding 以及 queue 是什么,而且有可能把消息发往了错误的 queue。而 Kafka 只需要知道 Queue 是什么。

完后用 rabbitmqadmin get 命令我们看到新的消息进入了队列

由于我们用了 ackmode=ack_requeue_true, 所以消息仍然在队列中,接下来用

Python 接收消息

连接并进入通道和 producer 是一样的,最后 channel.start_consumming() 进入死循环。接着用之前的 Producer 发送多条消息,我们给消息加上编号 1, 2, 3, 这时看到 Consumer 端的输出为

yanbin@ubuntu-server:~$ python consumer.py
[*] Waiting for messages. To exit press CTRL+C
Received message: 1: Hello Python
Received message: 2: Hello Python
Received message: 3: Hello Python

在 callback 函数中,我们可以适时的利用其他参数 ch, method, properties 中提供的信息。

进入 RabbitMQ 高级应用的准备

相对于 Kafka 的 Broker, Partition, Topic, Consumer Group 仅有的几个概念, RabbitMQ 涵盖了更多的概念,诸如 Broker, vhost, Exchange, Queue, Routing key, Channel,,还有 Connection, Producer,Consumer 那是它们共有的概念。如果建立 Cluster 应该就是在 Broker 之上的。我们快速了解一下 RabbitMQ(或者说是 AMQP) 的 Broker, Vhost, Exchange, Queu, Routing key, Channel 的概念

Broker:它是消息队列的服务器实体,一个运行了 RabbitMQ 服务的进程

消息:每条消息要有一个路由键(routing key), 一个简单的字符串,它将决定自己经过 Exchange, Binding 规则最终送到哪个队列

vhost(Virtual Host): RabbitMQ 服务器中的虚拟主机,像一个 Namespace。每个 vhost 拥有自己的 exchange, queue, binding 和权限控制。RabbitMQ 有一个默认的 vhost /,用 guest/guest 就能访问

Channel: 几乎所有的操作都在 Channel 中进行,它是消息读的通道。客户端可建立多个 Channel, 每个 Channel 代表一个会话

Exchange: 接收消息的交换机,根据 routing key 转换(点对点,组播,多播,广播等)到相应的 queue 中去

Binding: 基于 routing key 把 exchange 与队列关联起来的路由规则。这样看来 Exchange 更像是个路由器,其中的 Binding 构建了它的路由表

Queue: 最终存放消息的队列,一条消息可被投入到一个或多个队列,再等待消费者将其取走

Exchange

发往 Exchange 中的消息要能正确到达预定的 Queue, 就必须事先建立好相应的 Binding

Exchange 有四种类型:direct, topic, headers 和 fanout. 每种类型有不同的算法根据 Binding, routing key 投递消息到相应的 Queue 中去,算法就会影响到 CPU 开销。

direct 类型(精确匹配,单播):完全匹配 routing key 和 queue 的名称来投递。比如 routing key 是 my-new-queue 就直接把消息丢到名称为 my-new-queue 的队列中去

topic 类型(模式匹配,组播):按模式匹配由 routing key 找到相应的 queue. bind key 和 routing key 是点分割的单词# 匹配 0 个或多个单词,* 匹配一个单词,不知道谁定义了这种有违常理的匹配规则。比如 binding key *.stock.# 匹配 routing -key: usa.stockeur.stock.db, usa.stock.nasdaq.aapl

fanout 类型(广播): 不处理 routing key,简单的把消息发送到所有绑定的 queue

headers: 根据定义的 key-value 属性来转发消息,如 format=pdf 之类

RabbitMQ 常用的命令

当在终端输入 rabbit 后双击  tab 键显示出 rabbitmqadmin, rabbitmqctl, rabbitmq-diagnostics, rabbitmq-plugins, rabbitmq-queues, rabbitmq-server, rabbitmq-upgrade, 其中的 rabbitmqctl, rabbitmq-server 和  rabbitmqadmin 三个命令尤为重要,它们其实有互为重叠的功能,还有可以使用 RabbitMQ 提供的 RESTful API

常用的

  1. 启动/禁用管理器:rabbitmq-plugins enable|disable rabbitmq_management
  2. 彻底的启动和关闭 RabbitMQ, 包括 Erlang VM, broker: rabbitmq-server start|stop
  3. 启动/关闭应用(依赖于 rabbitmq-server):rabbitmqctl start_app|stop_app
  4. 新增虚拟机:rabbitmqctl add_vhost vhost_name
  5. 添加用户:rabbitmqctl add_user username password
  6. 分配角色:rabbitmapctl set_user_tags username administrator
  7. 将虚拟主机授权给用户:rabbitmqctl set_permissions -p vhost_name username ".*" ".*" ".*" (后面三个 ".*" 代表用户有配置,写,读全部权限)

用 rabbitmqctl --help 查看所有的功能,它有很多 list-xxx 的功能,如 list_queues, list_exchanges, list_consuemrs 等,有些资源管理的功能就要用 rabbitmqadmin 命令了。

链接:

  1. Install RabbitMQ Server on Ubuntu 20.04 | 18.04
  2. AMQP 0-9-1 Model Explained
  3. Management Command Line Tool
  4. Part 4: RabbitMQ Exchanges, routing keys and bindings

本文链接 https://yanbin.blog/rabbitmq-installation-concepts-usage/, 来自 隔叶黄莺 Yanbin Blog

[版权声明] Creative Commons License 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。

Subscribe
Notify of
guest

0 Comments
Inline Feedbacks
View all comments