AMQP协议学习
- Published on
- Authors
- Name
- 0neSe7en
- @0ne_Se7en
最近在阅读AMQP协议。AMQP协议算是消息队列里无法绕开的一个协议,通过阅读该协议来学习消息队列以及自有协议设计。该协议的阅读体验非常好,协议本身没有过于复杂,会解释各个地方的设计思路。
AMQP协议中的各个概念和组件
AMQP的全称:Advanced Message Queuing Protocol
AMQP所覆盖的内容包含了网络协议以及服务端服务
- 一套被称作 “高级消息队列协议模型(AMQ Model)”的消息能力定义。该模型涵盖了Broker服务中用于路由和存储消息的组件,以及把这些组件连在一起的规则。
- 一个网络层协议,AMQP。能够让客户端程序与实现了AMQ Model的服务端进行通信。
AMQP更像是一个把东西连在一起的语言,而不是一个系统。其设计目标是:让服务端可通过协议编程。
AMQP协议是一个二进制协议,具有一些现代特性:多通道(multi-channel),可协商(negotiated),异步,安全,便携,中立,高效的。分成两层:
**功能层:**定义了一系列的命令
**传输层:**携带了从应用 → 服务端的方法,用于处理多路复用、分帧、编码、心跳、data-representation、错误处理。
这样分层之后,可以把传输层替换为其它传输协议,而不需要修改功能层。同样,也可以使用同样的传输层,而实现不同的上层协议。可能RabbitMQ也是因为类似的原因,能够较容易的支持MQTT、STOMP等协议的吧。
AMQ Model的设计是由以下需求驱动的:
- 确保符合标准的实现之间的互操作性。
- 提供清晰且直接的控制QoS
- 保持一致和明确的命名
- 通过协议,能够修改服务端的各种配置
- 使用可以轻松映射到应用程序级API的命令符号。
- 清晰,每个操作只能做一件事。
AMQP传输层是由以下需求驱动的
- 紧凑。能够快速封包和解包
- 可以携带任意大小的消息,没有明显的限制
- 同一个连接可以承载多个通道
- 长时间存活,没有显著的限制
- 允许异步命令流水线
- 容易扩展。易于处理新需求、或者变更需求
- 向前兼容
- 使用强大的断言模型,可修复
- 对编程语言保持中立
- 适合代码生成过程
在设计过程中,希望支持不同的消息架构:
- 先存后发。有多个writer,只有一个reader
- 分散工作负载。多个writer,多个reader
- 发布订阅,多个writer,多个reader
- 基于内容的路由,多个writer,多个reader
- 队列文件传输,多个writer,多个reader
- 两个节点之间,点对点连接
- 市场数据(Market data)分发。多个源,多个reader
AMQ Model
主要包含了三个主要的组件:
exchange
(交换器):从Publisher程序中收取消息,并把这些消息根据一些规则路由到消息队列(Message Queue)中message queue
(消息队列):存储消息。直到消息被安全的投递给了消费者。binding
:定义了message queue
和exchange
之间的关系,提供了消息路由的规则。
AMQ Model的整体架构
可以把AMQP的架构理解为一个邮件服务:
- 一个AMQP消息类似于一封邮件信息
- 消息队列类似于一个邮箱(mailbox)
- 消费者类似一个邮件客户端,能够拉取和删除邮件。
- 交换器类似一个MTA(邮件服务器)。检查邮件,基于邮件里的路由信息、路由表,来决定如何把邮件发送到一个或多个邮箱里。
- Routing Key类似于邮件中的
To:
,Cc:
,Bcc:
的地址。不包含服务端信息。 - 每一个交换器实例,类似于各个MTA进程。用于处理不同子域名的邮件,或者特定类型的邮件。
Binding
类似于MTA中的路由表。
在AMQP里,生产者直接把消息发到服务端,然后服务端把这些消息路由到邮箱中。消费者直接从邮箱里取消息。但在AMQP之前的很多中间件中,发布者是直接把消息发到对应的邮箱里(存储发布队列),或者直接发到邮件列表里(类似topic订阅)。
这里的区别在于,用户可以控制消息队列和交换器的保定规则,而不是内部的代码。这样就可以做很多有趣的事情。比如定义一个这样的规则:”把所有包含这样和这样Header的消息,都复制一份到这个消息队列中。“
而这一点也是我认为AMQP和其他一些消息队列最重要的差异。
生命周期
消息的生命周期
- **消息由生产者产生。**生产者把内容放到消息里,并设置一些属性,以及消息的路由。然后生产者把消息发给服务端
- 服务端收到消息,交换器(大部分情况)把消息路由到若干个该服务器上的消息队列中。如果这个消息找不到路由,则会丢弃或者退回给生产者(生产者可自行决定)
- 一条消息可以存在于许多消息队列中。 服务器可以通过复制消息,引用计数等方式来实现。这不会影响互操作性。 但是,将一条消息路由到多个消息队列时,每个消息队列上的消息都是相同的。 没有可以区分各种副本的唯一标识符。
- 消息到达消息队列。消息队列会立即尝试通过AMQP将其传递给消费者。 如果做不到,消息队列将消息存储(按生产者的要求存储在内存中或磁盘上),并等待消费者准备就绪。 如果没有消费者,则消息队列可以通过AMQP将消息返回给生产者(同样,如果生产者要求这样做)。
- 当消息队列可以将消息传递给消费者时,它将消息从其内部缓冲区中删除。 可以立即删除,也可以在使用者确认其已成功处理消息之后删除(ack)。 由消费者选择“确认”消息的方式和时间。 消费者也可以拒绝消息(否定确认)。
- 生产者发消息与消费者确认,被分组成一个事务。当一个应用同时扮演多个角色时:发消息,发ack,commit或者回滚事务。消息从服务端投递给消费者这个过程不是事务的。消费者对消息进行确认就够了。
在这个过程中,生产者只能把所有消息发到一个单点(交换器),而不能直接把消息发到某个消息队列(message-queue)中。
交换器(exchange)的生命周期
每个AMQP服务端都会自己创建一些交换器,这些不能被销毁。AMQP程序也可以创建其自己的交换器。AMQP并不使用 create
这个方法,而是使用 declare
方法来表示:如果不存在,则创建,存在了则继续。程序可以创建交换器用于私有使用,并在任务完成后销毁它们。虽然AMQP提供了销毁交换器的方法,但一般来讲程序不需要销户它。
队列(queue)的生命周期
队列分为两种,
- 持久化消息队列:由很多消费者共享。当消费者都退出后,队列依然存在,并会继续收集消息。
- **临时消息队列:**临时消息队列对于消费者是私有和绑定的。当消费者断开连接,则消息队列被删除。
绑定(Bindings)
绑定是交换器和消息队列之间的关系,告诉交换器如何路有消息。
Queue.Bind <queue> TO <exchange> WHERE <condition>
绑定命令的伪代码
几个经典的使用案例:共享队列、私有的回复队列、发布-订阅
构造一个共享队列
Queue.Declare queue=app.svc01 // 声明一个叫做 app.svc01 的队列
// Comsumer
Basic.Consume queue=app.svc01 // 消费者消费该队列
// Producer
Basic.Publish routing-key=app.svc01 // 生产者发布消息。routingKey为队列名称
构造一个共享队列
构造一个私有回复队列
一般来讲,回复队列是私有的、临时的、由服务端命名、只有一个消费者。(该例子没有直接使用AMQP协议中的列子,而是使用了RabbitMQ的列子)
Queue.Declare queue=rpc_queue // 调用的队列
// Server
Basic.Consume queue=rpc_queue
// Client
Queue.Declare queue=<empty> exclusive=TRUE
S:Queue.Declare-Ok queue=amq.gen-X... // AMQP服务端告诉队列名称
Basic.Publish queue=rpc_queue reply_to=amq_gen-X... // 客户端向服务端发送请求
// Server
handleMessage()
// 服务端处理好消息后,向消息列的reply-to字段中的队列发送响应
Basic.Publish exchange=<empty> routing-key={message.replay_to}
构造一个发布-订阅队列
在传统的中间件中,术语 subscription
含糊不清。至少包含两个概念:匹配消息的条件集,和一个临时队列存放匹配的消息。AMQP把这两部分拆成:binding
和message queus
。在AMQP中,并没有一个实体叫做 subscription
AMQP的发布订阅模型为:
- 给一个消费者保留消息。(一些场景下是多个消费者)
- 从多个源收集消息,比如匹配Topic、消息的字段、或者内容等方式
订阅队列与命名队列或回复队列之间的关键区别在于,订阅队列名称与路由目的无关,并且路由是根据抽象的匹配条件完成的,而不是路由键字段的一对一匹配。
// Consumer
Queue.Declare queue=<empty> exclusive=TRUE
// 这里是使用服务端下发的队列名称,并设置为独占。
// 也可以使用约定的队列名称。这样就相当于把发布-订阅模型与共享队列组合使用了
S:Queue.Declare-Ok queue=tmp.2
Queue.Bind queue=tmp.2 TO exchange=amq.topic WHERE routing-key=*.orange.*
Basic.Consume queue=tmp.2
// Producer
Basic.Publish exchange=amq.topic routing-key=quick.orange.rabbit
AMQP命令架构
中间件很复杂,所以在设计协议结构的挑战是要驯服其复杂性。方法是基于类来建立传统API模型。类中包含方法,并定义了方法明确应该做什么。
有两种不同的方式进行对话:
- 同步请求-响应。一个节点发送请求,另一个阶段发送响应。适用于性能不重要的方法。发送同步请求时,该节点直到收到回复后,才能发送下一个请求
- 异步通知。一个节点发送数据,但是不期待回复。一般用于性能很重要的地方。异步请求会尽可能快的发送消息,不等待确认。只在需要的时候在更上层(比如消费者层)实现限流等功能。AMQP中可以没有确认,要么成功,要么就会收到关闭Channel或者连接的异常。如果需要明确的追踪成功或者失败,那么应该使用事务。
AMQP中的类
Connection类
AMQP是一个长连接协议。Connection被设计为长期使用的,可以携带多个cannel。Connection的生命周期是:
- 客户端打开到服务端的TCP/IP连接,发送protocol头。这是唯一的,客户端发送的数据不能被解析为方法的。
- 服务端返回其协议版本、属性(比如支持的安全机制列表)。
the Start method
- 客户端选择安全机制
Start-Ok
- 服务端开始认证过程, 它使用SASL的质询-响应模型(challenge-response model). 它向客户端发送一个质询
Secure
- 客户端向服务端发送一个认证响应
Secure-Ok
。比如,如果使用plain
认证机制,则响应会包含登录名和密码 - 客户端重复质询
Secure
或转到协商步骤,发送一系列参数,如最大帧大小Tune
- 客户端接受,或者调低这些参数
Tune-Ok
- 客户端正式打开连接,并选择一个Vhost
Open
- 服务端确认VHost有效
Open-Ok
- 客户端可以按照预期使用连接
- 当一个节点打算结束连接
Close
- 另一个节点需要结束握手
Close-Ok
- 服务端和客户端关闭Socket连接。
如果在发送或者收到 Open
或者 Open-Ok
之前,某一个节点发现了一个错误,则必须直接关闭Socket,且不发送任何数据。
Channel类
AMQP是一个多通道协议。Channel提供了一种方式,在比较重的TCP/IP连接上建立多个轻量级的连接。这会让协议更加的防火墙友好,因为端口使用是可预知的。它也意味着流量调整和其他QoS特性也很容易支持。
Channels相互是独立的,可以同步执行不同的功能。可用带宽会在当前活动之间共享.
这里期望也鼓励多线程客户端程序应该使用 每个线程一个channel
的模型。不过,一个客户端在一个或多个AMQP服务端上打开多个连接也是可以的。
Channel的生命周期为:
- 客户端打开一个新通道
Open
- 服务端确认新通道准备就绪
Open-Ok
- 客户端和服务端按预期来使用通道.
- 一个节点关闭了通道
Close
- 另一个节点对通道关闭进行握手
Close-Ok
Exchange类
Exchange类能够让应用操作服务端的交换器。这个类能够让程序自己设置路由,而不是通过某些配置。注意:大部分程序并不需要这个级别的复杂度,过去的中间件也不只支持这个语义。
Exchange的生命周期为:
- 客户端让服务端确保该exchange存在
Declare
。客户端可以细化为:“如果交换器不存在则进行创建” 或 “如果交换器不存在,警告我,不需要创建” - 客户端向Exchange发消息
- 客户端也可以选择删掉Exchange
Delete
Queue类
该类用于让程序管理服务端上的消息队列。几乎所有的消费者应用都是基本步骤,至少要验证使用的消息队列是否存在。
一个持久化消息队列的生命周期非常简单
- 客户端断言这个消息队列存在
Declare
(设置passive
参数) - 服务端确认消息队列存在
Declare-Ok
- 客户端消息队列中读消息
一个临时消息队列的生命周期会更有趣些:
- 客户端创建消息队列
Declare
(不提供队列名称,服务器会分配一个名称)。服务端确认Declare-Ok
- 客户端在消息队列上启动一个消费者
- 客户端取消消费,可以是显示取消,也可以是通过关闭通道或者连接连接隐式取消的
- 当最后一个消费者从消息队列中消失的时候,在过了礼貌性超时后,服务端会删除消息队列
AMQP实现了Topic订阅的分发模型。这可以让订阅在合作的订阅者间进行负载均衡。涉及到额外的绑定阶段的生命周期:
- 客户端创建一个队列
Declare
,服务端确认Declare-Ok
- 客户端绑定消息队列到一个topic exchange上
Bind
,服务端确认Bind-Ok
- 客户端像之前一样使用消息队列。
Basic类
Basic实现本规范中描述的消息功能。支持如下语义:
- 从客户端→服务端发消息。异步
Publish
- 开始或者停止消费
Consume
,Cancel
- 从服务端到客户端发消息。异步
Deliver
,Return
- 确认消息
Ack
,Reject
- 同步的从消息队列中读取消息
Get
事务类:
AMQP支持两种类型的事务:
- 自动事务。每个发布的消息和应答都处理为独立事务.
- 服务端本地事务:服务器会缓存发布的消息和应答,并会根据需要由client来提交它们.
Transaction 类(“tx”) 使应用程序可访问第二种类型,即服务器事务。这个类的语义是:
- 应用程序要求服务端事务,在需要的每个channel里
Select
- 应用程序做一些工作
Publish
,Ack
- 应用程序提交或回滚工作
Commit
,Roll-back
- 应用程序正常工作,循环往复。
事务包含发布消息和ack,不包含分发。所以,回滚并不能重入队列或者重新分发任何消息。客户端有权在事务中确认这些消息。
功能说明
AMQP的功能描述,一定程度上也是RabbitMQ的功能描述,不过RabbitMQ基于AMQP做了一些扩展
消息和内容
消息会携带一些属性,以及具体内容(二进制数据)
消息是可被持久化的。持久化消息是可以安全的存在硬盘上的,即使发生了验证的网络错误、服务端崩溃溢出等情况,也可以确保被投递。
消息可以有优先级。同一个队列中,高优先级的消息会比低优先级的消息先被发送。当消息需要被丢弃时(比如服务端内存不足等),将会优先丢弃低优先级消息
服务端一定不能修改消息的内容。但服务端可能会在消息头上添加一些属性,但一定不会移除或者修改已经存在的属性。
虚拟主机(VHost)
虚拟主机是服务端的一个数据分区。在多租户使用是,可以方便进行管理。
虚拟主机有自己的命名空间、交换器、消息队列等等。所有连接,只可能和一个虚拟主机建立
交换器(Exchange)
交换器是一个虚拟主机内的消息路由Agent。用于处理消息的路由信息(一般是Routing-Key),然后将其发送到消息队列或者内部服务中。交换器可能是持久化的、临时的、自动删除的。交换器把消息路由到消息队列时可以是并行的。这会创建一个消息的多个实例。
Direct
交换器
- 一个消息队列使用RoutingKey
K
绑定到交换器 - 生产者向交换器发送RoutingKey为
R
的消息 - 当
K=R
时,消息被转发到该消息队列中
Fanout
交换器
- 一个消息队列没有使用任何参数绑定交换器
- 生产者向交换器发了一条消息
- 这个消息无条件的发送到该消息队列
Topic
交换器
- 消息队列使用路由规则
P
绑定到交换器 - 生产者使用RoutingKey
R
发送消息到交换器 - 如果
R 能够匹配 P
,则把消息发到该消息队列。
RoutingKey必须由若干个被点.
分隔的单词组成。每个单词只能包含字母和数字。其中 *
匹配一个单词,#
匹配0个或者多个单词。比如 *.stock.#
匹配 usd.stock
和 eur.stock.db
但是不匹配 stock.nasdaq
Headers
交换器
- 消息队列使用Header的参数表来绑定。不适用RoutingKey
- 生产者向交换器发送消息,Header中包含了指定的键值对
- 如果匹配,则传给消息队列。
比如:
format=json,type=log,x-match=all
format=line,type=log,x-match=any
如果 x-match
为all
,则必须都匹配才行。如果x-match
为any
,则有任意一个header匹配即可。
系统交换器
这个略过吧。
AMQP的传输架构
解释了命令如何映射到传输层的。想要设计自有协议时,可以参考一下它的设计思路,以及中间需要注意的问题。
AMQP是一个二进制协议。有不同类型的帧frame
构成。帧会携带协议的方法以及其他信息。所有的帧都有相同的基本结构,即:帧头,payload,帧尾。payload格式取决于帧的类型。
我们假设使用的是面向流的可靠网络层(比如TCP/IP)。单个Socket连接上可以有多个独立的控制线程,也就是通道Channel
。不同的通道共享一个连接,每个通道上的帧都是按严格的顺序排列,这样可以用一个状态机来解析协议。
传输层(wire-level)的格式被设计为扩展性强、且足够通用,可以用于任何更高层的协议(不仅仅是AMQP)。我们假设AMQP是会被扩展、优化的。
主要涉及这几个部分:数据类型、协议协商、分帧方式、帧细节、方法帧、内容帧、心跳帧、错误处理、通道与连接的关闭
数据类型
AMQP的数据类型用于方法帧中,他们有
- 整数(1-8个字节),表示大小,数量,范围等。全都是无符号整数
- Bits。用于表示为开/关值,会被封包为字节。
- 短字符串。用于存放短的文本属性。最多255字节,解析时不用担心缓冲区溢出。
- 长字符串:用于存放二进制数据块
- 字段表(Field Table),用于存放键值对
协议协商
客户端连接时,和服务端协商可接受的配置。当两个节点达成一致后,连接才能继续使用。通过协商,可以让我们断言假设和前提条件。主要协商这几方面的信息
- 实现的协议和版本。服务端可能会在同一端口提供多种协议的支持
- 加密参数和验证
- 最大帧尺寸、Channel的数量、某些操作的限制。
如果协商达成一致,双方会根据协商预分配缓冲区避免死锁。传入的帧如果满足协商条件,则认为其实安全的。如果超过了,那么另一方必须断开连接。
分帧方式
TCP/IP是流协议。没有内置的分帧机制。现有的协议一般有这几种方式进行分帧:
- 每个连接只发送一个帧。简单,但是慢。
- 在流中加入分隔符来分帧。简单,但是解析较慢(因为需要不断的读取,去寻找分隔符)
- 计算帧的尺寸,并在每个帧之前发送尺寸。简单且快速。也是AMQP的选择
帧细节
帧头包括:帧类型、通道、尺寸。
帧尾包含错误检测信息。
处理一个帧的步骤:
- 读帧头,检查帧类型和Channel
- 根据帧类型,读取payload并处理
- 读帧尾校验
在实现时,性能很重要的时候,我们会使用 read-ahead buffering
或者 gathering reads
去避免读帧时进行三次系统调用。
方法帧
处理方式:
- 读取方法帧的payload
- 解包为结构
- 检查方法在当前上下文中是否允许
- 检查参数是否有效
- 执行方法。
方法帧是由AMQP数据字段组成。编码代码直接从协议规范中生成,速度非常快。
内容帧
内容是端到端直接发送的应用数据。内容由一系列属性和二进制数据组成。其中一系列的属性组成了 ”内容帧的帧头“。而二进制数据,可以是任意大小,它可能被拆分成多个块发送,每个块是一个 content-body帧
一些方法(比如 Basic.Publish
,Basic.Deliver
)是会携带内容的。一个内容帧的帧头如下结构:
这里把 content-body
作为单独的帧,这样就可以支持Zero-copy技术,这部分内容就不需要被编码。把内容属性放到自己的帧里,这样收件人就可以选择性的丢弃不想处理的内容。
通道与连接的关闭
对于客户端,只要发送了 Open
就认为连接和通道是打开的。对于服务端则是 Open-Ok
。如果一个节点想要关闭通道和连接必须要进行握手。
如果突然或者意外关闭,没办法立刻被检测到,可能会导致丢失返回值。所以需要在关闭之前进行握手。在一个节点发送 Close
后,另一个节点必须发送 Close-Ok
来回复。然后双方可以关闭通道或者连接。如果节点忽略了 Close
操作,当双方同时发送 Close
时,可能会导致死锁。