之前工作中用过 JMS 的 IBM MQSeries, 自己试玩过 ActiveMQ, 再就是 Kafka, 再到 AWS 上的 SQS 等消息队列。打算调教一下 Python 的 Celery,它首推用 RabbitMQ 作为它的消息,当然也可选择 Redis 或 AWS 的 SQS,首先感觉有必要体验一下 RabbitMQ。
RabbitMQ 是一个 AMQP(Advanced Message Queuing Protocol) 的开源实现, 相关的实现产品还有 OpenAMQ, StormMQ, Apache Qpid, Red Hat Enterprise MRG, Microsoft Azure Service Bus 等,AMQP 与 JMS 还存在一些交集。
本文不打算介绍太多的 RabbitMQ 的一些概念,主要是体验一下如何安装,怎么发送和接受消息,初次体验就不直接上 Docker 了,用 Docker 根本不知道 RabbitMQ 是个什么东西,所以用一个 Ubuntu 20.04 虚拟机来一步步安装。
先用 Vagrant 准备一个虚拟机,Vagrantfile
文件内容如下
1 2 3 4 5 6 7 |
Vagrant.configure("2") do |config| config.vm.box = "generic/ubuntu2004" config.vm.network "public_network" config.vm.define "rabbitmq" config.vm.hostname = "rabbitmq" end |
注:Vagrant 如何管理虚拟机请参考我之前的一篇博客 Vagrant 简介与常用操作及配置
先启动并 SSH 登陆该虚拟机
$ vagrant up
$ vagrant ssh
现在进到了 Ubuntu 20.04 系统中了,下面是我自己能顺序安装的过程,跳过了别的安装向导中一些非必要的步骤
vagrant@rabbitmq:~$ sudo apt install rabbitmq-server
安装后服务 rabbitmq-server
会自动启动。
有些向导中可能说要求安装 Erlang(因为 rabbitmp-server 是用 Erlang 语言写的), 加上 RabbitMQ Repository 和 APT, 然后 sudo apt update 再安装 rabbitmq-server 本身,这些都非必要的。
启用 rabbitmq_management 插件
vagrant@rabbitmq:~$ sudo rabbitmq-plugins enable rabbitmq_management
这时候用浏览器打开 http://localhost:15672 就能用 guest/guest 登陆,但 guest 仅限于本地登陆。由于是 Vagrant 虚拟机中安装的,无法用 localhost 来登陆,先得创建一个 admin 的管理员帐户
vagrant@rabbitmq:~$ sudo rabbitmqctl add_user admin password
vagrant@rabbitmq:~$ sudo rabbitmqctl set_user_tags admin administraor
现在找到虚拟机的 IP 地址,比如 192.168.86.50, 所以打开 http://192.168.86.50:15672, 用前面添加的 admin/password 登陆
之后就能在这个界面里查看管理 Connection, Channel, Exchange 和 Queue 了。
RabbitMQ 还提供了完美的管理 API, 本例中通过 http://192.168.86.50:15672/api 查看所有 API 的使用说明,也就是用 curl 带上用户名和密码就能做和 rabbitmqadmin
一样的事情。
RabbitMQ 启动后,要试下如何发送和接收消息,消息服务通常的模型是
Producer ---------------> Queue(Topic) ------------------> Consumer
我们这儿用的是 AMQP 的实现,所以有必要了解下 AMQP 的消息模型,参考 AMPQ 0-9-1 Model Explained(对于 AMQP 应用的话,这篇文章值得细读)
简单说就是 Publisher(Producer) 是与 Exchange 交互的,消息中携带一个 routing key, Exchange 根据 routing key 把消息放到相应的 Queue 中,Consumer 从有兴趣的 Queue 中获取消息。
在正式用编程的方式应用 RabbitMQ 之前,我们先使用 rabbitmqadmin
来查看管理 RabbitMQ 的资源,发布和接收消息。此处参考的官方的文档 Management Command Line Tool。rabbitmqadmin
是一个 Python 脚本。
本地执行 rabbitmqadmin 命令不用指定 host, username 和 password, 如果连接某个 RabbitMQ broker, 就需加上连接,用户验证的参数
$ rabbimqadmin --host 192.168.86.50 --username=admin --password=password list queues
来看对于一个刚初始的 RabbitMQ 下 rabbitmqadmin
能看到的内容
12345678910111213141516171819202122232425262728293031323334 vagrant@rabbitmq:~$ rabbitmqadmin list vhosts+------+----------+| name | messages |+------+----------+| / | |+------+----------+vagrant@rabbitmq:~$ rabbitmqadmin list nodes+-------------------+------+----------+| name | type | mem_used |+-------------------+------+----------+| rabbit@rabbitmq | disc | 86319104 |+-------------------+------+----------+vagrant@rabbitmq:~$ rabbitmqadmin list vhosts+------+----------+| name | messages |+------+----------+| / | |+------+----------+vagrant@rabbitmq:~$ rabbitmqadmin list exchanges+--------------------+---------+| name | type |+--------------------+---------+| | direct || amq.direct | direct || amq.fanout | fanout || amq.headers | headers || amq.match | headers || amq.rabbitmq.trace | topic || amq.topic | topic |+--------------------+---------+vagrant@rabbitmq:~$ rabbitmqadmin list queuesNo itemsvagrant@rabbitmq:~$ rabbitmqadmin list connectionsNo items
发送一条消息
vagrant@rabbitmq:~$ rabbitmqadmin publish routing_key=test payload="hello, world"
Message published but NOT routed
没有用 exchange
参数指定 exchange 的话会使用默认的 "". 比如可以使用前面用 list exchanges 列出的某一个,exchange=amq.direct
, 前提是该 exchange 与 queue 进行了绑定。
消息发送成功但没有路由,原因是没有声明一个 queue 与该 routing_key 关联,或选择的 exchange 没有绑定任何 queue. 先创建一个 queue
vagrant@rabbitmq:~$ rabbitmqadmin declare queue name=my-new-queue durable=false
queue declared
参数 durable 是否持久化该 queue,如果为 false, 在 rabbitmq 服务重启扣该 queue 就消失了, queue 中的消息在 rabbitmqctl stop_app/start_app 之后都会消失。auto_delete 是否自动删除(有过消费,并在所有消费者解除订阅后自动删除 queue),默认的 vhost 是 /
, 可用 --vhost
指定不同的 vhost
然后再发送消息,仍然选择默认的 exchange,因为默认 exchange 与任何 queue 都有绑定, routing key 就是 queue 名称
vagrant@rabbitmq:~$ rabbitmqadmin publish routing_key=my-new-queue payload="hello, world"
Message published
注意 routing_key 就是前面创建的 queue my-new-queue
,就是告诉 exchange 要把消息路由到 routing_key 指定的 queue 中去。我们还能创建自己的 exchange
vagrant@rabbitmq:~$ rabbitmqadmin declare exchange name=my-new-exchange type=direct
exchange declared
在 publish 消息的时候可指定自己新创建的 exchange。如果直接使用新建的 exchange 发送消息
vagrant@rabbitmq:~$ rabbitmqadmin publish exchange=my-new-exchange routing_key=my-new-queue payload="hello, world"
Message published but NOT routed
原因就是上面提到的 exchange 与 queue 未建立绑定,要能成功发送消息到 queue 中去,所以建立绑定再发送消息
vagrant@rabbitmq:~$ rabbitmqadmin declare binding source=my-new-exchange destination=my-new-queue routing_key=my-new-queue
binding declared
vagrant@rabbitmq:~$ rabbitmqadmin publish exchange=my-new-exchange routing_key=my-new-queue payload="hello, world"
Message published
如果需要 RabbitMQ 的消息持久化, 比如 RabbitMQ 崩溃后,或用 rabbitmqctl stop_app|start_app 后消息还在 queue 中,除了 queue 的 durable=true 外,发送消息时还带上额外属性
vagrant@rabbitmq:~$ rabbitmqadmin publish routing_key=task_queue payload="hello 3" properties='{"delivery_mode":2}'
delivery_mode=1 不持久化消息,2 持久化
接收消息
1 2 3 4 5 6 |
vagrant@rabbitmq:~$ rabbitmqadmin get queue=my-new-queue ackmode=ack_requeue_false +--------------+----------+---------------+--------------+---------------+------------------+------------+-------------+ | routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | properties | redelivered | +--------------+----------+---------------+--------------+---------------+------------------+------------+-------------+ | my-new-queue | | 0 | hello, world | 12 | string | | False | +--------------+----------+---------------+--------------+---------------+------------------+------------+-------------+ |
再执行相同的 rabbitmqadmin get queue=my-new-queue ackmode=ack_requeue_false
命令,消息就不见了。如果用 ackmode=ack_requeue_true
就能多次 get 该消息。RabbitMQ 的 ackmode 和 Kafka 的 auto commit, manual commit 是一样的,可以自动 ack, 或处理完消息后手动的 ack.
到这里一个完整的消息传送过程和 RabbitMQ 的工作方式差不多清楚了,至于 vhost, vhost 如何与 exchange, queue 绑定, Node, Cluster 等如何配置,那是更高级的内容,不应该在本文中展开。
编程实现消息的发送与接收
前面命令方式发送和接收消息就是为编程作准备的,以前演示一般都是选择 Java, 现在要感谢 Python, 它比 Java 代码有更直观自然的表现力,而且代码更精炼,加之 JupyterLab 是个演练 Python 的好战场。Python 要用到的包是 pika
$ pip install pika
等会,还有一件事要做,不过等到程序报错来修改 admin
的权限也不迟。我们将要在代码中使用 admin
用户,先要给它访问 vhost /
的权限,需要 RabbitMQ 服务器的控制台下执行
vagrant@rabbitmq:~$ sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
Setting permissions for user "admin" in vhost "/" ...
没有 vhost /
的访问权限会报错:ConnectionClosedByBroker: (530) "NOT_ALLOWED - access to vhost '/' refused for user 'admin'
Python 发送消息
1 2 3 4 5 6 7 8 9 10 |
import pika credentials = pika.PlainCredentials('admin', 'password') connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.86.50',credentials=credentials)) channel = connection.channel() channel.basic_publish(exchange='', routing_key='my-new-queue', body='Hello Python') connection.close() |
代码看起来非常简单,程序连接 Brocker, 再打开一个通道 channel 就能发送消息了。写代码的时候需要知道 exchange, binding 以及 queue 是什么,而且有可能把消息发往了错误的 queue。而 Kafka 只需要知道 Queue 是什么。
完后用 rabbitmqadmin get
命令我们看到新的消息进入了队列
1 2 3 4 5 6 |
vagrant@rabbitmq:~$ rabbitmqadmin get queue=my-new-queue ackmode=ack_requeue_true +--------------+----------+---------------+--------------+---------------+------------------+------------+-------------+ | routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | properties | redelivered | +--------------+----------+---------------+--------------+---------------+------------------+------------+-------------+ | my-new-queue | | 0 | Hello Python | 12 | string | | False | +--------------+----------+---------------+--------------+---------------+------------------+------------+-------------+ |
由于我们用了 ackmode=ack_requeue_true
, 所以消息仍然在队列中,接下来用
Python 接收消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
import pika credentials = pika.PlainCredentials('admin', 'password') connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.86.50',credentials=credentials)) channel = connection.channel() def callback(ch, method, properties, body): print(f'Received message: {body.decode()}') channel.basic_consume(queue='my-new-queue', on_message_callback=callback, auto_ack=True) print('[*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() |
连接并进入通道和 producer 是一样的,最后 channel.start_consumming()
进入死循环。接着用之前的 Producer 发送多条消息,我们给消息加上编号 1, 2, 3, 这时看到 Consumer 端的输出为
yanbin@ubuntu-server:~$ python consumer.py
[*] Waiting for messages. To exit press CTRL+C
Received message: 1: Hello Python
Received message: 2: Hello Python
Received message: 3: Hello Python
在 callback 函数中,我们可以适时的利用其他参数 ch, method, properties 中提供的信息。
进入 RabbitMQ 高级应用的准备
相对于 Kafka 的 Broker, Partition, Topic, Consumer Group 仅有的几个概念, RabbitMQ 涵盖了更多的概念,诸如 Broker, vhost, Exchange, Queue, Routing key, Channel,,还有 Connection, Producer,Consumer 那是它们共有的概念。如果建立 Cluster 应该就是在 Broker 之上的。我们快速了解一下 RabbitMQ(或者说是 AMQP) 的 Broker, Vhost, Exchange, Queu, Routing key, Channel 的概念
Broker:它是消息队列的服务器实体,一个运行了 RabbitMQ 服务的进程
消息:每条消息要有一个路由键(routing key), 一个简单的字符串,它将决定自己经过 Exchange, Binding 规则最终送到哪个队列
vhost(Virtual Host): RabbitMQ 服务器中的虚拟主机,像一个 Namespace。每个 vhost 拥有自己的 exchange, queue, binding 和权限控制。RabbitMQ 有一个默认的 vhost /
,用 guest/guest 就能访问
Channel: 几乎所有的操作都在 Channel 中进行,它是消息读的通道。客户端可建立多个 Channel, 每个 Channel 代表一个会话
Exchange: 接收消息的交换机,根据 routing key 转换(点对点,组播,多播,广播等)到相应的 queue 中去
Binding: 基于 routing key 把 exchange 与队列关联起来的路由规则。这样看来 Exchange 更像是个路由器,其中的 Binding 构建了它的路由表
Queue: 最终存放消息的队列,一条消息可被投入到一个或多个队列,再等待消费者将其取走
Exchange
发往 Exchange 中的消息要能正确到达预定的 Queue, 就必须事先建立好相应的 Binding
Exchange 有四种类型:direct, topic, headers 和 fanout. 每种类型有不同的算法根据 Binding, routing key 投递消息到相应的 Queue 中去,算法就会影响到 CPU 开销。
direct 类型(精确匹配,单播):完全匹配 routing key 和 queue 的名称来投递。比如 routing key 是 my-new-queue 就直接把消息丢到名称为 my-new-queue 的队列中去
topic 类型(模式匹配,组播):按模式匹配由 routing key 找到相应的 queue. bind key 和 routing key 是点分割的单词。#
匹配 0 个或多个单词,*
匹配一个单词,不知道谁定义了这种有违常理的匹配规则。比如 binding key *.stock.#
匹配 routing -key: usa.stock
, eur.stock.db
, usa.stock.nasdaq.aapl
fanout 类型(广播): 不处理 routing key,简单的把消息发送到所有绑定的 queue
headers: 根据定义的 key-value 属性来转发消息,如 format=pdf 之类
RabbitMQ 常用的命令
当在终端输入 rabbit 后双击 tab 键显示出 rabbitmqadmin, rabbitmqctl, rabbitmq-diagnostics, rabbitmq-plugins, rabbitmq-queues, rabbitmq-server, rabbitmq-upgrade, 其中的 rabbitmqctl, rabbitmq-server 和 rabbitmqadmin 三个命令尤为重要,它们其实有互为重叠的功能,还有可以使用 RabbitMQ 提供的 RESTful API
常用的
- 启动/禁用管理器:rabbitmq-plugins enable|disable rabbitmq_management
- 彻底的启动和关闭 RabbitMQ, 包括 Erlang VM, broker: rabbitmq-server start|stop
- 启动/关闭应用(依赖于 rabbitmq-server):rabbitmqctl start_app|stop_app
- 新增虚拟机:rabbitmqctl add_vhost vhost_name
- 添加用户:rabbitmqctl add_user username password
- 分配角色:rabbitmapctl set_user_tags username administrator
- 将虚拟主机授权给用户:rabbitmqctl set_permissions -p vhost_name username ".*" ".*" ".*" (后面三个 ".*" 代表用户有配置,写,读全部权限)
用 rabbitmqctl --help 查看所有的功能,它有很多 list-xxx
的功能,如 list_queues
, list_exchanges
, list_consuemrs
等,有些资源管理的功能就要用 rabbitmqadmin
命令了。
链接:
- Install RabbitMQ Server on Ubuntu 20.04 | 18.04
- AMQP 0-9-1 Model Explained
- Management Command Line Tool
- Part 4: RabbitMQ Exchanges, routing keys and bindings
本文链接 https://yanbin.blog/rabbitmq-installation-concepts-usage/, 来自 隔叶黄莺 Yanbin Blog
[版权声明] 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。